Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ đi vào chi tiết một ứng dụng Spring Boot config sử dụng với Kafka.
Cũng giống như các message queue truyền thống ActiveMQ, RabbitMQ, …, Kafka thường được sử dụng cho các công việc chạy nền hoặc giao tiếp giữa các dịch vụ, đặc biệt là các hệ thống theo kiến trúc Microservices ngày nay (MSA – Microservice Architecture). Chúng ta có thể sử dụng Kafka khi phải di chuyển lượng lớn dữ liệu và cần xử lý theo thời gian thực,…Nói chung là lý thuyết về kafka thì khá là nhiều thứ, mình sẽ viết ở một bài viết khác. Còn bài viết này tập trung vào việc sử dụng Spring Boot với Kafka.
Một ví dụ là khi chúng ta muốn xử lý hành vi của người dùng trên trang web để tạo ra các đề xuất về sản phẩm liên quan (ví dụ người dùng click tìm kiếm bột giặt OMO, thì trang web của chúng sẽ gửi lại thêm các đề xuất liên quan bột giặt ABA, ARIEL,…)
Cấu hình Kafka Client
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.2.RELEASE</version>
</dependency>
Sử dụng Java configuration
Để phân chia trách nhiệm, chúng ta sẽ tách việc cấu hình trong 2 class KafkaProductionerConfig
và KafkaConsumerConfig
.
Cấu hình Producer
:
@Configuration
class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ProducerFactory
chịu trách nhiệm tạo các instance Kafka Producer để gửi các object message khác nhau (trong trường hợp trên object là String, chúng ta cũng có thể custom message dạng Java object do chúng ta định nghĩa, tham khảo tại đây).KafkaTemplate
chịu trách nhiệm gửi tin nhắn đến cáctopic
tương ứng. Chúng ta sẽ tìm hiểu hơn về KafkaTemplate ở phần bên dưới.
Cấu hình Consumer
:
@Configuration
class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
ConsumerFactory
chịu trách nhiệm tạo các instance Kafka Croducer để nhận các object message khác nhau (trong trường hợp trên object là String, chúng ta cũng có thể custom message dạng Java object do chúng ta định nghĩa ở gần cuối bài viết này.ConcurrentKafkaListenerContainerFactory
tạo ra container cho các phương thức được gán annotaion@KafkaListener
.KafkaListenerContainer
sẽ nhận tất cả tin nhắn từ tất cả cáctopic
hoặcpartition
trên một luồng duy nhất. Chúng ta sẽ xem tìm hiểu chi tiết hơn về vùng chứa trình nghe thông báo trong phần thông báo tiêu thụ.
Sử dụng Spring Boot Configuration
Spring Boot thực hiện hầu hết các cấu hình một cách tự động, vì vậy chúng ta có thể tập trung vào việc xây dựng các Producer
gửi tin nhắn và các Consumer
nhận tin nhắn. Spring Boot cũng cung cấp tùy chọn ghi đè cấu hình mặc định thông qua file application.yml
hay application.properties
. Cấu hình Kafka được định nghĩa bởi các thuộc tính cấu hình với tiền tố spring.kafka. *
:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Tạo Topic
Để Producer có thể gửi tin nhắn, topic
cần phải tồn tại trước. Dưới đây là cách cấu hình tạo ra Kafka topics:
@Configuration
class KafkaTopicConfig {
@Bean
public NewTopic topic1() {
return TopicBuilder.name("reflectoring-1").build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("reflectoring-2").build();
}
...
}
Bean KafkaAdmin
chịu trách nhiệm tạo các topic
bên trong brocker
. Với Spring Boot, một Bean KafkaAdmin
được đăng ký tự động. Đối với ứng dụng không phải Spring Boot, chúng ta phải đăng ký Bean KafkaAdmin
theo cách thủ công như sau:
@Bean
KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
return new KafkaAdmin(configs);
}
Để tạo một chủ đề (topic), chúng ta đăng ký một Bean NewTopic
cho mỗi topic vào application context. Nếu topic đã tồn tại, Bean sẽ bị bỏ qua. Chúng ta có thể sử dụng TopicBuilder
để tạo các Bean này. KafkaAdmin
cũng tăng số lượng phân vùng (partitions) nếu nó nhận thấy rằng một topic hiện có có ít phân vùng hơn NewTopic.numPartitions
.
Sending Messages
Sử dụng KafkaTemplate
KafkaTemplate
cung cấp phương pháp thuận tiện để gửi tin nhắn đến các topic:
@Component
class KafkaSenderExample {
private KafkaTemplate<String, String> kafkaTemplate;
...
@Autowired
KafkaSenderExample(KafkaTemplate<String, String> kafkaTemplate, ...) {
this.kafkaTemplate = kafkaTemplate;
...
}
void sendMessage(String message, String topicName) {
kafkaTemplate.send(topicName, message);
}
...
}
Tất cả những gì chúng ta cần làm là gọi phương thức send()
với tham số là tên topic và nội dung message. Spring Kafka cũng cho phép chúng ta định cấu hình một lời gọi không đồng bộ trả lại kết quả của việc thực thi gửi tin nhắn:
@Component
class KafkaSenderExample {
...
void sendMessageWithCallback(String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic1, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info("Message [{}] delivered with offset {}",
message,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOG.warn("Unable to deliver message [{}]. {}",
message,
ex.getMessage());
}
});
}
}
Phương thức send()
của KafkaTemplate
trả về ListenableFuture<SendResult>
. Chúng ta có thể đăng ký ListenableFutureCallback
với listener để nhận kết quả của việc gửi và thực hiện một số công việc khác. Nếu không muốn làm việc với Futures
, chúng ta có thể đăng ký ProducerListener
để thay thế:
@Configuration
class KafkaProducerConfig {
@Bean
KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate =
new KafkaTemplate<>(producerFactory());
...
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(
ProducerRecord<String, String> producerRecord,
RecordMetadata recordMetadata) {
LOG.info("ACK from ProducerListener message: {} offset: {}",
producerRecord.value(),
recordMetadata.offset());
}
});
return kafkaTemplate;
}
}
Việc chúng ta cấu hình KafkaTemplat
bằng ProducerListener
cũng cho phép chúng ta implements các phương thức onSuccess()
và onError()
.
Sử dụng RoutingKafkaTemplate
Chúng ta có thể sử dụng RoutingKafkaTemplate
khi chúng ta có nhiều Producer
với các cấu hình khác nhau và chúng ta muốn chọn Producer
trong thời gian chạy dựa trên chủ đề (topic).
@Configuration
class KafkaProducerConfig {
...
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
// ProducerFactory with Bytes serializer
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF =
new DefaultKafkaProducerFactory<>(props);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
// ProducerFactory with String serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
DefaultKafkaProducerFactory<Object, Object> stringPF =
new DefaultKafkaProducerFactory<>(props);
context.registerBean(DefaultKafkaProducerFactory.class, "stringPF", stringPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile(".*-bytes"), bytesPF);
map.put(Pattern.compile("reflectoring-.*"), stringPF);
return new RoutingKafkaTemplate(map);
}
...
}
RoutingKafkaTemplate
sử dụng một Map của các phần tử java.util.regex.Pattern
và ProducerFactory
để định tuyến các mesage đến ProducerFactory
phù hợp đầu tiên với tên topic nhất định. Nếu chúng ta có hai pattern ref.*
Và reflectoring-.*
, Thì pattern reflectoring-.*
phải đặt ở trước vì pattern ref.*
sẽ ghi đè nó.
Trong ví dụ trên, chúng ta đã tạo ra hai pattern .*-byte
và reflectoring-.*
. Tên topic kết thúc bằng -byte
và bắt đầu bằng reflectoring
sẽ sử dụng ByteArraySerializer
và StringSerializer
tương ứng khi chúng ta sử dụng phiên bản RoutingKafkaTemplate
.
Consuming Messages
Message Listener
KafkaMessageListenerContainer
sẽ nhận tất cả tin nhắn (messages) từ tất cả các chủ đề (topics) trên một thread duy nhất.
ConcurrentMessageListenerContainer
chỉ định các message này cho nhiều instance KafkaMessageListenerContainer
để cung cấp khả năng đa luồng.
Sử dụng @KafkaListener
tại level method
Annotaion @KafkaListener
cho phép chúng ta tạo listener (mình gọi nó là trình lắng nghe), là nơi subscribe message đến từ topic:
@Component
class KafkaListenersExample {
Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);
@KafkaListener(topics = "reflectoring-1", containerFactory = "kafkaListenerContainerFactory")
void listener(String data) {
LOG.info(data);
}
@KafkaListener(
topics = "reflectoring-1, reflectoring-2",
groupId = "reflectoring-group-2")
void commonListenerForMultipleTopics(String message) {
LOG.info("MultipleTopicListener - {}", message);
}
}
Để sử dụng annotation này, chúng ta nên thêm annotaion @EnableKafka
vào một class bất kỳ được gán annotation @Configuration
. Ngoài ra, nó cũng yêu cầu cấu hình các trình lắng nghe cho mỗi loại message (trong ví dụ này listener kafkaListenerContainerFactory
được cấu hình trong class KafkaconsumerConfig
ở phía trên cho loại message String, chúng ta có thể cấu hình gửi nhận message dạng Java object thay vì String trong bài viết này).
Chúng ta cũng có thể chỉ định nhiều topic cho trình lắng nghe bằng cách sử dụng thuộc tính topics
như trên.
Sử dụng @KafkaListener
tại level class
Nếu sử dụng @KafkaListener
ở level class thì chúng ta cần chỉ định @KafkaHandler
tại level method.
@Component
@KafkaListener(id = "class-level", topics = "reflectoring-3")
class KafkaClassListener {
...
@KafkaHandler
void listen(String message) {
LOG.info("KafkaHandler[String] {}", message);
}
@KafkaHandler(isDefault = true)
void listenDefault(Object object) {
LOG.info("KafkaHandler[Default] {}", object);
}
}
Khi trình lắng nghe nhận được message, nó sẽ chuyển đổi chúng thành các kiểu dữ liệu đích trong các phương thức để tìm ra phương thức nào cần gọi. Trong ví dụ, các message kiểu String sẽ được nhận bởi listening()
và kiểu Object sẽ được nhận bởi listenerDefault()
. Bất cứ khi nào không có kết quả phù hợp, trình xử lý mặc định (được định nghĩa bởi isDefault = true
) sẽ được gọi.
Nhận messages từ partition cụ thể với Offset
Chúng ta có thể cấu hình trình lắng nghe để subscribe từ nhiều chủ đề (topic), phân vùng (partitions) và vị trí đọc message cụ thể. Ví dụ: nếu chúng ta muốn nhận tất cả các message được gửi đến một topic từ thời điểm tạo topic khi khởi động ứng dụng, chúng ta có thể đặt giá trị offset
là 0:
@Component
class KafkaListenersExample {
...
@KafkaListener(
groupId = "reflectoring-group-3",
topicPartitions = @TopicPartition(
topic = "reflectoring-1",
partitionOffsets = { @PartitionOffset(
partition = "0",
initialOffset = "0") }))
void listenToPartitionWithOffset(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset) {
LOG.info("Received message [{}] from partition-{} with offset-{}",
message,
partition,
offset);
}
}
Vì chúng ta đã chỉ định initialOffset = "0"
, chúng ta sẽ nhận được tất cả các message bắt đầu từ offset 0
mỗi khi chúng ta khởi động lại ứng dụng. Chúng ta cũng có thể truy xuất metadata hữu ích của message bằng cách sử dụng annotation @Header()
.
Filter messages
Spring cung cấp một cách thức để lọc các message trước khi chúng đến trình lắng nghe:
class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record ->
record.value().contains("ignored"));
return factory;
}
}
Spring bọc trình lắng nghe bằng FilteringMessageListenerAdapter
. Chúng ta sẽ gọi phương thức factory.setRecordFilterStrategy()
để triển khia điều kiện lọc. Message phù hợp với bộ lọc sẽ bị loại bỏ trước khi đến trình lắng nghe (cụ thể là đến phương thức được gắn annotation @KafkaListener
). Trong ví dụ trên, chúng ta đã thêm một bộ lọc để loại bỏ các message có từ “ignored”.
Reply, Forwarding Listener Results với @SendTo
Spring cho phép Forwarding giá trị trả về bên trong phương thức được gán annotation @KafkaLisstener
với chỉ định annotation @SendTo
đến một topic:
@Component
class KafkaListenersExample {
...
@KafkaListener(topics = "reflectoring-others")
@SendTo("reflectoring-1")
String listenAndReply(String message) {
LOG.info("ListenAndReply [{}]", message);
return "This is a reply sent after receiving message";
}
}
Ví dụ về @SendTo
mọi người có thể xem tại bài viết Spring Kafka – Forwarding Listener Results using @SendTo.
Spring boot sẽ đưa cho chúng ta một cấu hình mặc định Reply Template. Chúng ta có thể cấu hình override Reply Template này trong khi cấu hình Listener Container Factory bằng cách sử dụng phương thức setReplyTemplate()
như dưới đây:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> listenerEventSendStringMessage() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumingEventSendStringMessage());
factory.setReplyTemplate(customKafkaTemplate); // customKafkaTemplate là một KafkaTemplate khác
return factory;
}
Custom Messages
Bây giờ chúng ta hãy xem cách gửi/nhận một đối tượng Java. Chúng tôi sẽ gửi và nhận các đối tượng User như trong ví dụ dưới đây:
class User {
private String name;
...
}
Để gửi/nhận Java object, chúng ta phải định cấu hình Producer và Consumer sử dụng JSON serializer
và JSON deserializer
:
@Configuration
class KafkaProducerConfig {
...
@Bean
public ProducerFactory<String, User> userProducerFactory() {
...
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
return new KafkaTemplate<>(userProducerFactory());
}
}
@Configuration
class KafkaConsumerConfig {
...
public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reflectoring-user");
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
return factory;
}
...
}
Spring Kafka cung cấp các implementations JsonSerializer
và JsonDeserializer
dựa trên Jackson JSON object mapper. Nó cho phép chúng ta chuyển đổi bất kỳ đối tượng Java nào thành byte []. Trong ví dụ trên, chúng ta tạo thêm một ConcurrentKafkaListenerContainerFactory
để thực hiện JSON serialization
.
Chúng ta cũng tạo một Listener Container (vùng chứa trình lắng nghe message) riêng biệt userKafkaListenerContainerFactory()
để xử lý cho đối tượng User
(Java object). Nếu chúng ta có nhiều kiểu đối tượng Java cần serialized/deserialized, chúng ta phải tạo một Listener Container (vùng chứa trình lắng nghe) cho mỗi kiểu đối tượng đó.
Sending Java Objects
@Component
class KafkaSenderExample {
...
@Autowired
private KafkaTemplate<String, User> userKafkaTemplate;
void sendCustomMessage(User user, String topicName) {
userKafkaTemplate.send(topicName, user);
}
...
}
Receiving Java Objects
@Component
class KafkaListenersExample {
@KafkaListener(
topics = "reflectoring-user",
groupId="reflectoring-user",
containerFactory="userKafkaListenerContainerFactory")
void listener(User user) {
LOG.info("CustomUserListener [{}]", user);
}
}
userKafkaListenerContainerFactory
chính là tên Listener Container (vùng chứa trình lắng nghe cho message có kiểu dữ liệu User).
Nếu chúng ta không chỉ định thuộc tính containerFactory
, nó sẽ mặc định là kafkaListenerContainerFactory
sử dụng StringSerializer
và StringDeserializer
trong trường hợp này của bài viết này.
Tổng kết
Trên đây là hướng dẫn để mọi người hiểu hơn về cách cấu hình Spring Boot Kafka. Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.
Mọi người có thể tìm hiểu các bài viết liên quan tại đây:
Nguồn:https://thenewstack.wordpress.com/2021/11/24/kafka-spring-boot-kafka-in-depth/
Follow me: thenewstack.wordpress.com
Nguồn: viblo.asia