How to – Kafka Producer Example using Spring Kafka

In this article, we will quickly look at what Kafka is and then at how to write a Java Kafka producer application using the Spring Framework and Spring Kafka.

1. Kafka Overview

In today’s world, capturing data from varied sources like sensors, mobile devices, databases, and software applications is crucial in most business-critical applications. These sources generate event streams, and many business use cases need us to store, process, route and react to these events in real or near real-time.

Rise of event-based architectures

Event-based architectures make it possible to process large numbers of financial transactions in real time, continuously capture IoT data, record user interactions, and so on. Event-driven architectures are fast becoming the preferred architectural pattern for core foundational platforms in mission-critical systems at some of the world’s largest enterprises.

Apache Kafka is an event streaming platform that acts as the underlying fabric to realize such event-based architectures. Apache Kafka provides the following key capabilities:

  1. Publish and subscribe to event streams for writing and reading data, including continuous import/export of your data from other systems
  2. Durable and reliable storage of events
  3. Immediate processing of event streams

Kafka is battle-hardened for use in systems that need asynchronous, fault-tolerant, high-throughput, and durable messaging. Some of its key characteristics are:

  • Works as a cluster that scales to hundreds of nodes
  • Near real-time, high-speed processing
  • Can handle trillions of messages per day
  • Easy producer and consumer integration

Kafka is a horizontally scalable, fault-tolerant, and fast messaging system that decouples event producer and consumer applications, which aligns naturally with the microservices style of organizing application boundaries. Multiple producers and consumers can write and read with very short delays. Apache Kafka is written in Java and Scala. It was originally developed at LinkedIn and was open-sourced in 2011 as a highly scalable messaging platform.

What are events in Kafka?

Kafka's terminology is simple. Everything starts with an "event", which is an "announcement" by the producer that "something of interest to one or more consumers has happened". An event may also be referred to as a record or message in the messaging world. Technically, a Kafka event has a key, a value, a timestamp, and optional metadata headers.
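
To make the shape of an event concrete, here is a minimal plain-Java sketch of the four parts listed above. The class name EventSketch and its fields are hypothetical and chosen for illustration only; in the Kafka Java client, the producer-side counterpart is ProducerRecord, which additionally carries the topic name and stores header values as bytes rather than strings.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model of an event; a hypothetical class, not part of the Kafka client. */
final class EventSketch {
    final String key;                  // used to group related events (for example, by order id)
    final String value;                // the payload itself
    final long timestampMillis;        // when the event happened, in epoch milliseconds
    final Map<String, String> headers; // optional metadata; simplified to strings here

    EventSketch(String key, String value, long timestampMillis, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestampMillis = timestampMillis;
        // Defensive copy so the event cannot be changed after it is created
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }

    /** Convenience factory for the common case of an event without headers. */
    static EventSketch withoutHeaders(String key, String value, long timestampMillis) {
        return new EventSketch(key, value, timestampMillis, Collections.emptyMap());
    }
}
```

For example, EventSketch.withoutHeaders("order-42", "created", System.currentTimeMillis()) would describe an "order created" event keyed by a made-up order id.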

Event producers and consumers are completely decoupled and agnostic of each other. This core tenet of event-driven systems helps achieve high throughput, high scalability, and resiliency.

Kafka topics and partitions

Kafka organizes events into topics. Multiple client applications can write to and read from the same topic simultaneously. In traditional messaging systems, an event is typically deleted automatically once a client fetches it. In Kafka, by contrast, we configure per topic how long events are retained. To distribute the data across nodes, Kafka splits topics into partitions. This distributed placement of data is crucial for scalability because it allows the Kafka infrastructure itself to scale horizontally.
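
The two ideas in this paragraph, key-based placement into partitions and reads that do not delete anything, can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: InMemoryTopicSketch and its methods are hypothetical names, and String.hashCode() stands in for the hash that Kafka's default partitioner computes over the serialized key. It is not how a broker is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy, in-memory stand-in for a partitioned topic (hypothetical, for illustration). */
class InMemoryTopicSketch {

    // One append-only list per partition
    private final List<List<String>> partitions = new ArrayList<>();

    InMemoryTopicSketch(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Picks a partition from the key, so equal keys always land in the same partition. */
    int partitionFor(String key) {
        // Assumption: hashCode() replaces the hash function Kafka itself uses
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends the value to the key's partition and returns the offset it was stored at. */
    int append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    /** Returns all values from the given offset onward; reading never removes anything. */
    List<String> readFrom(int partition, int offset) {
        List<String> log = partitions.get(partition);
        return Collections.unmodifiableList(new ArrayList<>(log.subList(offset, log.size())));
    }
}
```

Because readFrom only copies data, two independent readers can consume the same partition at their own pace, each remembering its own offset. Retention (deleting events after a configured time) is deliberately left out of this sketch.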

2. Introduction to Spring Kafka

To help developers build Kafka-based messaging solutions quickly, Spring provides the "Spring Kafka" project. Spring Kafka lets developers use Spring’s popular template programming model through easy-to-use APIs.

Spring for Apache Kafka provides KafkaTemplate, a thread-safe template for executing high-level operations on a Kafka server. Spring also provides Message-driven POJOs via @KafkaListener annotation for the consumer side.

3. Example Use case

Let’s assume we want to create a sample topic in our application and send a simple message to that topic every 5 seconds.

Before creating our application, we should first run the Kafka server. To run Kafka, you need Java 8 or 11. This walkthrough also installs Scala, although the Kafka binary download already bundles the Scala libraries it needs, so a separate Scala installation is typically optional. You can download Scala from here; both binaries and sources are available at this URL.

Next, download Apache Kafka from here. These examples use Scala 2.13.4 and Kafka 2.6.0.

From the Kafka folder, run bin/zookeeper-server-start.sh config/zookeeper.properties to start the ZooKeeper service. Then start the Kafka broker with bin/kafka-server-start.sh config/server.properties. If you are using Windows, there are Windows versions of these scripts as well (under bin\windows). After the broker starts, you should see log output similar to the following:

....
[2020-12-14 15:07:02,833] INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-12-14 15:07:02,836] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
....

4. Step by step code snippets with explanation

Create the project

Initialize a Maven Spring Boot project using Spring Initializr.
Alternatively, copy Talentify’s starter Spring Boot pom.xml.

Add Maven dependencies for Spring Kafka

Add the spring-kafka dependency to your pom.xml to enable Spring Kafka support:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

If you plan to add tests for your Kafka components, add the spring-kafka-test dependency to your pom.xml to enable Spring Kafka test support:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Create the SpringBootApplication for our Kafka producer

Let’s create a new SpringBootApplication that implements CommandLineRunner. Use the following code snippet:

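import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// @SpringBootApplication already includes @Configuration, so no separate annotation is needed
@SpringBootApplication
public class KafkaProducerDemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerDemoApplication.class, args);
    }

    // Called by Spring Boot once the application context has started
    @Override
    public void run(String... args) throws Exception {
        // ... (the sending logic is added in a later step)
    }
}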

If you started with Spring Initializr, your project should already have an Application class with the @SpringBootApplication annotation. In this case, just make the relevant changes to your existing class instead of adding a new one: implement CommandLineRunner and add the run() method as shown in the snippet above.

KafkaTemplate

Next, inject a KafkaTemplate that uses String for both the message key and the message value. By default, Spring Boot backs it with a DefaultKafkaProducerFactory.

    @Autowired
    private KafkaTemplate<String, String> template;

Produce events using KafkaTemplate

If the topic already exists, you can send a message to it with a single call:

this.template.send("mySampleTopic", "mySampleEvent");

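By default, the Kafka broker listens on port 9092. You can override the connection settings by defining your own DefaultKafkaProducerFactory bean. In that case, you have to create the KafkaTemplate manually as well. The key and value types, and their serializers, must match the KafkaTemplate<String, String> injected above:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    // Broker address(es) used for the initial connection to the cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Keys and values are plain strings, matching KafkaTemplate<String, String>
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

Another alternative is to use Spring's application.properties or application.yaml file. The spring.kafka.producer.bootstrap-servers property takes a comma-delimited list of host:port pairs used to establish the initial connections to the Kafka cluster.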
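
As a sketch of the property-based alternative, an application.properties file might contain the entries below. The broker address localhost:9092 is an assumption that matches a local default setup. The two serializer entries are shown for clarity only, since Spring Boot is expected to default to string serializers when they are omitted.

```properties
# application.properties (sketch; adjust the host and port to your broker)
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

With these properties in place, no custom producer factory bean should be needed for the simple String-to-String case used in this article.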

If you want your application to create the topic itself, Spring lets you declare it as a bean. The NewTopic constructor takes the topic name, the number of partitions, and the replication factor.

    @Bean
    public NewTopic mySampleTopic() {
        return new NewTopic("mySampleTopic", 1, (short) 1);
    }

Finally, implement the sending logic with the following code snippet, which sends ten messages at 5-second intervals.

    @Override
    public void run(String... args) throws Exception {
        for(int i=0; i<10; i++) {
            this.template.send("mySampleTopic", "mySampleEvent" + i);
            Thread.sleep(5000);
        }
    }
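
Thread.sleep is fine for a demo, but it keeps the calling thread asleep between sends. As one possible alternative, the plain-Java sketch below schedules the sends at a fixed rate with a ScheduledExecutorService. Everything named here is an assumption made for illustration: PeriodicSenderSketch and sendPeriodically are hypothetical, and the Consumer<String> parameter stands in for the call to template.send(...) so that the sketch runs without a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not part of Spring Kafka. */
class PeriodicSenderSketch {

    /**
     * Hands "mySampleEvent0" .. "mySampleEvent<count-1>" to the sender, one per period,
     * and returns once all of them have been handed over.
     */
    static void sendPeriodically(Consumer<String> sender, int count, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger next = new AtomicInteger();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                int i = next.getAndIncrement();
                if (i >= count) {
                    return; // all events sent; ignore any extra tick before shutdown
                }
                try {
                    sender.accept("mySampleEvent" + i);
                } catch (RuntimeException e) {
                    // Swallow so that one failed send does not cancel the remaining ticks
                    System.err.println("send failed: " + e);
                } finally {
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await(); // block the caller until every event has been handed to the sender
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Inside the run() method, this could be called as PeriodicSenderSketch.sendPeriodically(event -> this.template.send("mySampleTopic", event), 10, 5000), assuming the template field shown earlier.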

5. Summary

In this article, we looked at how to use Spring for Apache Kafka to send messages to a specific topic and how to configure the bootstrap servers.
