009: Kafka Consumer client API với Java

Bài viết nằm trong series Apache Kafka từ zero đến one. Tiếp tục series, cùng practice với Kafka consumer với bài toán đơn giản. 1) Scenario Thực tế, một hệ thống với Kafka có thể hoạt động với mô hình dưới đây: Event generating service: là service produce message tới Kafka. Data validation service: consume

Bài viết nằm trong series Apache Kafka từ zero đến one.

Tiếp tục series, cùng practice với Kafka consumer với bài toán đơn giản.

1) Scenario

Thực tế, một hệ thống với Kafka có thể hoạt động với mô hình dưới đây:

  • Event generating service: là service produce message tới Kafka.
  • Data validation service: consume message, thực hiện validate, dựa trên result để produce message tương ứng tới topic.
  • Data reconciliation application: nếu message invalid, DRA consume message, tiến hành correct message và produce valid message với Kafka.
  • Other service: cuối cùng message valid được đẩy tới next service để xử lý các business logic phù hợp với yêu cầu.

Với bài toán trên, ta sử dụng Data validation service làm ví dụ cho phần consumer:

  • Đầu vào là các hóa đơn, producer produce data đến invoice-topic.
  • Consumer consume message từ invoice-topic và xử lý, nếu valid sẽ chuyển tiếp đến valid-invoice-topic, ngược lại produce đến invalid-invoice-topic.

2) Practice

Có bài toán rồi, quẩy thôi. Thêm dependency Kafka client và Jackson để xử lý JSON data:

Với Maven:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.32</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency></dependencies>

Gradle:

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
    implementation 'org.slf4j:slf4j-simple:1.7.32'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.5'}

2.1) Custom serializer với JSON:

Với bài toán này, cùng practice tạo một custome serializer/deserializer với JSON type như sau.

Serializer:

publicclassJsonSerializer<T>implementsSerializer<T>{privatefinalObjectMapper objectMapper =newObjectMapper();@Overridepublicbyte[]serialize(String topic,T data){if(data ==null){returnnull;}try{return objectMapper.writeValueAsBytes(data);}catch(Exception e){thrownewSerializationException("Error serializing JSON message", e);}}}

Deserializer:

publicclassJsonDeserializer<T>implementsDeserializer<T>{privatefinalObjectMapper objectMapper =newObjectMapper();privateClass<T> className;publicstaticfinalString KEY_CLASS_NAME_CONFIG ="key.class.name";publicstaticfinalString VALUE_CLASS_NAME_CONFIG ="value.class.name";@SuppressWarnings("unchecked")@Overridepublicvoidconfigure(Map<String,?> props,boolean isKey){if(isKey){
            className =(Class<T>) props.get(KEY_CLASS_NAME_CONFIG);return;}
        className =(Class<T>) props.get(VALUE_CLASS_NAME_CONFIG);}@OverridepublicTdeserialize(String topic,byte[] data){if(data ==null){returnnull;}try{return objectMapper.readValue(data, className);}catch(Exception e){thrownewSerializationException(e);}}}

2.2) Tạo Invoice POJO

@Data@Builder@NoArgsConstructor@AllArgsConstructor@JsonInclude(JsonInclude.Include.NON_NULL)publicclassInvoice{privateString invoiceNumber;privateString storeId;privatelong created;privatedouble totalAmount;privateboolean valid;}

2.3) Consumer

Tạo consumer và produce, sau đó consume message và produce đến topic tương ứng. Để đơn giản, chúng ta sử dụng luôn field valid để check message:

@Slf4jpublicclassValidatorConsumer{@SuppressWarnings("InfiniteLoopStatement")publicstaticvoidmain(String[] args){finalvar consumerProps =newProperties();
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,"validation-consumer");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG,Invoice.class);finalvar consumer =newKafkaConsumer<String,Invoice>(consumerProps);
        consumer.subscribe(List.of("invoice-topic"));finalvar producerProps =newProperties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG,"validation-producer");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);finalvar producer =newKafkaProducer<>(producerProps);while(true){finalvar records = consumer.poll(Duration.ofMillis(100));
            records.forEach(r ->{if(!r.value().isValid()){
                    producer.send(newProducerRecord<>("invalid-invoice-topic", r.value().getStoreId(), r.value()));
                    log.info("Invalid record - {}", r.value().getInvoiceNumber());return;}
                producer.send(newProducerRecord<>("valid-invoice-topic", r.value().getStoreId(), r.value()));
                log.info("Valid record - {}", r.value().getInvoiceNumber());});}}}

2.4) Producer

Cuối cùng, tạo producer để produce message đến invoice-topic:

publicclassInvoiceProducer{publicstaticvoidmain(String[] args){finalvar random =newRandom();finalvar props =newProperties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);try(var producer =newKafkaProducer<String,Invoice>(props)){IntStream.range(0,1000).parallel().forEach(i ->{finalvar invoice =Invoice.builder().invoiceNumber(String.format("%05d", i)).storeId(i %5+"").created(System.currentTimeMillis()).valid(random.nextBoolean()).build();
                        producer.send(newProducerRecord<>("invoice-topic", invoice));});}}}

2.5) Tạo topic

Tạo invoice-topic với 2 partitions:

$ kafka-topics.sh 
    --bootstrap-server localhost:9092,localhost:9093 
    --partitions 2
    --replication-factor 3
    --topic invoice-topic 
    --create

Sau đó start ProducerConsumer và tận hưởng thành quả thôi.

3) Scale Kafka consumer với group.id

Thực tế, các bài toán không chỉ đơn thuần check choác như trên mà phức tạp hơn nhiều. Như vậy, trong trường hợp có rất nhiều producer nhưng chỉ có một consumer thì không ổn. Lúc này consumer trở thành bottle-neck của hệ thống, khiến cho application không còn tính chất real-time.

Trong trường hợp này, cách thông dụng nhất là scale consumer để handle đồng thời nhiều message hơn bằng cách sử dụng consumer group đã giới thiệu ở bài trước.

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"validator-group");

Sau khi thêm properties, start 2 consumer và tiến hành produce message chúng ta sẽ thấy các message được route đến cả 2 consumer.

Nếu thêm 1 consumer nữa, lúc này có 2 partition và 3 consumer, tuy nhiên chỉ 2 consumer nhận được message. Nếu chưa hiểu nguyên nhân có thể đọc lại phần trước về consumer group.

Download source code here.

Reference

Reference in series https://viblo.asia/s/apache-kafka-tu-zero-den-one-aGK7jPbA5j2

After credit

Như vậy chúng ta đã đi qua toàn bộ về Apache Kafka core concept & practice. Các bài toán trong thực tế đôi khi sẽ phức tạp hơn nhiều lần. Ví dụ:

  • Read data từ database và produce đến Kafka.
  • Aggregate data từ 2 topic sau đó mới thực hiện business logic.

Chỉ sử dụng core concept với producer/consumer đơn giản như trên thì cũng được, mỗi tội cần thêm hơi nhiều code và phải đảm bảo fault tolerance. Do vậy, Kafka sinh ra thêm các concept khác để xử lý từng bài toán cụ thể khác nhau:

  • Kafka connect concept.
  • Kafka stream concept.
  • Kafka SQL concept.

Cùng đón chờ trong các bài viết tiếp theo.

© Dat Bui

Nguồn: viblo.asia

Bài viết liên quan

Thay đổi Package Name của Android Studio dể dàng với plugin APR

Nếu bạn đang gặp khó khăn hoặc bế tắc trong việc thay đổi package name trong And

Lỗi không Update Meta_Value Khi thay thế hình ảnh cũ bằng hình ảnh mới trong WordPress

Mã dưới đây hoạt động tốt có 1 lỗi không update được postmeta ” meta_key=

Bài 1 – React Native DevOps các khái niệm và các cài đặt căn bản

Hướng dẫn setup jenkins agent để bắt đầu build mobile bằng jenkins cho devloper an t

Chuyển đổi từ monolith sang microservices qua ví dụ

1. Why microservices? Microservices là kiến trúc hệ thống phần mềm hướng dịch vụ,