How to do the Kafka setup with spring boot Application ?

Setting up Kafka with a Spring Boot application involves integrating the Spring Kafka library and configuring Kafka producers, consumers, and listeners. Below is a step-by-step guide, along with an example, to build a Kafka-enabled Spring Boot application.


Steps to Setup Kafka with Spring Boot

1. Prerequisites

  • Apache Kafka is installed and running.
    • Download Kafka: Kafka Downloads
    • Start Kafka and Zookeeper: # Start Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # Start Kafka bin/kafka-server-start.sh config/server.properties
  • Java Development Kit (JDK) installed.
  • Spring Boot application setup.

2. Add Dependencies to pom.xml

Add the required Kafka dependencies to your Spring Boot project:

<dependencies>
    <!-- Spring Boot Starter for Kafka -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

3. Configure Kafka Properties

Add Kafka configuration in application.properties or application.yml:

# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092

# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer configuration
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false

4. Create a Kafka Topic Configuration

Define a topic configuration using a @Configuration class:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic myTopic() {
        return new NewTopic("my-topic", 3, (short) 1); // Topic name, partitions, replication factor
    }
}

5. Create a Kafka Producer

Create a service class to send messages to Kafka:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send("my-topic", message); // Send message to the topic
    }
}

6. Create a Kafka Consumer

Create a service class to consume messages from Kafka:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

7. Add a Controller to Test Kafka

Create a REST controller to trigger the producer and send messages:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducer.sendMessage(message);
        return "Message sent: " + message;
    }
}

8. Run the Application

  • Start your Kafka broker and Spring Boot application.
  • Use a tool like Postman or a browser to send a message: http://localhost:8080/send?message=HelloKafka
  • Check the application logs for the consumer output: Received message: HelloKafka

Project Structure

src/main/java
├── com.example.kafka
│   ├── KafkaApplication.java       (Main class)
│   ├── KafkaProducer.java          (Producer service)
│   ├── KafkaConsumer.java          (Consumer service)
│   ├── KafkaTopicConfig.java       (Topic configuration)
│   ├── KafkaController.java        (REST controller)

Code Explanation

  1. Producer:
    • Sends messages to the Kafka topic using the KafkaTemplate.
  2. Consumer:
    • Listens to messages from the Kafka topic using @KafkaListener.
  3. Topic Configuration:
    • Automatically creates the topic if it doesn’t exist using the AdminClient.
  4. Controller:
    • Exposes an endpoint to test the producer by sending a message to Kafka.

Testing Kafka Locally

  1. Use the Kafka console to produce and consume messages: # Produce messages bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic # Consume messages bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
  2. Verify the integration by sending a REST request to the application and observing the output in the logs or Kafka consumer.

Example Output

  • Producer sends a message: Sending message: HelloKafka
  • Consumer receives the message: Received message: HelloKafka

This simple Kafka setup with Spring Boot allows you to build scalable event-driven applications. You can further enhance it with advanced Kafka features like transactions, error handling, and monitoring.

Related Posts

Leave a Reply

Your email address will not be published. Required fields are marked *