Setting up Kafka with a Spring Boot application involves integrating the Spring Kafka library and configuring Kafka producers, consumers, and listeners. Below is a step-by-step guide, along with an example, to build a Kafka-enabled Spring Boot application.
Steps to Setup Kafka with Spring Boot
1. Prerequisites
- Apache Kafka is installed and running.
- Download Kafka: Kafka Downloads
- Start Kafka and Zookeeper:
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka
bin/kafka-server-start.sh config/server.properties
- Java Development Kit (JDK) installed.
- Spring Boot application setup.
2. Add Dependencies to pom.xml
Add the required Kafka dependencies to your Spring Boot project:
<dependencies>
    <!-- Spring Boot Starter Web (needed for the REST controller in step 7) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
3. Configure Kafka Properties
Add Kafka configuration in application.properties or application.yml:
# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092
# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer configuration
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
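If you prefer application.yml, the same settings map onto the identical `spring.kafka.*` keys with YAML nesting:

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
```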
4. Create a Kafka Topic Configuration
Define a topic configuration using a @Configuration class:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic myTopic() {
        // Topic name, number of partitions, replication factor
        return new NewTopic("my-topic", 3, (short) 1);
    }
}
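The topic above has 3 partitions, and Kafka routes each keyed record to a partition by hashing its key. The sketch below illustrates that property with a simplified stand-in hash (Kafka's real default partitioner uses murmur2, not this polynomial hash; `PartitionSketch` and `partitionFor` are illustrative names, not Kafka APIs):

```java
import java.nio.charset.StandardCharsets;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner. The real one
    // computes murmur2(keyBytes) % numPartitions, but any deterministic
    // hash demonstrates the same guarantee: records with the same key
    // always land on the same partition, preserving per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b;                    // simple polynomial hash
        }
        return (hash & 0x7fffffff) % numPartitions;  // strip sign bit, then mod
    }

    public static void main(String[] args) {
        int partitions = 3; // matches the NewTopic above
        // Same key -> same partition, every time:
        System.out.println(partitionFor("order-42", partitions)
                == partitionFor("order-42", partitions)); // prints "true"
    }
}
```

This is why choosing a message key matters: all events for one key (e.g. one order ID) stay ordered within a single partition.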
5. Create a Kafka Producer
Create a service class to send messages to Kafka:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        kafkaTemplate.send("my-topic", message); // Send message to the topic
    }
}
6. Create a Kafka Consumer
Create a service class to consume messages from Kafka:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
7. Add a Controller to Test Kafka
Create a REST controller to trigger the producer and send messages:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducer.sendMessage(message);
        return "Message sent: " + message;
    }
}
8. Run the Application
- Start your Kafka broker and Spring Boot application.
- Use a tool like Postman or a browser to send a message:
http://localhost:8080/send?message=HelloKafka
- Check the application logs for the consumer output:
Received message: HelloKafka
Project Structure
src/main/java
└── com/example/kafka
    ├── KafkaApplication.java   (Main class)
    ├── KafkaProducer.java      (Producer service)
    ├── KafkaConsumer.java      (Consumer service)
    ├── KafkaTopicConfig.java   (Topic configuration)
    └── KafkaController.java    (REST controller)
Code Explanation
- Producer: sends messages to the Kafka topic using the KafkaTemplate.
- Consumer: listens for messages from the Kafka topic using @KafkaListener.
- Topic Configuration: declaring a NewTopic bean lets Spring Boot's auto-configured KafkaAdmin create the topic automatically if it doesn't exist (via the Kafka AdminClient).
- Controller: exposes an endpoint to test the producer by sending a message to Kafka.
Testing Kafka Locally
- Use the Kafka console to produce and consume messages:
# Produce messages
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

# Consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
- Verify the integration by sending a REST request to the application and observing the output in the logs or Kafka consumer.
Example Output
- Producer sends a message:
Sending message: HelloKafka
- Consumer receives the message:
Received message: HelloKafka
This simple Kafka setup with Spring Boot allows you to build scalable event-driven applications. You can further enhance it with advanced Kafka features like transactions, error handling, and monitoring.