How to Set Up Kafka Integration Test – Grape Up


Do you consider unit tests insufficient for keeping the application reliable and stable? Are you afraid that somehow or somewhere there is a potential bug hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for the project requirements? If even one answer is 'yes', then welcome to a simple guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these features are backed by Docker images, run as containers. Do we need to test the database layer with an actual MongoDB? No worries, we have a test container for that. We also cannot forget about UI tests: the Selenium container will do anything that we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.

Before we start

The concept for our test is simple: I would like to test a Kafka consumer and producer using two different approaches and check how we can use them in real cases.

Kafka Messages are serialized using Avro schemas.

Embedded Kafka: Producer Test

The concept is easy: let's create a simple project with a controller that invokes a service method to push an Avro-serialized Kafka message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also worth mentioning is the excellent Gradle plugin for Avro. Here is the plugins section:

plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

The Avro plugin generates Java classes from the schema automatically. This is a must-have.

Now let's define the Avro schema:

{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}


Our ProducerService will be focused only on sending messages to Kafka using a template; there is nothing exciting about that part. The main functionality can be done using just this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);

We cannot forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: team_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

As we see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro: don't let JSON maintain the object structure, let's use a civilized mapper and object definition like Avro.


// Serializer that replaces the real Schema Registry client with an in-memory mock.
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}


// Deserializer counterpart referenced by value-deserializer in the test properties.
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

And we have everything to start writing our test.

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS creates a single test instance for the whole class instead of a new one for each test method, so the same objects are not created again for every test. It is worth considering if the tests are too time-consuming.

private Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}

Here we declare the test consumer, typed with the class generated from the Avro schema. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    // Read back the single record that the endpoint should have produced.
    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =
            KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The Kafka consumer is used to verify that the producer worked as expected. And that's it: we have a fully working test with embedded Kafka.
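The verification pattern in this test (publish something, then block for exactly one record with a timeout) can be illustrated without any broker at all. The sketch below is only an analogy in plain Java: InMemoryTopic and its methods are hypothetical names, a BlockingQueue stands in for a single-partition topic, and none of it is the Kafka or spring-kafka-test API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory stand-in for a single-partition topic; this is not Kafka. */
final class InMemoryTopic<T> {

    private final BlockingQueue<T> records = new LinkedBlockingQueue<>();

    /** Producer side: append a record to the topic. */
    public void send(T record) {
        records.add(record);
    }

    /** Test side: wait up to the timeout for one record and fail if none arrives. */
    public T getSingleRecord(long timeout, TimeUnit unit) {
        try {
            T record = records.poll(timeout, unit);
            if (record == null) {
                throw new IllegalStateException("No record received within the timeout");
            }
            return record;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a record", e);
        }
    }
}
```

The point of the timeout is that a producer which silently fails makes the test fail quickly instead of hanging forever.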

Test Containers: Consumer Test

TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happens in the Kafka flow?

Dependencies are not much different than in the previous example. The following additions are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}

Let's focus now on the consumer part. The test case will be simple: one consumer service will be responsible for receiving the Kafka message and storing the parsed payload in a MongoDB collection. All that we need to know about Kafka listeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

Based on this annotation, KafkaListenerContainerFactory will be responsible for creating a listener for our method. From this moment, our method will react to any incoming Kafka message on the mentioned topic.

The Avro serializer and deserializer configs are the same as in the previous test.

Regarding TestContainers, we should start with the following annotations:

@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected resource. As an example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important for the Mongo container: it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even in debug mode while stopped at breakpoints.
Take a look also at the Ryuk container: it works like an overwatch for our containers and cleans them up once the tests no longer need them.

And here is the last part of the configuration:

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

@BeforeTestClass
public void beforeTest() {
    // Block until every listener container has been assigned its single partition.
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer ->
                    ContainerTestUtils.waitForAssignment(messageListenerContainer, 1));
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}

@DynamicPropertySource gives us the option to set all needed environment properties during the test lifecycle. It is strongly required for any TestContainers configuration, because the container addresses are known only at runtime. Also, in beforeTest, kafkaListenerEndpointRegistry is used to wait for each listener to get its expected partitions during container startup.
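It is worth noticing that the registry receives method references such as kafkaContainer::getBootstrapServers rather than plain values: DynamicPropertyRegistry.add takes a Supplier, so the address is read only when the property is resolved, after the container has started. The following is a minimal plain-Java sketch of that idea under stated assumptions: LazyPropertyRegistry and FakeContainer are hypothetical names invented for illustration, not Spring or TestContainers classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical registry that stores property suppliers instead of values. */
final class LazyPropertyRegistry {

    private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

    /** Registers a supplier; nothing is evaluated at this point. */
    public void add(String name, Supplier<Object> valueSupplier) {
        suppliers.put(name, valueSupplier);
    }

    /** Evaluates the supplier only when the property is actually read. */
    public Object resolve(String name) {
        Supplier<Object> supplier = suppliers.get(name);
        if (supplier == null) {
            throw new IllegalArgumentException("Unknown property: " + name);
        }
        return supplier.get();
    }

    /** Hypothetical container whose address is unknown until it has started. */
    static final class FakeContainer {

        private Integer mappedPort;

        public void start(int port) {
            this.mappedPort = port;
        }

        public String getBootstrapServers() {
            if (mappedPort == null) {
                throw new IllegalStateException("Container has not been started yet");
            }
            return "localhost:" + mappedPort;
        }
    }
}
```

Registering container::getBootstrapServers before calling start is safe in this sketch, because the supplier runs only inside resolve. Passing the result of container.getBootstrapServers() directly would fail at registration time.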

And the last part of the Kafka test containers journey, the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
    writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

    // Wait for KafkaListener (the pause length here is illustrative)
    TimeUnit.SECONDS.sleep(5);
    Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
    return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
    // try-with-resources closes the producer, which also flushes pending records.
    try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
        Arrays.stream(registerRequests)
                .forEach(registerRequest -> {
                    ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                    producer.send(record);
                });
    }
}

The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
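Giving consumers "some time" does not have to mean pausing for a fixed period. One alternative is to poll the expected condition until it holds or a timeout expires. Below is a minimal sketch of such a helper in plain Java; PollingWait and awaitCondition are hypothetical names introduced only for this illustration and are not part of the original project or of any library.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper that polls a condition until it holds or a timeout expires. */
final class PollingWait {

    private PollingWait() {
    }

    /**
     * Re-checks the condition every pollInterval until it returns true or the
     * timeout elapses. Returns true if the condition was met, false otherwise.
     */
    public static boolean awaitCondition(BooleanSupplier condition,
                                         Duration timeout,
                                         Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check so a condition met during the final sleep is not missed.
        return condition.getAsBoolean();
    }
}
```

In a test like the one above, the condition could be something like () -> taxiRepository.findAll().size() == 1, so the test continues as soon as the listener has stored the document and fails only if the timeout is reached.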


As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on line coverage as a sign of code or logic quality. Now the question is, should we use an embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". In a perfect integration test, we want an almost ideal copy of the production environment with all properties and features included. In-memory solutions are good, but mostly not enough for large business projects. The definite advantage of embedded services is how easy it is to implement such tests and maintain the configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images: if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I strongly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.

