Spring Boot Kafka Clients
Spring-kafka
project provides high level abstraction for kafka-clients
API. By using such high level API we can easily send or receive messages , and most of the client configurations will be handled automatically with best practices, such as breaking poll loops, graceful terminations, thread safety, etc. The library provides:
Template
to be used for sending messages@KafkaListener
annotation to be used for receiving messages.KafkaAdmin
bean to configure the management, topics, partitions, etc.
In order to use the spring-kafka
we should start with adding the library to our dependencies:
If you would like to check the Apache Kafka Basics, or Java implementation of Kafka clients please check the previous posts.
Creating Producer
The KafkaTemplate
wraps a producer and provides useful methods to produce messages. Some of the methods that it provides is as follows:
// send value to default topic
ListenableFuture<SendResult<K, V>> sendDefault(V data);
// send key and value to default topic
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
// send value to given topic
ListenableFuture<SendResult<K, V>> send(String topic, V data);
// send key and value to given topic
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
To use the template, you can configure a producer factory and provide it in the template’s constructor.
We can create different templates with different configurations. Now, in order to send messages we can use the configured template. To separate the message sending logic from our application logic and we can create our custom KafkaSender
:
This class will be our own abstraction for sending message and it will use the template. We can provide different implementations of KafkaSender for different purposes. For example we can create different implementations of this abstract class with different @Profile
annotations.
@Profile("kafka")
@Slf4j
@Service
public class KafkaSenderImpl extends KafkaSender {
private static String TOPIC;
private final KafkaTemplate <String, String> kafkaTemplate;
public KafkaSenderImpl(KafkaTemplate <String, String> kafkaTemplate, @Value("${kafka.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
Objects.requireNonNull(topic);
TOPIC = topic;
}
@Override
public void sendMessage(String message) {
log.info(String.format("Producing message: %s", message));
ListenableFuture <SendResult <String, String>> future = this.kafkaTemplate.send(TOPIC, message);
future.addCallback(new ListenableFutureCallback <>() {
@Override
public void onFailure(Throwable ex) {
log.error("Sending Kafka Message is failed with the following exception: " + ex.getMessage());
ex.printStackTrace();
}
@Override
public void onSuccess(SendResult <String, String> result) {
log.debug("Sending Kafka Message is completed successfully");
}
});
}
}
Note that template returns ListenableFuture
which gives us to add callback functions as above.
And in some environments we can disable sending kafka message and just mock the behaviour, create a fake sender which possibly just logs the message, but not really interacts with Kafka.
@Profile("!kafka")
@Service
@Slf4j
public class KafkaSenderMock extends KafkaSender {
/**
* Mock implementation of Kafka Sender.
* In order for using the application without Kafka Server
* When no-Kafka profile is set, messages are only logged
*/
@Override
public void sendMessage(String message) {
log.info("Producing message: " + message);
}
}
By this way we can run the app without really sending the messages to Kafka if we did not set the kafka
profile.
Creating Consumer
For creating a consumer we need to configure a MessageListenerContainer
and to receive messages we should provide either a MessageListener
or a method with @KafkaListener
annotation. There are 2 implementation for message listener container:
KafkaMessageListenerContainer
: receives all message from all topics on a single thread.ConcurrentMessageListenerContainer
: provides multi-threaded consumption.
If we look at the class diagrams:
So as a first step we need to provide an implementation of MessageListenerContainer
Our configuration simply looks like as follows:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-server}")
String bootstrapServer;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory <Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory <>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory <Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Note that here we are basically configuring the ConcurrentKafkaListenerContainerFactory
with the given ConsumerFactor
which holds the properties of our consumer.
As a second step, in order to receive the messages we should create a MessageListener
or we use @KafkaListener
annotation.
1. MessageListener
One of the MessageListener interface is as follows:
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
So we can use this interface for processing individual ConsumerRecord
instances received from the Kafka consumer poll()
operation.
2. @KafkaListener
KafkaListener annotation is the easy way to handle receiving messages. This annotation requires @EnableKafka
annotation on configuration. Annotation can be set to bean methods. For example: