In this tutorial, you will learn how to use the @Bean configuration for the Kafka producer in a Spring Boot application.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.
Introduction
A Kafka producer is a client that sends messages to a Kafka topic, which is a logical name for a stream of records. A Kafka producer can specify various configuration properties to control how the messages are sent, such as the bootstrap servers, the key and value serializers, the acknowledgment mode, the delivery timeout, the linger time, and the request timeout.
By default, Spring Boot provides a convenient way to configure the Kafka producer properties in the application.properties or application.yml file. However, this approach has some limitations, such as:
- You cannot use different properties for different producers in the same application,
- You cannot use custom properties that are not supported by Spring Boot.
To overcome these limitations, you can use the @Bean annotation to create and configure the Kafka producer beans programmatically in a Spring Boot application. The @Bean
annotation tells Spring that a method returns an object that should be registered as a bean in the Spring application context. The benefits of using the @Bean configuration over the application.properties file are:
- You can change the properties at runtime using the
@RefreshScope
annotation, which refreshes the bean when a configuration change is detected, - You can create multiple producers with different properties in the same application,
- You can use any properties that are supported by the Kafka producer API.
In this tutorial, you will learn how configure Kafka Producer using in a method annotated with @Bean annotation.
Step 1. Create a Kafka Producer Configuration Class
To configure Kafka Producer in my Java code, I will create a new configuration class. This configuration class will contain Kafka Producer configuration in a @Bean method.
For example, you can create a class named KafkaProducerConfig.java
as follows:
@Configuration @RefreshScope public class KafkaProducerConfig { // ... }
Step 2. Inject the Kafka properties from application.properties file
In this section, you will learn how to inject configuration properties from application.properties file into KafkaProducerConfig
class.
Inject properties using @Value annotation
You can still use the application.properties file to store the Kafka properties, but instead of letting Spring Boot auto-configure them, you can inject them into the Kafka producer configuration class using the @Value annotation.
The @Value annotation allows you to inject a single property value using the ${…}
placeholder. For example, you can inject the following properties from the application.properties file into the Kafka producer configuration class using the @Value annotation.
application.properties file
spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.acks=all spring.kafka.producer.properties.delivery.timeout.ms=120000 spring.kafka.producer.properties.linger.ms=0 spring.kafka.producer.properties.request.timeout.ms=30000
KafkaProducerConfig.java
@Configuration @RefreshScope public class KafkaProducerConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.properties.delivery.timeout.ms}") private Integer deliveryTimeout; @Value("${spring.kafka.producer.properties.linger.ms}") private Integer linger; @Value("${spring.kafka.producer.properties.request.timeout.ms}") private Integer requestTimeout; // ... }
Inject Properties using the @ConfigurationProperties annotation
Alternatively(instead of using @Value annotation), you can use the @ConfigurationProperties
annotation to bind all the properties with the prefix spring.kafka.producer
to a bean named KafkaProducerProperties.java
as follows:
@ConfigurationProperties(prefix = "spring.kafka.producer") public class KafkaProducerProperties { private String bootstrapServers; private String keySerializer; private String valueSerializer; private String acks; private Map<String, Object> properties; // getters and setters }
Then, you can inject the KafkaProducerProperties
bean into the Kafka producer configuration class using the @EnableConfigurationProperties
and @Autowired
annotations as follows:
@Configuration @EnableConfigurationProperties(KafkaProducerProperties.class) @RefreshScope public class KafkaProducerConfig { @Autowired private KafkaProducerProperties kafkaProducerProperties; // ... }
Step 3. Create a producerConfigs() method that returns a Map of producer configuration properties
Next, you can create a producerConfigs()
method that returns a Map of producer configuration properties using the injected values from the previous step. The Map should contain the keys and values that are supported by the Kafka producer API.
For example, you can create the producerConfigs()
method as follows:
@Configuration @RefreshScope public class KafkaProducerConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.properties.delivery.timeout.ms}") private Integer deliveryTimeout; @Value("${spring.kafka.producer.properties.linger.ms}") private Integer linger; @Value("${spring.kafka.producer.properties.request.timeout.ms}") private Integer requestTimeout; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); props.put(ProducerConfig.ACKS_CONFIG, acks); props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout); return props; } // ... }
Step 4. Create a producerFactory() method
The next step is to create a producerFactory()
method that returns a ProducerFactory
object using the producerConfigs()
method.
The ProducerFactory
is an interface that defines how to create a Kafka producer. Spring Boot provides a default implementation called DefaultKafkaProducerFactory
, which takes a Map of producer configuration properties as a constructor argument.
For example, you can create the producerFactory() method as follows:
@Configuration @RefreshScope public class KafkaProducerConfig { // ... @Bean public ProducerFactory<String, Object> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } // ... }
Notice that the producerFactory() method is annotated with @Bean annotation. This is very important because with a @Bean annotation, you can make the ProducerFactory object available for other components that need to use it, such as the KafkaTemplate. Without the @Bean annotation, the producerFactory() method would not create a bean and the ProducerFactory object would not be accessible by other beans.
Step 5. Create a kafkaTemplate() method that returns a KafkaTemplate
Finally, you can create a kafkaTemplate()
method that returns a KafkaTemplate
object using the producerFactory()
method. The KafkaTemplate
is a class that provides a high-level abstraction for sending messages to Kafka topics. It wraps the Kafka producer and handles the serialization, partitioning, and error handling.
For example, you can create the kafkaTemplate() method as follows:
@Configuration @RefreshScope public class KafkaProducerConfig { @Bean KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean ProducerFactory<String, Object> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } // ... }
Conclusion
I hope this tutorial is helpful to you.
To learn more about Apache Kafka, please check my other Apache Kafka tutorials for beginners. You can also find more resources and documentation on the official Kafka website.
If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.