In this tutorial, I will guide you through the process of setting up a Kafka Consumer using the @Configuration class and the @Bean method. This approach offers more flexibility and control compared to using the application.properties file.
Kafka consumer configuration can be achieved in two primary ways in a Spring Boot application:
- Using the application.properties file: This is a straightforward method where you define all your Kafka Consumer configurations in the application.properties or application.yml file of your Spring Boot project. If you’re interested in this method, I recommend reading the tutorial “Creating Kafka Consumer in a Spring Boot Microservice” for a comprehensive guide. A brief sketch of this style is shown right after this list.
- Using the @Configuration class and @Bean method: This method, which is our focus in this tutorial, involves creating a dedicated configuration class annotated with @Configuration. Inside this class, you define methods annotated with @Bean to set up your Kafka Consumer. This method provides a programmatic way to define your configurations, giving you more control and flexibility, especially in complex applications.
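For comparison, here is roughly what the properties-file approach from the first option looks like for the same consumer settings used later in this tutorial. This is only a sketch; the tutorial linked above covers that approach in full:

```properties
# Sketch of the equivalent application.properties configuration (first option).
# The values mirror the ones configured programmatically later in this tutorial.
spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.consumer.group-id=product-created-events
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core
```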
If you are new to Kafka, you might wonder why choose one method over the other. The answer lies in your project’s requirements. If you need a quick and simple setup, the application.properties method is suitable. However, if your project demands more dynamic and complex configurations, using @Configuration and @Bean annotations is the way to go.
In the following sections, I will walk you through creating your @Configuration class and configuring your Kafka Consumer using the @Bean method.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.
Creating the @Configuration Class
In Spring Boot, an @Configuration class is a special type of class that holds bean definitions. Beans are objects that form the backbone of your application and are managed by the Spring container. When you annotate a class with @Configuration, you’re telling Spring Boot that this class contains methods that produce beans. These beans are then used by Spring Boot to perform various actions in your application.
The @Configuration class is where you’ll define your Kafka Consumer configurations.
```java
package com.yourpackage;

import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConsumerConfig {
    // Beans will be defined here
}
```
Configuring Kafka Consumer with @Bean Method
Now that you have your @Configuration class ready, it’s time to configure the Kafka Consumer within it. This is where the @Bean annotation comes into play.
The @Bean annotation tells Spring Boot that a method produces a bean to be managed by the Spring container. When you annotate a method with @Bean, the return value of the method becomes a bean in the Spring application context, available for other parts of the application to use. Read more about the @Bean annotation.
Inside your @Configuration class, define a method that returns a ConsumerFactory. This factory is responsible for creating Kafka Consumer instances. Annotate this method with @Bean.
```java
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();

    // Configuration details will go here

    return new DefaultKafkaConsumerFactory<>(config);
}
```
Inside this method, you’ll specify various properties that configure the Kafka Consumer by adding entries to the config map.
Bootstrap Servers
The bootstrap.servers property defines the Kafka server addresses. It’s crucial for the consumer to know where to connect for fetching data.
```java
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094");
```
Key Deserializer
This property tells the consumer how to deserialize the key of a message. Kafka messages have a key and a value, and you can specify different deserializers for each. For most cases, StringDeserializer is used for keys.
```java
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
```
Value Deserializer
Similar to the key, this property specifies how to deserialize the message value. Since Kafka can handle various types of data, the deserializer depends on the type of data your messages will have. In our case, we are using JsonDeserializer for JSON-formatted message values.
```java
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
```
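Optionally, if your producer does not include JSON type information in the message headers, you can tell the JsonDeserializer which class to map payloads to. The snippet below is a sketch; ProductCreatedEvent is a hypothetical event class used only for illustration:

```java
// Assumption: ProductCreatedEvent is a hypothetical class in the trusted package below.
// VALUE_DEFAULT_TYPE tells JsonDeserializer which class to deserialize into
// when the incoming record carries no type headers.
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
        "com.appsdeveloperblog.ws.core.ProductCreatedEvent");
```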
Group ID
The group.id property defines the consumer group ID. Kafka uses the group ID to distribute topic partitions among the consumers in a group and to track which offsets the group has already committed, so it knows which messages have been processed.
```java
config.put(ConsumerConfig.GROUP_ID_CONFIG, "product-created-events");
```
Trusted Packages
When using JsonDeserializer, you need to specify which packages are trusted for deserialization. This is a security measure to ensure only known classes are deserialized.
```java
config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.appsdeveloperblog.ws.core");
```
When JSON is transformed into Java objects, there’s a potential security risk. An attacker could exploit this by crafting a JSON that, when deserialized, executes unwanted code. To mitigate this, you specify which packages are allowed for deserialization.
For example, setting spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core (the properties-file equivalent of the JsonDeserializer.TRUSTED_PACKAGES entry above) means only classes in the com.appsdeveloperblog.ws.core package are deemed safe for deserialization. It’s a way to tell your application, “Only convert JSON into Java objects if they match the classes in these trusted packages.”
Setting this property to *, which implies trusting all packages, is risky and opens up your application to potential security vulnerabilities. It’s advisable to explicitly list the packages you trust, ensuring that your application remains secure against deserialization attacks.
Final Version
Here’s the final version of the KafkaConsumerConfig class with the consumerFactory method included. This class is fully configured to create a Kafka Consumer in your Spring Boot application:
```java
package com.appsdeveloperblog.ws.emailnotification;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Kafka Bootstrap Server configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094");

        // Key Deserializer
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Value Deserializer
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.appsdeveloperblog.ws.core");

        // Consumer Group ID
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "product-created-events");

        return new DefaultKafkaConsumerFactory<>(config);
    }
}
```
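If you plan to consume messages with @KafkaListener methods, this ConsumerFactory is typically also wired into a listener container factory. The final class above does not include this step, so the following is only a minimal sketch of that wiring; the bean name kafkaListenerContainerFactory is the default one Spring Kafka looks for:

```java
// A minimal sketch, assuming this bean is added to the KafkaConsumerConfig class above.
// Requires: import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Use the ConsumerFactory defined in this configuration class
    factory.setConsumerFactory(consumerFactory);
    return factory;
}
```

With this bean in place, a method annotated with @KafkaListener would receive the deserialized messages from the topics you subscribe it to.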
Conclusion
I hope this tutorial was helpful to you.
To learn more about how Apache Kafka can be used to build Event-Driven Spring Boot Microservices, please check my Kafka tutorials for beginners page.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.