How to implement the Apache Kafka in Spring Boot application

Last updated on May 29 2022
Rajeev Agarwal

Table of Contents

How to implement the Apache Kafka in Spring Boot application

Apache Kafka is an open-source project used to publish and subscribe the messages based on the fault-tolerant messaging system. It is fast, scalable and distributed by design.
In this blog, we are going to see how to implement the Apache Kafka in Spring Boot application.
First, we need to add the Spring Kafka dependency in our build configuration file.
Maven users can add the following dependency in the pom.xml file.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
Gradle users can add the following dependency in the build.gradle file.
compile group: ‘org.springframework.kafka’, name: ‘spring-kafka’, version: ‘2.1.0.RELEASE’

Producing Messages

To produce messages into Apache Kafka, we need to define the Configuration class for Producer configuration as shown −
package com.tecklearn.kafkademo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:9092”);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
To publish a message, auto wire the Kafka Template object and produce the message as shown.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}

Consuming a Message

To consume messages, we need to write a Consumer configuration class file as shown below.
package com.tecklearn.kafkademo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:2181”);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “group-id”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Next, write a Listener to listen to the messages.
@KafkaListener(topics = “tecklearn”, groupId = “group-id”)
public void listen(String message) {
System.out.println(“Received Messasge in group – group-id: ” + message);
}
Let us call the sendMessage() method from ApplicationRunner class run method from the main Spring Boot application class file and consume the message from the same class file.
Your main Spring Boot application class file code is given below −
package com.tecklearn.kafkademo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
kafkaTemplate.send(“tecklearn”, msg);
}
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@KafkaListener(topics = “tecklearn”, groupId = “group-id”)
public void listen(String message) {
System.out.println(“Received Messasge in group – group-id: ” + message);
}
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessage(“Hi Welcome to Spring For Apache Kafka”);
}
}
The code for complete build configuration file is given below.
Maven – pom.xml
<?xml version = “1.0” encoding = “UTF-8”?>
<project xmlns = “http://maven.apache.org/POM/4.0.0”
xmlns:xsi = “http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation = “http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd”>

<modelVersion>4.0.0</modelVersion>
<groupId>com.tecklearn</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath /> <!– lookup parent from repository –>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Gradle – build.gradle
buildscript {
ext {
springBootVersion = ‘1.5.9.RELEASE’
}
repositories {
mavenCentral()
}
dependencies {
classpath(“org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}”)
}
}

apply plugin: ‘java’
apply plugin: ‘eclipse’
apply plugin: ‘org.springframework.boot’

group = ‘com.tecklearn’
version = ‘0.0.1-SNAPSHOT’
sourceCompatibility = 1.8

repositories {
mavenCentral()
}
dependencies {
compile(‘org.springframework.boot:spring-boot-starter’)
compile group: ‘org.springframework.kafka’, name: ‘spring-kafka’, version: ‘2.1.0.RELEASE’
testCompile(‘org.springframework.boot:spring-boot-starter-test’)
}
Now, create an executable JAR file, and run the Spring Boot application by using the below Maven or Gradle commands as shown −
For Maven, use the command as shown −
mvn clean install
After “BUILD SUCCESS”, you can find the JAR file under the target directory.
For Gradle, use the command as shown −
gradle clean build
After “BUILD SUCCESSFUL”, you can find the JAR file under the build/libs directory.
Run the JAR file by using the command given here −
java –jar <JARFILE>
You can see the output in console window.
So, this brings us to the end of blog. This Tecklearn ‘How to implement the Apache Kafka in Spring Boot application’ blog helps you with commonly asked questions if you are looking out for a job in Java Programming. If you wish to learn Spring Boot and build a career Java Programming domain, then check out our interactive, Java and JEE Training, that comes with 24*7 support to guide you throughout your learning period. Please find the link for course details:

Java and JEE Training

Java and JEE Training

About the Course

Java and JEE Certification Training is designed by professionals as per the industrial requirements and demands. This training encompasses comprehensive knowledge on basic and advanced concepts of core Java & J2EE along with popular frameworks like Hibernate, Spring & SOA. In this course, you will gain expertise in concepts like Java Array, Java OOPs, Java Function, Java Loops, Java Collections, Java Thread, Java Servlet, and Web Services using industry use-cases and this will help you to become a certified Java expert.

Why Should you take Java and JEE Training?

• Java developers are in great demand in the job market. With average pay going between $90,000/- to $120,000/- depending on your experience and the employers.
• Used by more than 10 Million developers worldwide to develop applications for 15 Billion devices.
• Java is one of the most popular programming languages in the software world. Rated #1 in TIOBE Popular programming languages index (15th Consecutive Year)

What you will Learn in this Course?

Introduction to Java
• Java Fundamentals
• Introduction to Java Basics
• Features of Java
• Various components of Java language
• Benefits of Java over other programming languages
• Key Benefits of Java
Installation and IDE’s for Java Programming Language
• Installation of Java
• Setting up of Eclipse IDE
• Components of Java Program
• Editors and IDEs used for Java Programming
• Writing a Simple Java Program
Data Handling and Functions
• Data types, Operations, Compilation process, Class files, Loops, Conditions
• Using Loop Constructs
• Arrays- Single Dimensional and Multi-Dimensional
• Functions
• Functions with Arguments
OOPS in Java: Concept of Object Orientation
• Object Oriented Programming in Java
• Implement classes and objects in Java
• Create Class Constructors
• Overload Constructors
• Inheritance
• Inherit Classes and create sub-classes
• Implement abstract classes and methods
• Use static keyword
• Implement Interfaces and use it
Polymorphism, Packages and String Handling
• Concept of Static and Run time Polymorphism
• Function Overloading
• String Handling –String Class
• Java Packages
Exception Handling and Multi-Threading
• Exception handling
• Various Types of Exception Handling
• Introduction to multi-threading in Java
• Extending the thread class
• Synchronizing the thread
File Handling in Java
• Input Output Streams
• Java.io Package
• File Handling in Java
Java Collections
• Wrapper Classes and Inner Classes: Integer, Character, Boolean, Float etc
• Applet Programs: How to write UI programs with Applet, Java.lang, Java.io, Java.util
• Collections: ArrayList, Vector, HashSet, TreeSet, HashMap, HashTable
Java Database Connectivity (JDBC)
• Introduction to SQL: Connect, Insert, Update, Delete, Select
• Introduction to JDBC and Architecture of JDBC
• Insert/Update/Delete/Select Operations using JDBC
• Batch Processing Transaction
• Management: Commit and Rollback
Java Enterprise Edition – Servlets
• Introduction to J2EE
• Client Server architecture
• URL, Port Number, Request, Response
• Need for servlets
• Servlet fundamentals
• Setting up a web project in Eclipse
• Configuring and running the web app with servlets
• GET and POST request in web application with demo
• Servlet lifecycle
• Servlets Continued
• Session tracking and filter
• Forward and include Servlet request dispatchers
Java Server Pages (JSP)
• Fundamentals of Java Server Page
• Writing a code using JSP
• The architecture of JSP
• JSP Continued
• JSP elements: Scriptlets, expressions, declaration
• JSP standard actions
• JSP directives
• Introduction to JavaBeans
• ServletConfig and ServletContext
• Servlet Chaining
• Cookies Management
• Session Management
Hibernate
• Introduction to Hibernate
• Introduction to ORM
• ORM features
• Hibernate as an ORM framework
• Hibernate features
• Setting up a project with Hibernate framework
• Basic APIs needed to do CRUD operations with Hibernate
• Hibernate Architecture
POJO (Plain Old Java Object)
• POJO (Plain Old Java Object)
• Persistent Objects
• Lifecycle of Persistent Object
Spring
• Introduction to Spring
• Spring Fundamentals
• Advanced Spring
Got a question for us? Please mention it in the comments section and we will get back to you.

0 responses on "How to implement the Apache Kafka in Spring Boot application"

Leave a Message

Your email address will not be published. Required fields are marked *