How to – Consume Kafka Events Using spring-kafka

In our "Kafka Producer" article, we looked at how to produce messages and send them to a topic on a Kafka server.

In this article, we will concentrate on how to consume messages from a topic.

1. Overview

Kafka consumer applications read the events that producers have written to a specific topic on the Kafka brokers.

Traditional messaging systems offer two patterns: queues and publish-subscribe. Kafka combines both. Producers put messages into topics and consumers read them from topics as in publish-subscribe, and every consumer group receives all messages of a topic. Within a single consumer group, however, the work is divided among the consumers, as with a queue.

As mentioned before, Kafka organizes its data in topics. To make topics scalable, each topic is divided into partitions, and the partitions of a topic can reside on different Kafka brokers; replicating each partition on more than one broker also provides high availability. As a result, client applications can read data from many brokers at the same time, and producers can likewise write to many brokers in parallel.

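Messages can carry a key. When a producer does not choose a partition explicitly, Kafka's default partitioner hashes the key, so messages with the same key are written to the same partition (as long as the number of partitions does not change). Each partition is an ordered, append-only log that behaves like a FIFO buffer, and consumers read the messages of a partition in the same order in which they were stored. This ordering guarantee holds per partition, not across the whole topic.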
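To make the key-to-partition rule concrete, here is a minimal standalone sketch of how a keyed record can be mapped to a partition. It is modeled on the murmur2-based hashing that, to our understanding, Kafka's Java producer applies to the serialized key when no partition is set; the class and method names (KeyPartitioning, partitionFor) are our own, and the real logic lives inside the client library. The only property we rely on is that the mapping is deterministic, so for a fixed partition count the same key always lands in the same partition.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitioning {

    // 32-bit murmur2 hash. Assumption: written to mirror the hash the Kafka
    // Java client applies to serialized keys; this is not the library class itself.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1 to 3 trailing bytes (the fall-through is intentional)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a keyed record: clear the sign bit, then take the remainder
    static int partitionFor(String key, int numPartitions) {
        // UTF-8 bytes, which is what a String key serializes to with StringSerializer's default
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            String key = "myKey" + i;
            System.out.println(key + " -> partition " + partitionFor(key, 10));
        }
    }
}
```

A producer that sets the partition of a record explicitly bypasses this hashing entirely; the key then only travels with the record.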

In Kafka, consumers can also be grouped to consume the partitioned data of a topic efficiently. Every consumer group is identified by its group ID. When consumers in the same consumer group read a topic, each message is delivered to only one consumer in that group, which balances the consuming load across the group's members.

As the diagram above shows, every consumer belongs to a consumer group, and a consumer can read messages from several partitions and topics. The only restriction is that, within a consumer group, each partition is read by only one consumer at a time. Different consumer groups, however, can consume the same partitions independently.

Consumer groups therefore let us control parallelism. Because each partition goes to at most one consumer of a group, the number of partitions caps the group's parallelism: consumers beyond the partition count stay idle. The highest parallelism is reached when the group has as many consumers as the topic has partitions.
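The following sketch shows how a group's partitions can be split among its consumers, and why extra consumers end up idle. It is a simplified, standalone model of range-style assignment (sort the member IDs, give each member a contiguous block, give the first few members one extra partition), which matches the split visible in the multiple-consumer logs later in this article. Kafka's real assignors also handle multiple topics, stickiness and rebalance details that are left out here, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Range-style assignment for a single topic: sort the member IDs, give each
    // member a contiguous block of partitions, and give the first
    // (numPartitions % members) members one extra partition.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);
        int perMember = numPartitions / members.size();
        int extra = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            // An empty list means this consumer is idle
            assignment.put(members.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers, ten partitions: five partitions each
        System.out.println(assign(Arrays.asList("consumer-b", "consumer-a"), 10));
        // prints {consumer-a=[0, 1, 2, 3, 4], consumer-b=[5, 6, 7, 8, 9]}

        // Four consumers, three partitions: one consumer gets nothing
        System.out.println(assign(Arrays.asList("c1", "c2", "c3", "c4"), 3));
        // prints {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

Adding a fifth or sixth consumer to the second case would only add more empty lists, which is why adding consumers beyond the partition count does not increase throughput.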

Prerequisites

Before creating our application, we need a running Kafka server. For details, see the "Kafka Producer" article.

First, start the ZooKeeper service by running the following command from the Kafka installation folder:

    bin/zookeeper-server-start.sh config/zookeeper.properties

Next, start the Kafka broker service:

    bin/kafka-server-start.sh config/server.properties

Finally, start the producer example from that article, which creates the sample topic mySampleTopic with 10 partitions and sends messages to those partitions.

2. Step by step – code to consume & process events

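Let's create a new Spring Boot application. Spring Boot auto-configures a KafkaTemplate backed by a DefaultKafkaProducerFactory, which we can inject with String keys and values if the application also needs to send messages:

    @Autowired
    private KafkaTemplate<String, String> template;

The template is not required for consuming, though: Spring Boot also auto-configures the consumer factory and the listener container factory that @KafkaListener methods use, based on the spring.kafka.* properties.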

We will now implement an event-driven listener that consumes all partitions of a topic from the same worker. Spring provides the @KafkaListener annotation for implementing listeners. A listener can listen to specific partitions or to several topics at once, and its concurrency (the number of consumer threads) can be overridden per listener with the annotation's concurrency attribute.

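    @Component
    public class SampleListener {
        // Called by the listener container for every record from every partition of mySampleTopic
        @KafkaListener(topics = "mySampleTopic", groupId = "default")
        public void listen(ConsumerRecord<String, String> rec) {
            System.out.println("Event:" + rec);
        }
    }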

Now start the consumer application by running:

    mvn spring-boot:run

You will see that all the partitions are assigned to the same listener, and the following events are consumed:

KafkaMessageListenerContainer: default: partitions assigned: [mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4, mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9]
Event:ConsumerRecord(topic = mySampleTopic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1609005696129, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey0, value = mySampleEvent0)
Event:ConsumerRecord(topic = mySampleTopic, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1609005701156, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey1, value = mySampleEvent1)
Event:ConsumerRecord(topic = mySampleTopic, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1609005706160, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey2, value = mySampleEvent2)
Event:ConsumerRecord(topic = mySampleTopic, partition = 3, leaderEpoch = 0, offset = 3, CreateTime = 1609005711163, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey3, value = mySampleEvent3)
Event:ConsumerRecord(topic = mySampleTopic, partition = 4, leaderEpoch = 0, offset = 3, CreateTime = 1609005716168, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey4, value = mySampleEvent4)
Event:ConsumerRecord(topic = mySampleTopic, partition = 5, leaderEpoch = 0, offset = 3, CreateTime = 1609005721183, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey5, value = mySampleEvent5)
Event:ConsumerRecord(topic = mySampleTopic, partition = 6, leaderEpoch = 0, offset = 3, CreateTime = 1609005726192, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey6, value = mySampleEvent6)
Event:ConsumerRecord(topic = mySampleTopic, partition = 7, leaderEpoch = 0, offset = 3, CreateTime = 1609005731196, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey7, value = mySampleEvent7)
Event:ConsumerRecord(topic = mySampleTopic, partition = 8, leaderEpoch = 0, offset = 3, CreateTime = 1609005736211, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey8, value = mySampleEvent8)
Event:ConsumerRecord(topic = mySampleTopic, partition = 9, leaderEpoch = 0, offset = 3, CreateTime = 1609005741215, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey9, value = mySampleEvent9)

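You can also listen to specific partitions. Change the annotation as follows and check that the consumer now receives messages only from partitions 5 to 7 (the range is inclusive):

    @KafkaListener(groupId = "default",
            topicPartitions = @TopicPartition(topic = "mySampleTopic",
                    partitions = "5-7"))

Because the topic is already named inside @TopicPartition, the topics attribute is left out; the two attributes are documented as mutually exclusive. With explicit partitions, the container assigns them to the consumer directly instead of letting the group coordinator distribute them, which is why the log below reports "Subscribed to partition(s)".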

Now the output will be:

KafkaConsumer     : [Consumer clientId=consumer-default-1, groupId=default] Subscribed to partition(s): mySampleTopic-5, mySampleTopic-6, mySampleTopic-7
Event:ConsumerRecord(topic = mySampleTopic, partition = 5, leaderEpoch = 0, offset = 8, CreateTime = 1609010158098, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey5, value = mySampleEvent5)
Event:ConsumerRecord(topic = mySampleTopic, partition = 6, leaderEpoch = 0, offset = 8, CreateTime = 1609010163109, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey6, value = mySampleEvent6)
Event:ConsumerRecord(topic = mySampleTopic, partition = 7, leaderEpoch = 0, offset = 8, CreateTime = 1609010168116, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey7, value = mySampleEvent7)
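The partitions attribute used above takes a range expression. As a rough illustration of how "5-7" is interpreted, with both ends inclusive, the hypothetical helper below expands such expressions into explicit partition numbers. For illustration it also accepts comma-separated mixes such as "0-2, 7". Spring Kafka does its own parsing internally; this is not its implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionExpression {

    // Hypothetical helper: expands "5-7" or "0-2, 7" into explicit partition
    // numbers. Ranges are inclusive on both ends; single numbers are kept as-is.
    static List<Integer> expand(String expression) {
        List<Integer> partitions = new ArrayList<>();
        for (String part : expression.split(",")) {
            String token = part.trim();
            if (token.isEmpty()) {
                continue;
            }
            int dash = token.indexOf('-');
            if (dash < 0) {
                partitions.add(Integer.parseInt(token));
            } else {
                int from = Integer.parseInt(token.substring(0, dash).trim());
                int to = Integer.parseInt(token.substring(dash + 1).trim());
                if (from > to) {
                    throw new IllegalArgumentException("Invalid range: " + token);
                }
                for (int p = from; p <= to; p++) {
                    partitions.add(p);
                }
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(expand("5-7"));    // prints [5, 6, 7]
        System.out.println(expand("0-2, 7")); // prints [0, 1, 2, 7]
    }
}
```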

3. Other ways to set up consumers using Spring Kafka

Event-Driven Consumer vs using poll()

Event-driven consumer: this is the @KafkaListener approach used above. The listener container runs the poll loop on its own threads and invokes our method for every record, so we don't have to manage threads ourselves.

Using poll(): you create a KafkaConsumer yourself and call its poll method to fetch events. You need to set a few properties so that the KafkaConsumer can connect to the broker, join a consumer group, and deserialize the record keys and values. You also own the threading: a KafkaConsumer is not thread-safe, so each instance should be used from a single thread. See the following example:

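    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public void testPoll() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "default");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // try-with-resources closes the consumer, which also makes it leave the group
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
            consumer.subscribe(Collections.singletonList("mySampleTopic"));
            // The first poll() also joins the group and may return nothing, so poll a few times
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> recs = consumer.poll(Duration.ofSeconds(3));
                System.out.println("Got " + recs.count() + " records.");
                for (ConsumerRecord<String, String> rec : recs) {
                    System.out.println("Event:" + rec);
                }
            }
        }
    }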
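To see what the event-driven style saves us, the sketch below models the loop a listener container runs on our behalf: poll a batch, pass each record to the listener, and repeat until stopped, all on a dedicated thread. It is a plain-Java model, not Spring Kafka code; PollLoopSketch, RecordListener and the fake poll source (standing in for KafkaConsumer.poll) are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PollLoopSketch {

    // Hypothetical callback type, playing the role of a @KafkaListener method
    interface RecordListener {
        void onRecord(String record);
    }

    private final AtomicBoolean running = new AtomicBoolean(true);

    // The container's job: poll a batch, hand each record to the listener, repeat until stopped
    void run(Supplier<List<String>> poll, RecordListener listener) {
        while (running.get()) {
            for (String record : poll.get()) {
                listener.onRecord(record);
            }
        }
    }

    void stop() {
        running.set(false);
    }

    // Drives the loop on a worker thread with a fake source that returns two batches, then empty polls
    static List<String> demo() {
        Iterator<List<String>> batches = Arrays.asList(
                Arrays.asList("mySampleEvent0", "mySampleEvent1"),
                Collections.singletonList("mySampleEvent2")).iterator();
        Supplier<List<String>> fakePoll = () -> {
            if (batches.hasNext()) {
                return batches.next();
            }
            try {
                Thread.sleep(10); // an empty poll just waits out its timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.emptyList();
        };

        List<String> received = new CopyOnWriteArrayList<>();
        PollLoopSketch container = new PollLoopSketch();
        Thread worker = new Thread(() -> container.run(fakePoll, received::add));
        worker.start();
        try {
            // Wait up to about 2 seconds for the three records, then shut the loop down
            for (int i = 0; i < 200 && received.size() < 3; i++) {
                Thread.sleep(10);
            }
            container.stop();
            worker.join();
        } catch (InterruptedException e) {
            container.stop();
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [mySampleEvent0, mySampleEvent1, mySampleEvent2]
    }
}
```

The real listener container does considerably more inside this loop, such as committing offsets, reacting to rebalances and routing exceptions to error handlers, which is exactly the work the @KafkaListener approach keeps out of application code.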

4. Multiple Consumers

Now let's test how consumer groups load-balance messages. Run two instances of the consumer application at once with the same group ID, then execute the producer sample to fill the partitions with sample messages. In this scenario, we evaluate two concurrent consumers.

When we start the first consumer, it listens to all 10 partitions of the topic. When we then start the second consumer, the group rebalances and half of the partitions are reassigned to the new consumer. As a result, each consumer receives only half of the messages.

When consumer 2 starts, the partitions that consumer 1 was listening to are revoked and then split between the two consumers. Check the logs of consumer 1:

KafkaMessageListenerContainer    : default: partitions assigned: [mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4, mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9]
ConsumerCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Revoke previously assigned partitions mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4, mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9
KafkaMessageListenerContainer    : default: partitions revoked: [mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4, mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9]
AbstractCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] (Re-)joining group
ConsumerCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Finished assignment for group at generation 6: {consumer-default-1-13bda4f5-ac51-4032-a068-d9e65db38ab7=Assignment(partitions=[mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4]), consumer-default-1-9485d924-e1db-421a-a309-06ac897f998f=Assignment(partitions=[mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9])}
AbstractCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Successfully joined group with generation 6
ConsumerCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Notifying assignor about the new Assignment(partitions=[mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9])
ConsumerCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Adding newly assigned partitions: mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9
KafkaMessageListenerContainer    : default: partitions assigned: [mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9]
Event:ConsumerRecord(topic = mySampleTopic, partition = 5, leaderEpoch = 0, offset = 6, CreateTime = 1609008649256, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey5, value = mySampleEvent5)
Event:ConsumerRecord(topic = mySampleTopic, partition = 6, leaderEpoch = 0, offset = 6, CreateTime = 1609008654259, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey6, value = mySampleEvent6)
Event:ConsumerRecord(topic = mySampleTopic, partition = 7, leaderEpoch = 0, offset = 6, CreateTime = 1609008659273, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey7, value = mySampleEvent7)
Event:ConsumerRecord(topic = mySampleTopic, partition = 8, leaderEpoch = 0, offset = 6, CreateTime = 1609008664280, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey8, value = mySampleEvent8)
Event:ConsumerRecord(topic = mySampleTopic, partition = 9, leaderEpoch = 0, offset = 6, CreateTime = 1609008669282, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey9, value = mySampleEvent9)

Now check the logs of consumer 2:

AbstractCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] (Re-)joining group
AbstractCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Successfully joined group with generation 6
ConsumerCoordinator  : [Consumer clientId=consumer-default-1, groupId=default] Adding newly assigned partitions: mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4
KafkaMessageListenerContainer    : default: partitions assigned: [mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4]
Event:ConsumerRecord(topic = mySampleTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1609008624212, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey0, value = mySampleEvent0)
Event:ConsumerRecord(topic = mySampleTopic, partition = 1, leaderEpoch = 0, offset = 6, CreateTime = 1609008629229, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey1, value = mySampleEvent1)
Event:ConsumerRecord(topic = mySampleTopic, partition = 2, leaderEpoch = 0, offset = 6, CreateTime = 1609008634239, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey2, value = mySampleEvent2)
Event:ConsumerRecord(topic = mySampleTopic, partition = 3, leaderEpoch = 0, offset = 6, CreateTime = 1609008639250, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey3, value = mySampleEvent3)
Event:ConsumerRecord(topic = mySampleTopic, partition = 4, leaderEpoch = 0, offset = 6, CreateTime = 1609008644254, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey4, value = mySampleEvent4)

As the sample above shows, each message is delivered to only one consumer within a consumer group. Now suppose that we want different consumer groups to consume the same messages. To achieve this, start one consumer with the existing code, and start another instance after changing its group ID as below:

    @KafkaListener(topics = "mySampleTopic", groupId = "additionalGroup")

Consumer 1 now runs in the "default" consumer group and consumer 2 in the "additionalGroup" consumer group, so every message is received by both consumers. Each consumer logs output like the following:

KafkaMessageListenerContainer    : default: partitions assigned: [mySampleTopic-0, mySampleTopic-1, mySampleTopic-2, mySampleTopic-3, mySampleTopic-4, mySampleTopic-5, mySampleTopic-6, mySampleTopic-7, mySampleTopic-8, mySampleTopic-9]
Event:ConsumerRecord(topic = mySampleTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1609009402275, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey0, value = mySampleEvent0)
Event:ConsumerRecord(topic = mySampleTopic, partition = 1, leaderEpoch = 0, offset = 7, CreateTime = 1609009407289, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey1, value = mySampleEvent1)
Event:ConsumerRecord(topic = mySampleTopic, partition = 2, leaderEpoch = 0, offset = 7, CreateTime = 1609009412296, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey2, value = mySampleEvent2)
Event:ConsumerRecord(topic = mySampleTopic, partition = 3, leaderEpoch = 0, offset = 7, CreateTime = 1609009417297, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey3, value = mySampleEvent3)
Event:ConsumerRecord(topic = mySampleTopic, partition = 4, leaderEpoch = 0, offset = 7, CreateTime = 1609009422302, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey4, value = mySampleEvent4)
Event:ConsumerRecord(topic = mySampleTopic, partition = 5, leaderEpoch = 0, offset = 7, CreateTime = 1609009427316, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey5, value = mySampleEvent5)
Event:ConsumerRecord(topic = mySampleTopic, partition = 6, leaderEpoch = 0, offset = 7, CreateTime = 1609009432329, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey6, value = mySampleEvent6)
Event:ConsumerRecord(topic = mySampleTopic, partition = 7, leaderEpoch = 0, offset = 7, CreateTime = 1609009437343, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey7, value = mySampleEvent7)
Event:ConsumerRecord(topic = mySampleTopic, partition = 8, leaderEpoch = 0, offset = 7, CreateTime = 1609009442352, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey8, value = mySampleEvent8)
Event:ConsumerRecord(topic = mySampleTopic, partition = 9, leaderEpoch = 0, offset = 7, CreateTime = 1609009447359, serialized key size = 6, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = myKey9, value = mySampleEvent9)
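Both experiments follow one rule: within a group, each partition, and therefore each message, goes to exactly one consumer, while every group receives all messages. The small simulation below counts deliveries under that rule when one message is produced to each partition, combining both setups (two consumers in "default", one in "additionalGroup"). It is a hypothetical model, not Kafka code, and it splits partitions into contiguous blocks per group as a simplification of the real assignment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class GroupDeliverySketch {

    // Produces one message to each partition and counts how many messages each
    // consumer receives. 'groups' maps a group ID to its number of consumers.
    static Map<String, Integer> simulate(Map<String, Integer> groups, int numPartitions) {
        Map<String, Integer> received = new TreeMap<>();
        for (Map.Entry<String, Integer> group : groups.entrySet()) {
            int consumers = group.getValue();
            if (consumers <= 0) {
                continue; // a group without members receives nothing
            }
            for (int partition = 0; partition < numPartitions; partition++) {
                // Contiguous blocks: each partition has exactly one owner within the group
                int owner = partition * consumers / numPartitions;
                received.merge(group.getKey() + "/consumer-" + (owner + 1), 1, Integer::sum);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("default", 2);         // two consumers share the work
        groups.put("additionalGroup", 1); // one consumer reads everything
        System.out.println(simulate(groups, 10));
        // prints {additionalGroup/consumer-1=10, default/consumer-1=5, default/consumer-2=5}
    }
}
```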

5. Spring Kafka vs Kafka Consumer API

It is also possible to consume events from a Kafka broker without Spring Kafka. In addition to command-line tools for management and administrative operations, Kafka provides five core APIs for Java and Scala:

  • Admin API
  • Producer API
  • Consumer API
  • Kafka Streams API
  • Kafka Connect API

Together, these APIs provide publish-subscribe operations, event stream processing, and querying and inspection of brokers, topics, and partitions.

To implement a Kafka client with Kafka's own API, add the kafka-clients dependency:

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.7.0</version>
  </dependency>

This API is the same KafkaConsumer class that we used for polling earlier: it lives in the org.apache.kafka.clients.consumer package, and spring-kafka itself is built on kafka-clients. The following snippet is an example:

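  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  // Add properties: broker address, group, and String deserializers matching the topic's data
  final Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "default");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  // Create the consumer; try-with-resources closes it when we are done
  try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Subscribe to the topic and let the group coordinator assign partitions
      consumer.subscribe(Collections.singletonList("mySampleTopic"));
      // Start consuming: poll(Duration) replaces the deprecated poll(long)
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Event:" + record);
      }
  }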

Spring Kafka lets us apply core Spring concepts, such as dependency injection and declarative programming, to Kafka-based applications. KafkaTemplate provides a high-level abstraction for sending messages, and @KafkaListener brings the powerful message-driven POJO concept to Kafka through simple annotations.

For monitoring, the Kafka clients publish their metrics as JMX MBeans, and with Spring Boot Actuator and Micrometer these metrics can be exported to your monitoring system. The Spring Kafka API also has some similarities to the Spring JMS API.

6. Conclusion

In this article, we took a close look at consuming messages with Spring Kafka and summarized several key concepts of Kafka topics, partitions, and consumer groups. We also walked through simple examples of concurrent consumers.
