
Do you consider unit testing not enough of a solution for maintaining the application’s reliability and stability? Are you afraid that somehow, somewhere, a potential bug is hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for your project’s requirements? If even one answer is ‘yes’, then welcome to a nice and easy guide on how to set up Integration Tests for Kafka using TestContainers and Embedded Kafka for Spring!
What’s TestContainers?
TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these fancy features are hooked into docker images, defined as containers. Do we need to test the database layer against an actual MongoDB? No worries, we have a test container for that. And we can’t forget about UI tests either – a Selenium container will do anything we actually need.
In our case, we will focus on the Kafka Testcontainer.
What’s Embedded Kafka?
As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.
Before we start
The concept for our tests is simple – I would like to test a Kafka consumer and producer using two different approaches and check how we can utilize them in actual cases.
Kafka messages are serialized using Avro schemas.
Embedded Kafka – Producer Test
The concept is straightforward – let’s create a simple project with a controller, which invokes a service method to push a Kafka Avro serialized message.
Dependencies:
dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation("io.confluent:kafka-avro-serializer:6.1.0")
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')
    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Also worth mentioning is the fantastic plugin for Avro. Here is the plugins section:
plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}
The Avro plugin supports schema auto-generation. This is a must-have.
Link to the plugin: https://github.com/davidmc24/gradle-avro-plugin
Now let’s define the Avro schema:
"namespace": "com.grapeup.myawesome.myawesomeproducer",
"sort": "file",
"identify": "RegisterRequest",
"fields": [
"name": "id", "type": "long",
"name": "address", "type": "string", "avro.java.string": "String"
]
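With the plugin applied, a RegisterRequest class with a type-safe builder is generated from this schema at build time. As a quick illustration, the generated class can be used like this (the same builder appears in the tests later on):

// The Avro-generated class exposes a builder for constructing messages.
RegisterRequest message = RegisterRequest.newBuilder()
        .setId(12L)
        .setAddress("tempAddress")
        .build();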
Our ProducerService will be focused solely on sending messages to Kafka using a template; there is nothing exciting about that part. The main functionality can be done just by using this line:
ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
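For context, a minimal sketch of such a service could look like the following. The DTO-to-Avro mapping and the method name are assumptions for illustration, not the original project’s code:

@Service
@RequiredArgsConstructor
public class ProducerService {

    private static final String TOPIC_NAME = "register-request";

    private final KafkaTemplate<String, RegisterRequest> kafkaTemplate;

    public void send(RegisterRequestDto dto) {
        // Map the incoming DTO to the Avro-generated class before producing.
        RegisterRequest kafkaMessage = RegisterRequest.newBuilder()
                .setId(dto.getId())
                .setAddress(dto.getAddress())
                .build();

        ListenableFuture<SendResult<String, RegisterRequest>> future =
                this.kafkaTemplate.send(TOPIC_NAME, kafkaMessage);
    }
}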
We can’t forget about the test properties:
spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true
As we can see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro – don’t let JSONs maintain object structure; let’s use a civilized mapper and object definition like Avro.
Serializer:
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
Deserializer:
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
And we have everything we need to start writing our test.
@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {
All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The Application Context will boot the Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration: Lifecycle.PER_CLASS avoids re-creating the same objects/context for each test method, which is worth checking if tests become too time-consuming.
Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}
Here we declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to verify that the producer actually pushed a message.
Here is the actual test method:
@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =
            KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);
    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}
First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The KafkaConsumer is used to verify that the producer worked as expected. And that’s it – we have a fully working test with embedded Kafka.
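For reference, the endpoint under test could be as simple as the following sketch. The path matches the test above; the validation annotation and the response type are assumptions:

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @PostMapping("/register-request")
    public ResponseEntity<Void> registerRequest(@Valid @RequestBody RegisterRequestDto request) {
        // Delegate to the service, which produces the Avro message to Kafka.
        producerService.send(request);
        return ResponseEntity.ok().build();
    }
}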
TestContainers – Consumer Test
TestContainers are nothing other than independent docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happened in the Kafka stream?
Dependencies are not much different than in the previous example. The following steps are needed for test containers:
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
    }
}
Let’s focus now on the consumer part. The test case will be simple – one consumer service will be responsible for getting the Kafka message and storing the parsed payload in the MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:
@KafkaListener(topics = "register-request")
Through the functionality of the annotation processor, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment, our method will react to any incoming Kafka message on the mentioned topic.
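A minimal sketch of such a consumer service might look like this. The repository, the document class, and the mapping are assumptions for illustration (only the taxiRepository name reappears later in the test):

@Service
@RequiredArgsConstructor
public class ConsumerService {

    private final TaxiRepository taxiRepository;

    @KafkaListener(topics = "register-request")
    public void consume(RegisterRequest message) {
        // Store the parsed payload in the MongoDB collection right after consuming it.
        taxiRepository.save(new TaxiDocument(message.getId(), message.getAddress()));
    }
}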
The Avro serializer and deserializer configs are the same as in the previous test.
Regarding the TestContainers, we should start with the following annotations:
@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {
During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected source. For example:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);
Thanks to booting the test, we can expect two docker containers to start with the provided configuration.

What is really important about the mongo container is that it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even during debug mode at waiting breakpoints.
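For example, here is a minimal sketch (assuming the mongodb-driver-sync that Spring Data MongoDB already brings in) for peeking at the database state from test code:

// Connect with the URI exposed by the container and list the collections;
// the same URI can be pasted into a Mongo client while paused on a breakpoint.
try (MongoClient client = MongoClients.create(mongoDBContainer.getReplicaSetUrl())) {
    client.getDatabase("test").listCollectionNames()
            .forEach(name -> System.out.println("collection: " + name));
}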
Take a look also at the Ryuk container – it works like an overwatch and checks if our containers have started correctly.
And here is the last part of the configuration:
@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
    kafkaContainer.start();
    mongoDBContainer.start();
    mongoDBContainer.waitingFor(Wait.forListeningPort()
            .withStartupTimeout(Duration.ofSeconds(180L)));
}

@BeforeTestClass
public void beforeTest() {
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer ->
                    ContainerTestUtils
                            .waitForAssignment(messageListenerContainer, 1)
    );
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}
@DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle, which is strongly needed for any TestContainers config purposes. Also, in the @BeforeTestClass method, kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.
And the last part of the Kafka test containers journey – the main body of the test:
@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
    writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

    // Wait for the KafkaListener
    TimeUnit.SECONDS.sleep(5);
    Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
    return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
    try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
        Arrays.stream(registerRequests)
                .forEach(registerRequest -> {
                    ProducerRecord<String, RegisterRequest> record =
                            new ProducerRecord<>(topicName, registerRequest);
                    producer.send(record);
                });
    }
}
The custom producer is responsible for writing our message to the Kafka broker. It is also recommended to give consumers some time to handle messages properly. As we can see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
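As a side note, the fixed five-second sleep can be replaced with a polling assertion to make the test faster and less flaky – a minimal sketch, assuming Awaitility is added as a test dependency:

// Poll the repository until the message has been persisted, up to a timeout.
Awaitility.await()
        .atMost(Duration.ofSeconds(10))
        .untilAsserted(() ->
                Assertions.assertEquals(1, taxiRepository.findAll().size()));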
Conclusions
As we can see, the current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and relying on all lines being covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word “Embedded”. As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain the configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don’t even have to rely on existing docker images – if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins, either. I strongly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up – if there is no blocker or other unwanted condition for using TestContainers, then don’t hesitate. It is always good to keep all services managed and secured with integration test contracts.