Spring Boot + Kafka - Producer and Consumer Example

In this section, we will learn how to implement asynchronous communication using Kafka in a Spring Boot application.

In the digital world, different systems are constantly sending and receiving messages. This must happen in a controlled manner; otherwise messages block each other, a jam builds up, and processes cannot function optimally. For applications to communicate with each other easily, it makes sense to introduce an intermediary, that is, a service responsible for managing the distribution of messages. This is known as a message broker, and Kafka is one of the best known.

1. What will we build?

We will develop two Spring Boot applications, a producer and a consumer, and then learn how to use Kafka in each of them to send and receive messages between the producer and the consumer.

The following diagram is an overview of the components in our messaging system.


2. Key terminologies of Kafka:

  • Producer: A producer is a client that sends messages to a specified topic on the Kafka server.
  • Consumer: A consumer is a client that receives messages from the Kafka server by subscribing to one or more topics.
  • Broker: A broker is a Kafka server. It receives messages from producers and stores them, and consumers fetch messages from it by topic, partition, and offset. Several brokers can form a Kafka cluster by sharing information using ZooKeeper.
  • Cluster: A Kafka cluster is a group of brokers that together host the topics and their partitions.
  • Topic: Kafka topics are the categories used to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data to topics, and consumers read data from topics.

When working with Apache Kafka, ZooKeeper is primarily used to track the status of nodes in the Kafka cluster and to maintain metadata such as the list of Kafka topics and their configuration. It does not store the messages themselves; those are kept by the brokers. Newer Kafka releases can also run without ZooKeeper (KRaft mode), but this tutorial uses the ZooKeeper-based setup.
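
To make the terms topic, partition, and offset more concrete, here is a minimal sketch in plain Java. It is not Kafka code and does not use the Kafka client library: the class PartitionLog and its methods are hypothetical names chosen for illustration. It models a single partition as an append-only list in which a record's position is its offset, which is roughly how a consumer can ask for "everything from offset N onwards".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one topic partition (hypothetical, for illustration only). */
final class PartitionLog {

    // Records are only ever appended; the index of a record is its offset.
    private final List<String> records = new ArrayList<>();

    /** Appends a message, as a producer would, and returns the offset assigned to it. */
    long append(String message) {
        records.add(message);
        return records.size() - 1;
    }

    /** Returns up to maxRecords messages starting at fromOffset, as a consumer fetch would. */
    List<String> read(long fromOffset, int maxRecords) {
        int start = (int) Math.max(0, Math.min(fromOffset, records.size()));
        int end = Math.min(start + Math.max(0, maxRecords), records.size());
        return List.copyOf(records.subList(start, end));
    }

    public static void main(String[] args) {
        PartitionLog partition = new PartitionLog();
        partition.append("order-created");
        partition.append("order-paid");
        long lastOffset = partition.append("order-shipped");

        System.out.println("Last offset: " + lastOffset);
        // A consumer that already processed offset 0 continues from offset 1.
        System.out.println("From offset 1: " + partition.read(1, 10));
    }
}
```

Reading does not remove anything from the list, which mirrors the fact that several consumers can read the same topic independently, each keeping track of its own offset.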


3. Install and set up Apache Kafka

  • Extract the Kafka zip to the local file system.
  • Start the ZooKeeper service. From the extracted Kafka directory, run the command below (shown for Windows; on Linux or macOS the equivalent scripts are the .sh files directly under bin):
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • Start the Kafka broker. Open another terminal session and run the command below:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Once both services have launched successfully, you will have a basic Kafka environment running and ready to use.

4. Creating the Spring Boot consumer application

First, open Spring Initializr: https://start.spring.io/
Then, provide the Group and Artifact names. We used the Group com.knf.dev.demo and the Artifact consumer. Here I selected a Maven project, Java 17, Spring Boot 3.1.5, and the Spring for Apache Kafka dependency.


Then, click the Generate button. Spring Initializr packs the project into a .zip file (consumer.zip) and downloads it. Extract the zip file.

Then, import the project into your favourite IDE.

Final Project directory:


Complete pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.knf.dev.demo</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yaml

You can then configure the Kafka properties according to the documentation - scroll down to the Kafka properties...

Because this application only reads messages, specify bootstrap-servers, key-deserializer, and value-deserializer under spring.kafka.consumer, followed by the topic name and the consumer group.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic:
    name: knowledgeFactory_Topic
    group: knowledgeFactory_group


Create Consumer

The Consumer is the service responsible for reading messages and processing them according to your own business logic. The @KafkaListener annotation marks a method as the handler for messages arriving on one or more Kafka topics.

package com.knf.dev.demo.consumer.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    // Subscribes to the configured topic as a member of the configured consumer group.
    @KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "${kafka.topic.group}")
    public void listenToTopic(String message) {
        // Called once for each message received from the topic.
        System.out.println("Message arrived! Message: " + message);
    }
}
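
Conceptually, the listener infrastructure behind @KafkaListener runs a loop that polls for new records and passes each one to the annotated method. The sketch below imitates that idea in plain Java, without Spring or Kafka. Everything in it is an assumption made for illustration: a BlockingQueue stands in for the topic, and the names ListenerLoopSketch and drain are hypothetical. The real listener container does far more (offset commits, rebalancing, error handling) and keeps running instead of stopping when no message arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-Java imitation of a poll-and-dispatch loop (hypothetical, for illustration only). */
final class ListenerLoopSketch {

    /**
     * Polls the queue and hands every message to the handler. Stops as soon as no
     * message arrives within pollTimeoutMillis and returns the number of messages handled.
     */
    static int drain(BlockingQueue<String> topic, Consumer<String> handler, long pollTimeoutMillis) {
        int handled = 0;
        try {
            String message;
            while ((message = topic.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                handler.accept(message); // plays the role of listenToTopic(message)
                handled++;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop polling.
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        topic.add("first");
        topic.add("second");

        int handled = drain(topic,
                message -> System.out.println("Message arrived! Message: " + message),
                100);
        System.out.println("Handled " + handled + " messages");
    }
}
```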


ConsumerApplication.java

package com.knf.dev.demo.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

5. Creating the Spring Boot producer application

First, open Spring Initializr: https://start.spring.io/

Then, provide the Group and Artifact names. We used the Group com.knf.dev.demo and the Artifact producer. Here I selected a Maven project, Java 17, Spring Boot 3.1.5, and added the Spring Web and Spring for Apache Kafka dependencies.

Then, click the Generate button. Spring Initializr packs the project into a .zip file (producer.zip) and downloads it. Extract the zip file, and then import the project into your favourite IDE.

Final Project directory:


Complete pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.knf.dev.demo</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yaml

Add the following configuration to the application.yaml file:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
  topic:
    name: knowledgeFactory_Topic
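
The serializer settings matter because Kafka brokers store and transfer plain bytes: the producer turns each key and value into bytes before sending, and the consumer turns them back with a matching deserializer. Kafka's String serializer and deserializer use UTF-8 unless configured otherwise. The following plain-Java sketch shows that round trip; the class StringSerdeSketch and its methods are hypothetical stand-ins written for illustration, not the Kafka classes themselves.

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java stand-in for a String serializer/deserializer pair (hypothetical, for illustration only). */
final class StringSerdeSketch {

    /** Producer side: text becomes UTF-8 bytes before it is sent to the broker. */
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: the bytes fetched from the broker become text again. */
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("Hello Kafka");
        System.out.println("Bytes sent: " + onTheWire.length);
        System.out.println("Text received: " + deserialize(onTheWire));
    }
}
```

If the producer and the consumer disagree on the format, for example a String serializer on one side and a different deserializer on the other, the consumer cannot reconstruct the original value, which is why the two configurations should mirror each other.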

Create Producer

The Producer service writes our messages to the topic. It uses KafkaTemplate, a class from Spring for Apache Kafka that sends messages to Kafka topics.

package com.knf.dev.demo.producer.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from kafka.topic.name in application.yaml.
    @Value("${kafka.topic.name}")
    private String topic;

    public Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageToTopic(String message) {
        // send() is asynchronous: it hands the record to the Kafka producer
        // and returns a future for the result, which is ignored here.
        kafkaTemplate.send(topic, message);
    }
}

Create MessageController

package com.knf.dev.demo.producer.controller;

import com.knf.dev.demo.producer.service.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class MessageController {

    private final Producer producer;

    public MessageController(Producer producer) {
        this.producer = producer;
    }

    // Handles GET /api/v1/sendMessage?message=... and forwards the text to Kafka.
    @GetMapping("/sendMessage")
    public String publishMessage(@RequestParam("message") String message) {
        producer.sendMessageToTopic(message);
        return "Message was sent successfully";
    }
}

ProducerApplication.java

package com.knf.dev.demo.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

6. Verify our system is working as expected

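With ZooKeeper and the Kafka broker running, first start the producer application, then start the consumer application. Each application can be started in either of the following ways:


Run the Spring Boot application from the project's root directory - mvn spring-boot:run


OR


Run the Spring Boot application from your IDE:

  • IntelliJ IDEA: right-click the main class (ProducerApplication or ConsumerApplication) and choose Run.
  • Eclipse/STS: right-click the project or the main class file and run it as a Java application or a Spring Boot application.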

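Open Postman or any other HTTP client and send a message to the producer with a GET request to /api/v1/sendMessage, passing the text in the message query parameter. For example, assuming the producer runs on Spring Boot's default port 8080: http://localhost:8080/api/v1/sendMessage?message=Hello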
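
If you prefer to send the request from code, the following sketch uses the HTTP client built into Java 11 and later. It assumes the producer application is reachable at http://localhost:8080 (Spring Boot's default port, not stated in this tutorial's configuration); the class name SendMessageClient and its methods are hypothetical. The message is URL-encoded so that spaces and special characters survive the trip as a query parameter.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Small client for the producer's /api/v1/sendMessage endpoint (hypothetical, for illustration only). */
final class SendMessageClient {

    /** Builds the request URI, URL-encoding the message for use as a query parameter. */
    static URI buildUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/api/v1/sendMessage?message=" + encoded);
    }

    /** Sends the GET request and returns the response body from the controller. */
    static String send(String baseUrl, String message) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(buildUri(baseUrl, message)).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            // Assumed base URL: the producer running locally on the default port.
            System.out.println(send("http://localhost:8080", "Hello Kafka"));
        } catch (IOException e) {
            System.out.println("Could not reach the producer application: " + e);
        }
    }
}
```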

The console of the consumer application now prints the received message, prefixed with "Message arrived! Message: ".
