009: Kafka Consumer client API với Java

Bài viết nằm trong series Apache Kafka từ zero đến one. Tiếp tục series, cùng practice với Kafka consumer với bài toán đơn giản. 1) Scenario Thực tế, một hệ thống với Kafka có thể hoạt động với mô hình dưới đây: Event generating service: là service produce message tới Kafka. Data validation service: consume

Bài viết nằm trong series Apache Kafka từ zero đến one.

Tiếp tục series, cùng practice với Kafka consumer với bài toán đơn giản.

1) Scenario

Thực tế, một hệ thống với Kafka có thể hoạt động với mô hình dưới đây:

  • Event generating service: là service produce message tới Kafka.
  • Data validation service: consume message, thực hiện validate, dựa trên result để produce message tương ứng tới topic.
  • Data reconciliation application: nếu message invalid, DRA consume message, tiến hành correct message và produce valid message với Kafka.
  • Other service: cuối cùng message valid được đẩy tới next service để xử lý các business logic phù hợp với yêu cầu.

Với bài toán trên, ta sử dụng Data validation service làm ví dụ cho phần consumer:

  • Đầu vào là các hóa đơn, producer produce data đến invoice-topic.
  • Consumer consume message từ invoice-topic và xử lý, nếu valid sẽ chuyển tiếp đến valid-invoice-topic, ngược lại produce đến invalid-invoice-topic.

2) Practice

Có bài toán rồi, quẩy thôi. Thêm dependency Kafka client và Jackson để xử lý JSON data:

Với Maven:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.32</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency></dependencies>

Gradle:

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
    implementation 'org.slf4j:slf4j-simple:1.7.32'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.5'}

2.1) Custom serializer với JSON:

Với bài toán này, cùng practice tạo một custome serializer/deserializer với JSON type như sau.

Serializer:

publicclassJsonSerializer<T>implementsSerializer<T>{privatefinalObjectMapper objectMapper =newObjectMapper();@Overridepublicbyte[]serialize(String topic,T data){if(data ==null){returnnull;}try{return objectMapper.writeValueAsBytes(data);}catch(Exception e){thrownewSerializationException("Error serializing JSON message", e);}}}

Deserializer:

publicclassJsonDeserializer<T>implementsDeserializer<T>{privatefinalObjectMapper objectMapper =newObjectMapper();privateClass<T> className;publicstaticfinalString KEY_CLASS_NAME_CONFIG ="key.class.name";publicstaticfinalString VALUE_CLASS_NAME_CONFIG ="value.class.name";@SuppressWarnings("unchecked")@Overridepublicvoidconfigure(Map<String,?> props,boolean isKey){if(isKey){
            className =(Class<T>) props.get(KEY_CLASS_NAME_CONFIG);return;}
        className =(Class<T>) props.get(VALUE_CLASS_NAME_CONFIG);}@OverridepublicTdeserialize(String topic,byte[] data){if(data ==null){returnnull;}try{return objectMapper.readValue(data, className);}catch(Exception e){thrownewSerializationException(e);}}}

2.2) Tạo Invoice POJO

@Data@Builder@NoArgsConstructor@AllArgsConstructor@JsonInclude(JsonInclude.Include.NON_NULL)publicclassInvoice{privateString invoiceNumber;privateString storeId;privatelong created;privatedouble totalAmount;privateboolean valid;}

2.3) Consumer

Tạo consumer và produce, sau đó consume message và produce đến topic tương ứng. Để đơn giản, chúng ta sử dụng luôn field valid để check message:

@Slf4jpublicclassValidatorConsumer{@SuppressWarnings("InfiniteLoopStatement")publicstaticvoidmain(String[] args){finalvar consumerProps =newProperties();
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,"validation-consumer");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG,Invoice.class);finalvar consumer =newKafkaConsumer<String,Invoice>(consumerProps);
        consumer.subscribe(List.of("invoice-topic"));finalvar producerProps =newProperties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG,"validation-producer");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);finalvar producer =newKafkaProducer<>(producerProps);while(true){finalvar records = consumer.poll(Duration.ofMillis(100));
            records.forEach(r ->{if(!r.value().isValid()){
                    producer.send(newProducerRecord<>("invalid-invoice-topic", r.value().getStoreId(), r.value()));
                    log.info("Invalid record - {}", r.value().getInvoiceNumber());return;}
                producer.send(newProducerRecord<>("valid-invoice-topic", r.value().getStoreId(), r.value()));
                log.info("Valid record - {}", r.value().getInvoiceNumber());});}}}

2.4) Producer

Cuối cùng, tạo producer để produce message đến invoice-topic:

publicclassInvoiceProducer{publicstaticvoidmain(String[] args){finalvar random =newRandom();finalvar props =newProperties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);try(var producer =newKafkaProducer<String,Invoice>(props)){IntStream.range(0,1000).parallel().forEach(i ->{finalvar invoice =Invoice.builder().invoiceNumber(String.format("%05d", i)).storeId(i %5+"").created(System.currentTimeMillis()).valid(random.nextBoolean()).build();
                        producer.send(newProducerRecord<>("invoice-topic", invoice));});}}}

2.5) Tạo topic

Tạo invoice-topic với 2 partitions:

$ kafka-topics.sh 
    --bootstrap-server localhost:9092,localhost:9093 
    --partitions 2
    --replication-factor 3
    --topic invoice-topic 
    --create

Sau đó start ProducerConsumer và tận hưởng thành quả thôi.

3) Scale Kafka consumer với group.id

Thực tế, các bài toán không chỉ đơn thuần check choác như trên mà phức tạp hơn nhiều. Như vậy, trong trường hợp có rất nhiều producer nhưng chỉ có một consumer thì không ổn. Lúc này consumer trở thành bottle-neck của hệ thống, khiến cho application không còn tính chất real-time.

Trong trường hợp này, cách thông dụng nhất là scale consumer để handle đồng thời nhiều message hơn bằng cách sử dụng consumer group đã giới thiệu ở bài trước.

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"validator-group");

Sau khi thêm properties, start 2 consumer và tiến hành produce message chúng ta sẽ thấy các message được route đến cả 2 consumer.

Nếu thêm 1 consumer nữa, lúc này có 2 partition và 3 consumer, tuy nhiên chỉ 2 consumer nhận được message. Nếu chưa hiểu nguyên nhân có thể đọc lại phần trước về consumer group.

Download source code here.

Reference

Reference in series https://viblo.asia/s/apache-kafka-tu-zero-den-one-aGK7jPbA5j2

After credit

Như vậy chúng ta đã đi qua toàn bộ về Apache Kafka core concept & practice. Các bài toán trong thực tế đôi khi sẽ phức tạp hơn nhiều lần. Ví dụ:

  • Read data từ database và produce đến Kafka.
  • Aggregate data từ 2 topic sau đó mới thực hiện business logic.

Chỉ sử dụng core concept với producer/consumer đơn giản như trên thì cũng được, mỗi tội cần thêm hơi nhiều code và phải đảm bảo fault tolerance. Do vậy, Kafka sinh ra thêm các concept khác để xử lý từng bài toán cụ thể khác nhau:

  • Kafka connect concept.
  • Kafka stream concept.
  • Kafka SQL concept.

Cùng đón chờ trong các bài viết tiếp theo.

© Dat Bui

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ