[Kafka] – Spring Boot Kafka Producer – Consumer Example

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra.

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra. Nội dung bài viết tập trung vào việc hướng dẫn gửi nhận các loại đối tượng khác nhau là chính, nên các phần cấu hình như đọc thông tin cấu hình từ file .yaml, tạo các @Bean phức tạp hơn mình sẽ loại bỏ.

Chúng ta có một common hay constant class như sau:

public class ApplicationConstant {
	public static final String KAFKA_LOCAL_SERVER_CONFIG = "localhost:9092";
	public static final String GROUP_ID_STRING = "group-id-string";
    public static final String GROUP_ID_JSON = "group-id-json";
	public static final String TOPIC_NAME = "topic-test";
}

1. Gửi/nhận String message

Config : class cấu hình Consumer nhận tin nhắn (String message)

public class SpringKafkaConfig {
	@Bean
	public ConsumerFactory<String, String> consumingEventStringMessage() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_STRING);
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> listenerEventSendStringMessage() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumingEventStringMessage());
		return factory;
	}
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@GetMapping("/{message}")
	public String sendMessage(@PathVariable String message) {

		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "Message sent succuessfully";
	}
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_STRING, topics = ApplicationConstant.TOPIC_NAME, 
			containerFactory = "listenerEventSendStringMessage")
	public void receivedMessage(String message) {
		logger.info("Message Received using Kafka listener " + message);
	}
}

Gửi/nhận JSON message

Model

public class Student {
	private Long id;
	private String name;
	private String rollNumber;

    //getter & setter
}

Config : class cấu hình Consumer nhận tin nhắn (Student object message)

@Configuration
@EnableKafka
public class SpringKafkaConfig {

	@Bean
	public ProducerFactory<String, Object> producerFactory() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<String, Object>(configMap);
	}

	@Bean
	public KafkaTemplate<String, Object> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

	@Bean
	public ConsumerFactory<String, Student> consumingEventSendStudentMessage() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_JSON);
		configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "tiendv.example.model.dto");
//line-50
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Student> listenerEventSendStudentMessage() {
		ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<String, Student>();
		factory.setConsumerFactory(consumingEventSendStudentMessage());
		return factory;
	}
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@PostMapping("/message")
	public String sendMessage(@RequestBody Student message) {
		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "json message sent succuessfully";
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStudentMessage")
	public void receivedMessage(Student message) throws JsonProcessingException {
		ObjectMapper mapper = new ObjectMapper();
		String jsonString = mapper.writeValueAsString(message);
		logger.info("Json message received using Kafka listener " + jsonString);
	}
}

Định nghĩa một Rest end-point để nhận message từ Kafka topic

@RestController
@RequestMapping("/consume")
public class KafkaConsumer {

	@Autowired
	private ConcurrentKafkaListenerContainerFactory<String, Student> factory;

	@GetMapping("/message")
	public List<Student> receiveMessage() {
		List<Student> students = new ArrayList<>();
		ConsumerFactory<String, Student> consumerFactory = factory.getConsumerFactory();
		Consumer<String, Student> consumer = consumerFactory.createConsumer();
		try {
			consumer.subscribe(Arrays.asList(ApplicationConstant.TOPIC_NAME));
			ConsumerRecords<String, Student> consumerRecords = consumer.poll(10000);
			Iterable<ConsumerRecord<String, Student>> records = consumerRecords.records(ApplicationConstant.TOPIC_NAME);
			Iterator<ConsumerRecord<String, Student>> iterator = records.iterator();

			while (iterator.hasNext()) {
				students.add(iterator.next().value());
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
		return students;
	}
}

Tổng kết

Trên đây là hướng dẫn để mọi người biết thêm cách cấu hình gửi nhận tin nhắn với 2 loại đối tượng là String và Java object.
Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.

Mọi người có thể tìm hiểu các bài viết liên quan tại đây:

Nguồn:https://thenewstack.wordpress.com/2021/11/24/kafka-spring-boot-kafka-producer-consumer-example/

Follow me: thenewstack.wordpress.com

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ