Spring Boot Kafka Clients

The spring-kafka project provides a high-level abstraction over the kafka-clients API. With it we can send and receive messages easily, and the library takes care of much of the client plumbing, such as breaking out of poll loops, graceful termination, and thread safety, following established best practices. The library provides:

  • KafkaTemplate for sending messages.
  • @KafkaListener annotation for receiving messages.
  • KafkaAdmin bean for managing topics, partitions, etc.

To use spring-kafka, we start by adding the library to our dependencies:

dependencies {
	implementation 'org.springframework.kafka:spring-kafka'
}
build.gradle

If you would like to review the Apache Kafka basics or the plain Java implementation of Kafka clients, please see the previous posts.

Creating Producer

The KafkaTemplate wraps a producer and provides convenient methods for producing messages. Some of the methods it provides are as follows:

// send value to default topic
ListenableFuture<SendResult<K, V>> sendDefault(V data);

// send key and value to default topic
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

// send value to given topic
ListenableFuture<SendResult<K, V>> send(String topic, V data);

// send key and value to given topic
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

To use the template, you can configure a producer factory and provide it in the template’s constructor.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-server}")
    String bootstrapServer;

    @Bean
    public Map <String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap <>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // set the producer idempotent
        // idempotent producer props:
        // acks = all, retries = Integer.Max, max_in_flight_request_per_connection = 5
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // add batching to get high throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, "50");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        return props;
    }


    @Bean
    public ProducerFactory <String, String> producerFactory() {
        return new DefaultKafkaProducerFactory <>(producerConfigs());
    }

    @Bean
    public KafkaTemplate <String, String> kafkaTemplate() {
        return new KafkaTemplate <>(producerFactory());
    }

}
KafkaConfig.java
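The ProducerConfig constants used above are just names for plain Kafka property keys, and an idempotent producer only accepts certain values for acks, retries and the in-flight limit. The following is a minimal plain-Java sketch of the same settings and of that compatibility check. The class IdempotentProducerProps and its methods are hypothetical helpers written for this illustration, not part of spring-kafka or kafka-clients, and the check assumes all three settings are set explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of spring-kafka or kafka-clients. */
final class IdempotentProducerProps {

    private IdempotentProducerProps() {
    }

    /** The settings from producerConfigs(), keyed by the plain Kafka property names. */
    static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("linger.ms", "50");
        props.put("compression.type", "snappy");
        return props;
    }

    /**
     * Checks the constraints that apply to an idempotent producer:
     * acks=all, retries > 0 and at most 5 in-flight requests per connection.
     * Assumes all three settings are present in the map.
     */
    static boolean satisfiesIdempotenceRules(Map<String, Object> props) {
        if (!Boolean.parseBoolean(String.valueOf(props.get("enable.idempotence")))) {
            return true; // the constraints only apply when idempotence is enabled
        }
        String acks = String.valueOf(props.get("acks"));
        int retries = Integer.parseInt(String.valueOf(props.get("retries")));
        int maxInFlight = Integer.parseInt(
                String.valueOf(props.get("max.in.flight.requests.per.connection")));
        boolean acksAll = acks.equals("all") || acks.equals("-1");
        return acksAll && retries > 0 && maxInFlight <= 5;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("localhost:9092");
        System.out.println(satisfiesIdempotenceRules(props)); // prints true
        props.put("max.in.flight.requests.per.connection", "6");
        System.out.println(satisfiesIdempotenceRules(props)); // prints false
    }
}
```

A check like this can be handy in a unit test for the configuration class, since a producer created with conflicting settings fails only at startup.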

We can create different templates with different configurations. To send messages, we use the configured template. To separate the message-sending logic from our application logic, we can create our own KafkaSender:

public abstract class KafkaSender {
    public abstract void sendMessage(String message);
}
KafkaSender.java

This class is our own abstraction for sending messages, and its implementations use the template. We can provide different implementations of KafkaSender for different purposes, for example by annotating each implementation with a different @Profile.

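import java.util.Objects;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Profile("kafka")
@Slf4j
@Service
public class KafkaSenderImpl extends KafkaSender {
    // Topic name injected from the kafka.topic property.
    private final String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaSenderImpl(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = Objects.requireNonNull(topic, "kafka.topic must be set");
    }

    @Override
    public void sendMessage(String message) {
        log.info("Producing message: {}", message);
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, message);

        // The send is asynchronous; the callback runs once the send succeeds or fails.
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                // Passing the exception as the last argument logs its stack trace.
                log.error("Sending Kafka message failed", ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.debug("Sending Kafka message completed successfully");
            }
        });
    }
}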

Note that the template returns a ListenableFuture, which lets us add callback functions as above. (Since Spring for Apache Kafka 3.0, the send methods return a CompletableFuture instead.)
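The important point about the callback is that it runs later, when the send finishes, and that success and failure take different paths. The sketch below shows that flow with the standard library's CompletableFuture standing in for the future returned by the template. No Kafka classes are involved, and SendCallbackSketch, describeOutcome and the sample strings are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of success/failure handling on a pending send. */
final class SendCallbackSketch {

    private SendCallbackSketch() {
    }

    /** Attaches a handler that turns the eventual result or error into a log-style message. */
    static CompletableFuture<String> describeOutcome(CompletableFuture<String> pendingSend) {
        return pendingSend.handle((result, ex) ->
                ex == null ? "sent: " + result : "failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // Successful send: the handler runs only once the future is completed.
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> okOutcome = describeOutcome(ok);
        ok.complete("myTopic-0@42");
        System.out.println(okOutcome.join()); // prints sent: myTopic-0@42

        // Failed send: the same handler receives the exception instead of a result.
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<String> failedOutcome = describeOutcome(failed);
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(failedOutcome.join()); // prints failed: broker unavailable
    }
}
```

Because the caller of sendMessage returns before the handler runs, a failure seen in the callback cannot be thrown back to that caller; it has to be logged, retried or recorded somewhere.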

In some environments we may want to disable sending Kafka messages and mock the behaviour instead: we create a fake sender that just logs the message and never interacts with Kafka.

@Profile("!kafka")
@Service
@Slf4j
public class KafkaSenderMock extends KafkaSender {

    /**
     *   Mock implementation of KafkaSender.
     *   Lets the application run without a Kafka server:
     *   when the kafka profile is not active, messages are only logged.
     */

    @Override
    public void sendMessage(String message) {
        log.info("Producing message: " + message);
    }
}

This way, if the kafka profile is not active, we can run the app without actually sending messages to Kafka.
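The same abstraction also helps in unit tests, because application code depends only on KafkaSender and never on the template. The sketch below is plain Java without Spring. KafkaSender is redeclared so the example is self-contained, while RecordingKafkaSender and OrderService are hypothetical classes invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Same shape as the KafkaSender abstraction above, redeclared to keep the sketch self-contained. */
abstract class KafkaSender {
    public abstract void sendMessage(String message);
}

/** Hypothetical test double: remembers messages instead of contacting a broker. */
final class RecordingKafkaSender extends KafkaSender {
    private final List<String> sent = new ArrayList<>();

    @Override
    public void sendMessage(String message) {
        sent.add(message);
    }

    /** Returns a copy so callers cannot modify the recorded messages. */
    List<String> sentMessages() {
        return new ArrayList<>(sent);
    }
}

/** Hypothetical application service that only knows the abstraction. */
final class OrderService {
    private final KafkaSender sender;

    OrderService(KafkaSender sender) {
        this.sender = Objects.requireNonNull(sender, "sender");
    }

    void placeOrder(String orderId) {
        sender.sendMessage("order-placed:" + orderId);
    }
}

final class SenderAbstractionDemo {
    public static void main(String[] args) {
        RecordingKafkaSender sender = new RecordingKafkaSender();
        new OrderService(sender).placeOrder("42");
        System.out.println(sender.sentMessages()); // prints [order-placed:42]
    }
}
```

In the Spring application the profile decides which implementation is injected; in a test we can simply pass the recording sender to the constructor.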

Creating Consumer

To create a consumer we need to configure a MessageListenerContainer, and to receive messages we must provide either a MessageListener or a method annotated with @KafkaListener. There are two MessageListenerContainer implementations:

  • KafkaMessageListenerContainer: receives all messages from all topics or partitions on a single thread.
  • ConcurrentMessageListenerContainer: delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
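To get a feel for what multi-threaded consumption means, it helps to picture the partitions of a topic being divided among the consumer threads. The sketch below uses a simple modulo rule purely for illustration; in a real consumer group the split is decided by Kafka's partition assignment, not by this code. ConcurrencySketch and spread are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of partitions being shared among consumer threads. */
final class ConcurrencySketch {

    private ConcurrencySketch() {
    }

    /**
     * Splits partitions 0..partitions-1 across the given number of consumer threads.
     * The modulo rule is an assumption made for this sketch, not Kafka's actual assignor.
     */
    static List<List<Integer>> spread(int partitions, int concurrency) {
        if (partitions < 0 || concurrency < 1) {
            throw new IllegalArgumentException("partitions must be >= 0 and concurrency >= 1");
        }
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            threads.get(p % concurrency).add(p);
        }
        return threads;
    }

    public static void main(String[] args) {
        // Five partitions, three threads: every thread has work.
        System.out.println(spread(5, 3)); // prints [[0, 3], [1, 4], [2]]
        // Two partitions, three threads: one thread has nothing to consume.
        System.out.println(spread(2, 3)); // prints [[0], [1], []]
    }
}
```

The second case shows why a concurrency higher than the number of partitions brings no benefit: a partition is consumed by only one thread of the group at a time, so the extra thread stays idle.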

If we look at the class diagrams:

MessageListenerContainer class diagram

So, as a first step, we need to provide an implementation of MessageListenerContainer. Our configuration looks as follows:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-server}")
    String bootstrapServer;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup-1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory <Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory <>(consumerConfigs());
    }

    @Bean
    KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory <Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Note that here we are simply configuring the ConcurrentKafkaListenerContainerFactory with the given ConsumerFactory, which holds the properties of our consumer.

As a second step, to receive messages we either create a MessageListener or use the @KafkaListener annotation.

1. MessageListener

One of the MessageListener interfaces is as follows:

public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

So we can use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation.
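Conceptually, the container polls a batch of records and then hands them to the listener one at a time. The sketch below imitates that hand-off in plain Java. Rec and RecordListener are hypothetical stand-ins for ConsumerRecord and MessageListener, defined here only so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of per-record dispatch after a poll. */
final class ListenerDispatchSketch {

    private ListenerDispatchSketch() {
    }

    /** Stand-in for ConsumerRecord: carries only a key and a value. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in with the same shape as the MessageListener interface. */
    interface RecordListener<K, V> {
        void onMessage(Rec<K, V> data);
    }

    /** Hands every record of one polled batch to the listener, in order. */
    static <K, V> void dispatch(List<Rec<K, V>> polledBatch, RecordListener<K, V> listener) {
        for (Rec<K, V> rec : polledBatch) {
            listener.onMessage(rec);
        }
    }

    public static void main(String[] args) {
        List<Rec<Integer, String>> batch = Arrays.asList(new Rec<>(1, "a"), new Rec<>(2, "b"));
        List<String> seen = new ArrayList<>();
        RecordListener<Integer, String> listener = rec -> seen.add(rec.key + "=" + rec.value);
        dispatch(batch, listener);
        System.out.println(seen); // prints [1=a, 2=b]
    }
}
```

Since the interface has a single method, a listener can be written as a lambda, which keeps simple per-record handlers short.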

2. @KafkaListener

The @KafkaListener annotation is the easiest way to receive messages. It requires the @EnableKafka annotation on a configuration class and can be placed on bean methods. For example:

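import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaMessageListener {

    // The class must not be named KafkaListener, or it would clash with the annotation.
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        log.info("Message received: {}", data);
    }

}
KafkaMessageListener.java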