Sử dụng Apache Kafka với Quarkus

Kafka là gì Kafka là gì? – Đó là hệ thống message pub/sub phân tán (distributed messaging system). Bên pulbic dữ liệu được gọi là producer, bên subscribe nhận dữ liệu theo topic được gọi là consumer. Kafka có khả năng truyền một lượng lớn message theo thời gian thực, trong trường hợp bên nhận

Kafka là gì

Kafka là gì? – Đó là hệ thống message pub/sub phân tán (distributed messaging system). Bên pulbic dữ liệu được gọi là producer, bên subscribe nhận dữ liệu theo topic được gọi là consumer. Kafka có khả năng truyền một lượng lớn message theo thời gian thực, trong trường hợp bên nhận chưa nhận message vẫn được lưu trữ sao lưu trên một hàng đợi và cả trên ổ đĩa bảo đảm an toàn. Đồng thời nó cũng được replicate trong cluster giúp phòng tránh mất dữ liệu.

Screen Shot 2022-03-21 at 23.05.07.png

Ở bài viết này mình sẽ trình bày cách sử dụng Apache Kafka trong Quarkus

Yêu cầu

  • JDK 1.8+ installed with JAVA_HOME configured appropriately
  • Apache Maven 3.6.2+
  • Docker Compose to start a development cluster

Docker

Đây là image mà mình sử dụng để start kafka cho bài viết này.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka

  kafka:
    image: confluentinc/cp-enterprise-kafka:6.0.0
    hostname: kafka
    restart: "always"
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: |
         PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: |
         PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: "no"
    ports:
      - "9080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: "kafka"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:9092"
    networks:
      - kafka

networks:
  kafka: 
    external:
      name: kafka

Tạo Project

Tạo project ở trang chủ của Quarkus tại đây, nhập group id và artifact id.
Bạn tìm và thêm 2 extension là:

  • SmallRye Reactive Messaging – Kafka Connector
  • RESTEasy Jackson

Screen Shot 2022-03-21 at 23.16.04.png

Click Generate your application để tải xuống , sau đó unzip và mở bằng IDE yêu thích của bạn.

Tại file pom.xml bạn sẽ thấy 2 extension quarkus-smallrye-reactive-messaging-kafkaquarkus-resteasy-jackson đã được thêm vào.

Hmm ngoài lề một chút, dạo này giá xăng hiện tại tăng chóng mặt nên mình sẽ viết ứng dụng gửi message kèm nâng giá xăng lên nhé 😄

Tạo Class Gas pojo như sau:

package practice.kafka;

public class Gas {
    public String name;
    public double price;

    public Gas() {
    }

    public Gas(String name, double price) {
        this.name = name;
        this.price = price;
    }
}

Tạo thêm một Class GasProcessor để nâng giá xăng như sau:

package practice.kafka;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GasProcessor {

    private static final double CONVERSION_RATE = 1.5;

    public Gas process(Gas gas) {
        gas.price = gas.price * CONVERSION_RATE;
        return gas;
    }
}

Chúng ta nhận vào giá gốc sau đó thay đổi giá cả và gửi chúng lại cho Kafka.
Để làm được chúng ta sẽ cần cài đặt JSON serialization.

Với extension quarkus-resteasy-jackson cung cấp cho chúng ta ObjectMapperSerializer sử dụng cho việc serialize tất cả các pojo thông qua Jackson,
nhưng đối với deserializer thì Generic, nên cần phải phân lớp.

Để deserializer cần tạo Class với tên là GasDeserializer kế thừa từ ObjectMapperDeserializer.

package practice.kafka;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class GasDeserializer extends ObjectMapperDeserializer<Gas> {
    public GasDeserializer(){
        super(Gas.class);
    }
}

Cấu hình để sử dụng Jackson serializer, deserializer và kết nối máy chủ Kafka trong file application.properties

kafka.bootstrap.servers=http://localhost:29092

# Configure the Kafka source (we read from it)
mp.messaging.incoming.gas-in.connector=smallrye-kafka
mp.messaging.incoming.gas-in.topic=gas
mp.messaging.incoming.gas-in.value.deserializer=practice.kafka.GasDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.gas-out.connector=smallrye-kafka
mp.messaging.outgoing.gas-out.topic=gas
mp.messaging.outgoing.gas-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Mình sẽ tạo thêm một Class API ví như Producer để tạo ra messages gửi lên cho Kafka.

package practice.kafka;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletionStage;

@Path("gas")
@ApplicationScoped
public class GasResource {

    @Inject
    GasProcessor gasProcessor;

    @Inject
    @Channel("gas-out")
    Emitter<Gas> gasEmitter;

    @POST
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public Response increasePrice(Gas gas){
        gas = gasProcessor.process(gas);
        CompletionStage<Void> ack = gasEmitter.send(gas);
        return Response.ok().entity(ack).build();
    }
}

Ở đây mình sẽ sử dụng annotation @Channel để khai báo đầu ra của message.
Sử dụng Emitter để gửi message lên máy chủ Kafka.

Sau khi đã có producer gửi message, thì tiếp theo mình cũng sẽ tạo thêm một Class Consume để nhận message từ Kafka.

package practice.kafka;

import org.eclipse.microprofile.reactive.messaging.Incoming;

public class GasConsume {

    @Incoming("gas-in")
    public void outputGas(Gas gas){
        System.out.printf("Gas price has been increased to %s",gas.price);
    }
}

Với annotation @Incoming khai báo đầu nhận message từ Kafka.

À để sử dụng API dễ dàng hơn thì ta sử dụng swagger, bạn copy bỏ vào file pom.xml nhé:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>

Chạy câu lệnh để cài đặt extension trong file pom.xml

mvn clean install

Để khởi động ứng dụng

mvn quarkus:dev

Gọi thử API gas

curl -X 'POST' 
  'http://localhost:8080/gas' 
  -H 'accept: */*' 
  -H 'Content-Type: application/json' 
  -d '{
  "name": "Xăng 95",
  "price": 18000
}'

Và giá xăng đã tăng gấp rưỡi từ 18000 thành 27000 😦

Screen Shot 2022-03-22 at 01.10.25.png

Kết Luận

Vậy với bài tóm tắt trên, các bạn có thể sử dụng Apache Kafka với Quarkus một cách cơ bản và nhanh chóng.

Nếu có thể bài sau mình sẽ trình bày cách sử dụng Avro Schema Registry với Kafka và Quarkus.

Cảm ơn mọi người, chúc mọi người nhiều sức khoẻ ❤️

References

https://quarkus.io/guides/kafka

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ