In this tutorial, I will guide you through setting up a Kafka Consumer in a Spring Boot application. You will learn how to configure the consumer properties, create a listener class using the @KafkaListener annotation, and process messages with the @KafkaHandler annotation.
Let’s start with the configuration.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.
Kafka Consumer Configuration in application.properties
In a Spring Boot application, the application.properties file is where you define settings for your application components. For a Kafka Consumer, you will need to set properties that define the connection to the Kafka brokers, the consumer group ID, and the deserializers. Here is an example of what this configuration might look like:

server.port=0
spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=product-created-events
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core
Below, I will explain each of these configuration properties in detail.
server.port
In your Spring Boot application's application.properties, the server port is a basic configuration that decides where your application runs. Without setting it, Spring Boot defaults to port 8080. If that port is busy, your application won't start. To avoid this, you can set the port to 0, which tells Spring Boot to use any available port. This is useful when running multiple instances of the same service:
server.port=0
spring.kafka.consumer.bootstrap-servers
Now, let's set up the Kafka consumer configuration. Your consumer needs to connect to Kafka, and that's where the bootstrap-servers property comes in. It lists the Kafka broker addresses that your consumer will connect to:
spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9094
Here, localhost:9092 and localhost:9094 are the addresses of the Kafka brokers. Including more than one address helps your consumer stay connected if one broker is down.
key-deserializer and value-deserializer
Next are the key-deserializer and value-deserializer properties. Kafka messages have keys and values, which are sent as byte arrays. The deserializers convert these byte arrays back into objects:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
StringDeserializer is for the message keys, which are strings. JsonDeserializer is for the message values, which will be in JSON format.
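One thing to keep in mind with JsonDeserializer: by default, it relies on type headers added by the producer to decide which class to create. If your producer does not send those headers (for example, because it is not a Spring application), you can tell the deserializer which class to map to. Here is a hedged sketch, assuming the ProductCreatedEvent class shown later in this tutorial:

spring.kafka.consumer.properties.spring.json.value.default.type=com.appsdeveloperblog.ws.core.ProductCreatedEvent
spring.kafka.consumer.properties.spring.json.use.type.headers=false

With use.type.headers set to false, every message value is deserialized into ProductCreatedEvent, regardless of any headers present.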
spring.kafka.consumer.group-id
When you set up a Kafka consumer, you can make multiple instances of a Microservice work as a team to handle messages from a Kafka topic. This team is what Kafka calls a consumer group. Each consumer in the group reads messages from the topic, ensuring that, together, they process messages faster.
The spring.kafka.consumer.group-id property is how you name your team. All consumers with the same group ID are in the same group. Here's how you might set it in your application.properties file:

spring.kafka.consumer.group-id=product-created-events
In this case, product-created-events is the name of your consumer group.
If you have multiple instances of your application, and you want them all to share the workload of processing messages, you give them all the same group ID. Kafka does the rest, distributing the topic's partitions (and therefore the messages) among the consumers in the group.
If every consumer had a different group ID, they wouldn't be working as a team. Instead, each one would get all the messages, leading to duplicate processing.
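Note that scaling is not limited to running more application instances. Spring Kafka can also run several consumer threads inside a single instance, each thread acting as a separate member of the same consumer group. A minimal sketch, using the concurrency setting of the default listener container that Spring Boot provides:

spring.kafka.listener.concurrency=3

With this setting, each application instance starts three listener threads. Keep in mind that the total number of active consumers across all instances cannot usefully exceed the number of partitions in the topic; extra consumers simply sit idle.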
spring.kafka.consumer.properties.spring.json.trusted.packages
This configuration property is quite important for security when you’re deserializing JSON messages in your Kafka consumer. It specifies which Java packages are ‘trusted’ for JSON deserialization by your application.
Here’s why it’s needed: When converting JSON to Java objects, there’s a risk. If an attacker sends a JSON that matches a class in your application, they might be able to execute unwanted code. To prevent this, Spring allows you to specify exactly which packages can be used for deserialization.
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core
This means that only the classes in the com.appsdeveloperblog.ws.core package can be deserialized.
If you were to set it to *, like this:

spring.kafka.consumer.properties.spring.json.trusted.packages=*
This would mean that your application trusts all packages, which is not safe. An attacker could craft a JSON payload that could exploit your application by deserializing to a malicious class.
You can also specify multiple packages by separating them with commas. For instance:
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core,com.appsdeveloperblog.ws.models
This way, both the com.appsdeveloperblog.ws.core and com.appsdeveloperblog.ws.models packages are trusted for JSON deserialization.
For beginners, it’s like telling your application, “Only accept JSON that looks like the classes in the packages I trust. Ignore anything else.” This keeps your application secure.
Kafka Consumer Class with @KafkaListener
For your Microservice to be able to read messages from a Kafka topic, you will need to create a new class and annotate it with the @KafkaListener annotation.
The @KafkaListener annotation marks a method or a class as the target of a Kafka message listener on the specified topics. This is how you tell Spring where to send messages from a topic. For example:

@Component
@KafkaListener(topics="product-created-events-topic")
public class ProductCreatedEventHandler {

    @KafkaHandler
    public void handle(ProductCreatedEvent productCreatedEvent) {
        // processing logic goes here
    }
}
In this class:

- @Component makes it a Spring-managed bean.
- @KafkaListener(topics="product-created-events-topic") specifies that this bean should listen to the "product-created-events-topic" topic in Kafka.
- The handle method is where you will process each ProductCreatedEvent message that comes in.
Consuming Messages with @KafkaHandler
The @KafkaHandler annotation designates the method within a class annotated with @KafkaListener that will handle messages. When a message arrives, Spring will invoke the method annotated with @KafkaHandler that has a compatible parameter type.
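This method-selection behavior becomes useful when a single topic carries more than one event type. Below is a hedged sketch, assuming a hypothetical ProductDeletedEvent class alongside the ProductCreatedEvent used in this tutorial:

@Component
@KafkaListener(topics="product-created-events-topic")
public class ProductEventsHandler {

    @KafkaHandler
    public void handle(ProductCreatedEvent event) {
        // invoked for messages that deserialize to ProductCreatedEvent
    }

    @KafkaHandler
    public void handle(ProductDeletedEvent event) {
        // invoked for messages that deserialize to ProductDeletedEvent
    }

    @KafkaHandler(isDefault = true)
    public void handleUnknown(Object event) {
        // fallback for payload types with no matching handler
    }
}

Spring routes each incoming message to the handler whose parameter type matches the deserialized payload; the handler marked with isDefault = true catches anything that has no matching method.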
For instance, in the ProductCreatedEventHandler class, the handle method is annotated with @KafkaHandler and takes a ProductCreatedEvent as a parameter:

@KafkaHandler
public void handle(ProductCreatedEvent productCreatedEvent) {
    // Logic to process the event
}
When a ProductCreatedEvent message is received from the "product-created-events-topic", this handle method will be called with the message payload. The event class is below.
Event Class – ProductCreatedEvent
The ProductCreatedEvent class is designed to match the structure of the JSON message that the Kafka consumer will receive.

package com.appsdeveloperblog.ws.core;

import java.math.BigDecimal;

public class ProductCreatedEvent {

    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;

    public ProductCreatedEvent() {
    }

    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    // Standard getters and setters omitted for brevity
}
When a message arrives from Kafka, Spring uses the JsonDeserializer specified in your application.properties to turn that JSON into a ProductCreatedEvent object. It looks at the JSON keys (title, price, quantity) and maps them to the corresponding fields in the class.
For the deserialization to work, the JSON message must have keys that exactly match the field names in the ProductCreatedEvent class. If the message is in the right format, like the example provided:

{
    "title": "iPad Pro",
    "price": 1250,
    "quantity": 19
}
Then the @KafkaHandler annotated method will receive an instance of ProductCreatedEvent with its title set to "iPad Pro", its price set to 1250, and its quantity set to 19. You can then use this object within the method to process the message as needed.
Conclusion
I hope this tutorial was helpful to you.
To learn more about how Apache Kafka can be used to build Event-Driven Spring Boot Microservices, please check my Kafka tutorials for beginners page.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.