[Kafka] – Spring Boot Kafka Producer – Consumer Example

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra.

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra. Nội dung bài viết tập trung vào việc hướng dẫn gửi nhận các loại đối tượng khác nhau là chính, nên các phần cấu hình như đọc thông tin cấu hình từ file .yaml, tạo các @Bean phức tạp hơn mình sẽ loại bỏ.

Chúng ta có một common hay constant class như sau:

public class ApplicationConstant {
	public static final String KAFKA_LOCAL_SERVER_CONFIG = "localhost:9092";
	public static final String GROUP_ID_STRING = "group-id-string";
    public static final String GROUP_ID_JSON = "group-id-json";
	public static final String TOPIC_NAME = "topic-test";
}

1. Gửi/nhận String message

Config : class cấu hình Consumer nhận tin nhắn (String message)

public class SpringKafkaConfig {
	@Bean
	public ConsumerFactory<String, String> consumingEventStringMessage() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_STRING);
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> listenerEventSendStringMessage() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumingEventStringMessage());
		return factory;
	}
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@GetMapping("/{message}")
	public String sendMessage(@PathVariable String message) {

		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "Message sent succuessfully";
	}
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_STRING, topics = ApplicationConstant.TOPIC_NAME, 
			containerFactory = "listenerEventSendStringMessage")
	public void receivedMessage(String message) {
		logger.info("Message Received using Kafka listener " + message);
	}
}

Gửi/nhận JSON message

Model

public class Student {
	private Long id;
	private String name;
	private String rollNumber;

    //getter & setter
}

Config : class cấu hình Consumer nhận tin nhắn (Student object message)

@Configuration
@EnableKafka
public class SpringKafkaConfig {

	@Bean
	public ProducerFactory<String, Object> producerFactory() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<String, Object>(configMap);
	}

	@Bean
	public KafkaTemplate<String, Object> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

	@Bean
	public ConsumerFactory<String, Student> consumingEventSendStudentMessage() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_JSON);
		configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "tiendv.example.model.dto");
//line-50
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Student> listenerEventSendStudentMessage() {
		ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<String, Student>();
		factory.setConsumerFactory(consumingEventSendStudentMessage());
		return factory;
	}
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@PostMapping("/message")
	public String sendMessage(@RequestBody Student message) {
		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "json message sent succuessfully";
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStudentMessage")
	public void receivedMessage(Student message) throws JsonProcessingException {
		ObjectMapper mapper = new ObjectMapper();
		String jsonString = mapper.writeValueAsString(message);
		logger.info("Json message received using Kafka listener " + jsonString);
	}
}

Định nghĩa một Rest end-point để nhận message từ Kafka topic

@RestController
@RequestMapping("/consume")
public class KafkaConsumer {

	@Autowired
	private ConcurrentKafkaListenerContainerFactory<String, Student> factory;

	@GetMapping("/message")
	public List<Student> receiveMessage() {
		List<Student> students = new ArrayList<>();
		ConsumerFactory<String, Student> consumerFactory = factory.getConsumerFactory();
		Consumer<String, Student> consumer = consumerFactory.createConsumer();
		try {
			consumer.subscribe(Arrays.asList(ApplicationConstant.TOPIC_NAME));
			ConsumerRecords<String, Student> consumerRecords = consumer.poll(10000);
			Iterable<ConsumerRecord<String, Student>> records = consumerRecords.records(ApplicationConstant.TOPIC_NAME);
			Iterator<ConsumerRecord<String, Student>> iterator = records.iterator();

			while (iterator.hasNext()) {
				students.add(iterator.next().value());
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
		return students;
	}
}

Tổng kết

Trên đây là hướng dẫn để mọi người biết thêm cách cấu hình gửi nhận tin nhắn với 2 loại đối tượng là String và Java object.
Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.

Mọi người có thể tìm hiểu các bài viết liên quan tại đây:

Nguồn:https://thenewstack.wordpress.com/2021/11/24/kafka-spring-boot-kafka-producer-consumer-example/

Follow me: thenewstack.wordpress.com

Nguồn: viblo.asia

Bài viết liên quan

9 Mẹo lập trình Web “ẩn mình” giúp tiết kiệm hàng giờ đồng hồ

Hầu hết các lập trình viên (kể cả những người giỏi) đều tốn thời gian x

Can GPT-4o Generate Images? All You Need to Know about GPT-4o-image

OpenAI‘s GPT-4o, introduced on March 25, 2025, has revolutionized the way we create visual con

Khi nào nên dùng main, section, article, header, footer, và aside trong HTML5

HTML5 đã giới thiệu các thẻ ngữ nghĩa giúp cấu trúc nội dung web một cách có

So sánh Webhook và API: Khi nào nên sử dụng?

Trong lĩnh vực công nghệ thông tin và phát triển phần mềm, Webhook và API là hai th