kafkata / embedded-kafka

In-memory Kafka Cluster (including ZooKeeper) to run your tests against.

This library enables you to run a kafka cluster (v2.3.1) with an accompanying zookeeper (v3.5.6) in memory. You can use it, for example, in integration tests when you need to assert the correct behaviour of your kafka-powered application. The main reason for its creation, though, was to have an easy way of providing a kafka cluster for the Kafkatas (coming soon).

The original code was taken from the kafka-streams-examples repository.

how to

create a kafka cluster

Add the following dependency to your build.sbt file: "com.kafkata" %% "embedded-kafka" % "<current version>" % Test.

After you've done that, you will be able to create a kafka cluster in your tests:

    import com.kafkata.embedded.kafka._
    
    val kafkaCluster = new KafkaCluster()
    try {
      val brokerList = kafkaCluster.start()

      // Your test code here
    } finally {
      kafkaCluster.close()
    }

Simply instantiate com.kafkata.embedded.kafka.KafkaCluster with optional additional broker properties and start it up. The start() function returns the list of brokers in the cluster (e.g. 127.0.0.1:59576). The brokers' ports are chosen randomly at startup and cannot be set explicitly for now.

The list of available properties along with their documentation can be found in the kafka.server.KafkaConfig object. The default values are specified in kafka.server.Defaults.
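
If you want to override broker defaults, a minimal sketch could look like the following. It assumes the KafkaCluster constructor accepts the overrides as a Map[String, String] keyed by the property names from kafka.server.KafkaConfig; check the constructor for the exact parameter type:

    import com.kafkata.embedded.kafka._

    // Assumption: broker overrides are passed as a Map[String, String];
    // the keys are the property names documented in kafka.server.KafkaConfig.
    val brokerProperties = Map(
      "auto.create.topics.enable" -> "false",
      "num.partitions"            -> "3"
    )

    val kafkaCluster = new KafkaCluster(brokerProperties)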

As you can see in the code snippet, the cluster needs to be closed once the test has finished. Please make sure to always close the cluster, as otherwise resources might not be freed. Please note: a cluster that has been closed cannot be restarted.

If you need to shut down the cluster and restart it, you may do so by calling the stop() function followed by another invocation of the start() function, as shown below. The cluster will keep its ports, and any previously created topics will still be available.
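
For example:

    val kafkaCluster = new KafkaCluster()
    val brokers = kafkaCluster.start()

    // ... run some tests ...

    kafkaCluster.stop()
    // Restart: the cluster comes back on the same ports and keeps its topics.
    val sameBrokers = kafkaCluster.start()

    // Final cleanup; a closed cluster cannot be started again.
    kafkaCluster.close()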

manage topics

In order to do anything meaningful with the kafka cluster you will need to create topics. The class com.kafkata.embedded.kafka.KafkaTopicUtils lets you create, delete and list topics:

  def listTopicNames(): List[String]
  def createTopic(topic: String, partitions: Int, replication: Short, topicConfig: Map[String, String]): Unit
  def deleteTopic(topic: String): Boolean 
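
A rough usage sketch based on these signatures is shown below. Note that constructing KafkaTopicUtils from the broker list returned by start() is an assumption made for illustration; check the class for its actual constructor:

    import com.kafkata.embedded.kafka._

    val kafkaCluster = new KafkaCluster()
    try {
      val brokerList = kafkaCluster.start()

      // Assumption: constructed from the broker list; the real constructor may differ.
      val topicUtils = new KafkaTopicUtils(brokerList)

      topicUtils.createTopic("orders", partitions = 3, replication = 1.toShort, topicConfig = Map.empty)
      assert(topicUtils.listTopicNames().contains("orders"))
      topicUtils.deleteTopic("orders")
    } finally {
      kafkaCluster.close()
    }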

limitations

  • Ports cannot be set explicitly: The ports of kafka brokers and zookeeper are chosen randomly at startup. It is currently not possible to choose a specific port.
  • Only one broker in cluster: The kafka cluster is started with only one broker and there is no option to create a multi-node cluster.