Spring Boot — kafka produce and consume list of objects — short example

Buddhi Prabhath
2 min readOct 24, 2020

--

In this short example, producer will send list of java objects serialised as json string to a kafka topic and consumer will deserialise json to java list of objects

https://github.com/buddhiprab/springboot-kafka-listofobjects

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Kafka Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.11.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.3</version>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

producer config — kafkaTemplate for generic message values, so we can use this kafkaTemplate bean to send any object type as message value, for example i’m using this bean to send List<FooObject> in the test method sendFooObjectListMessage()

package com.example.demo.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, ?> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

consumer config — for List<Permission>

package com.example.demo.config;


import com.example.demo.model.FooObject;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, List<FooObject>> consumerFactory(){
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"foo");
ObjectMapper om = new ObjectMapper();
JavaType type = om.getTypeFactory().constructParametricType(List.class, FooObject.class);
return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(), new JsonDeserializer<List<FooObject>>(type, om, false));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, List<FooObject>> fooListener(){
ConcurrentKafkaListenerContainerFactory<String, List<FooObject>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

message producer — test method to send messages of List<FooObject> to a topic created in kafka server ex. fooTopic

package com.example.demo;

import com.example.demo.model.FooObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@SpringBootTest
class DemoApplicationTests {

@Autowired
private KafkaTemplate<String, List<FooObject>> kafkaTemplate;

@Test
void sendFooObjectListMessage() {
FooObject p1 = new FooObject();
p1.setName("name 1");
p1.setType("Hi 1");

FooObject p2 = new FooObject();
p2.setName("name 2");
p2.setType("Hi 2");

List<FooObject> list = new ArrayList<>();
list.add(p1);
list.add(p2);
kafkaTemplate.send("fooTopic",UUID.randomUUID().toString(),list);
}

}

message listener — listening to kafka topic named fooTopic, receives List<FooObject> messages

package com.example.demo.listner;

import com.example.demo.model.FooObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MessageListener {
@KafkaListener(topics = "fooTopic", groupId = "foo", containerFactory = "fooListener")
void listener(List<FooObject> data) {
data.forEach(o-> System.out.println(o.getType()+":"+o.getName()));
}
}

--

--

Buddhi Prabhath
Buddhi Prabhath

Responses (1)