[Redis] – Redis Stream With Spring Boot

Trong bài viết này, chúng ta sẽ cùng tìm hiểu về Redis Stream với Spring Boot để demo cách thực hiện xử lý luồng thời gian thực. Redis ban đầu được biết đến như một Remote Dictionary Server được sử dụng để lưu thông tin vào bộ nhớ đệm (caching). Cùng với tính năng Master/ReadReplication

Trong bài viết này, chúng ta sẽ cùng tìm hiểu về Redis Stream với Spring Boot để demo cách thực hiện xử lý luồng thời gian thực.

Redis ban đầu được biết đến như một Remote Dictionary Server được sử dụng để lưu thông tin vào bộ nhớ đệm (caching). Cùng với tính năng Master/ReadReplication & Pub/Sub, hiện tại Redis cũng đã hỗ trợ thêm tính năng Stream.

Một số bài viết cùng chủ đề mọi người có thể sẽ quan tâm:

Trước đây, chúng ta thường thu thập dữ liệu, lưu trữ trong cơ sở dữ liệu và xử lý dữ liệu hàng đêm. Nó được gọi là xử lý batch processing! Trong kỷ nguyên Microservices này, chúng ta nhận được luồng dữ liệu liên tục, đôi khi việc trì hoãn việc xử lý dữ liệu này có thể gây ảnh hưởng nghiêm trọng đến hoạt động kinh doanh của chúng ta. Ví dụ: Hãy xem xét một ứng dụng như Netflix / YouTube. Dựa trên bộ phim hoặc video chúng ta xem, các ứng dụng này hiển thị các đề xuất về những video hay bộ phim cùng chủ đề chúng ta đang xem ngay lập tức. Nó cung cấp trải nghiệm người dùng tốt hơn nhiều và giúp cho việc kinh doanh từ đó tốt hơn.

Stream processing là quá trình xử lý dữ liệu liên tục theo thời gian thực. Hãy xem cách chúng ta có thể xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot.

Chúng ta cũng đã thảo luận về việc xử lý luồng dữ liệu thời gian thực bằng Apache Kafka. Apache Kafka đã hoạt động được 10 năm trong khi Redis còn khá mới trong lĩnh vực này. Một số tính năng của Redis Streams dường như được lấy cảm hứng từ Apache Kafka. Vấn đề với Kafka là nó rất khó cấu hình. Việc bảo trì cơ sở hạ tầng là rất khó khăn. Nhưng với Redis thì dễ dàng hơn.

Stream vs Pub/Sub

Sẽ có ai đó đặt câu hỏi rằng có cần hỗ trợ Stream nữa không khi chúng ta đã có tính năng Pub/Sub? Câu trả lời là Pub/Sub không giống như Stream, stream không phải là một giải pháp thay thế cho Pub/Sub.

Redis Stream với Spring Boot

  • Publisher sẽ đẩy một số sự kiện liên quan đến các giao dịch mua bán vào Redis. Hãy gọi chúng là purchase-events stream.
  • Một group-consumer quan tâm sẽ lắng nghe đến những sự kiện đó. Điều này có thể là để tính toán doanh thu hoặc xử lý thanh toán hoặc gửi email. Khi bạn cần thực hiện tất cả những điều này thì chúng ta cần một group-consumer riêng cho mỗi hành động.
  • Consumer sẽ sử dụng các sự kiện và họ có thể làm bất cứ điều gì với nó. Trong trường hợp của chúng ta, chúng ta chỉ tìm giá mà người dùng đã trả và tính toán doanh thu theo danh mục loại sản phẩm. Để mọi thứ đơn giản, chúng ta sẽ ghi thông tin này dưới dạng SortedSet trong Redis.

Sample Application

Product Category

public enum Category {
    APPLIANCES,
    BOOKS,
    COSMETICS,
    ELECTRONICS,
    OUTDOOR;
}

Product – DTO

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product {

    private String name;
    private double price;
    private Category category;

}

Redis Stream – Producer

Producer apllication sẽ tiếp tục đẩy sự kiện PurchaseEvents được định cấu hình định kỳ thông qua publish.rate.

@Service
public class PurchaseEventProducer {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Value("${stream.key}")
    private String streamKey;

    @Autowired
    private ProductRepository repository;

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Scheduled(fixedRateString= "${publish.rate}")
    public void publishEvent(){
        Product product = this.repository.getRandomProduct();
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Events :: " + atomicInteger.get()
        );
    }

}
  • Phương thức publishEvent() đẩy một số giao dịch mua sản phẩm ngẫu nhiên theo định kỳ.
  • Phương thức showPublishedEventsSoFar() chỉ đơn giản là hiển thị số lượng đơn đặt hàng được đặt từ trước đến nay, nhằm mục đích ghi nhật ký.

Để giữ mọi thứ đơn giản, chúng ta không sử dụng DB mà tạo ra một danh sách sản phẩm từ code.

@Repository
public class ProductRepository {

    private static final List<Product> PRODUCTS = List.of(
            // appliances
            new Product("oven", 500.00, Category.APPLIANCES),
            new Product("dishwasher", 125.00, Category.APPLIANCES),
            new Product("heater", 65.00, Category.APPLIANCES),
            new Product("vacuum cleaner", 48.00, Category.APPLIANCES),
            new Product("refrigerator", 1200.00, Category.APPLIANCES),
            // books
            new Product("how to win friends and influence", 13.00, Category.BOOKS),
            new Product("ds and algorithms", 70.00, Category.BOOKS),
            new Product("effective java", 41.00, Category.BOOKS),
            new Product("clean architecture", 32.00, Category.BOOKS),
            new Product("microservices", 16.00, Category.BOOKS),
            // cosmetics
            new Product("brush", 9.50, Category.COSMETICS),
            new Product("face wash", 13.00, Category.COSMETICS),
            new Product("makeup mirror", 17.50, Category.COSMETICS),
            // electronics
            new Product("sony 4k tv", 999.25, Category.ELECTRONICS),
            new Product("headphone", 133.25, Category.ELECTRONICS),
            new Product("macbook", 2517.25, Category.ELECTRONICS),
            new Product("speaker", 65.25, Category.ELECTRONICS),
            // outdoor
            new Product("plants", 9.75, Category.OUTDOOR),
            new Product("power tools", 73.50, Category.OUTDOOR),
            new Product("pools", 111.75, Category.OUTDOOR)
    );

    public Product getRandomProduct(){
        int random = ThreadLocalRandom.current().nextInt(0, 20);
        return PRODUCTS.get(random);
    }

}

File application.peroperties

stream.key=purchase-events
publish.rate=1000

Redis Stream – Consumer

Có Producer rồi. Chúng ta hãy tạo một Consumer. Để sử dụng Redis Streams, chúng ta cần triển khai interface StreamListener.

@Service
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() + " - consumed :" +
                record.getValue()
        );
        this.redisTemplate
                .opsForZSet()
                .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Consumed :: " + atomicInteger.get()
        );
    }

}
  • Đầu tiên, chúng ta hiển thị dữ liệu nhận được từ Producer.
  • Sau đó, chúng ta lấy được giá phải trả cho sản phẩm và thêm nó vào doanh thu (revenue) của danh mục loại sản phẩm được sắp xếp.
  • Cuối cùng là hiển thị số lượng sự kiện nhận được từ Producer theo định kỳ.

Cấu hình Redis Stream

Sau khi Consumer được tạo, chúng ta cần đăng ký Consumer ở trên vào StreamMessageListenerContainer.

@Configuration
public class RedisStreamConfig {

    @Value("${stream.key:purchase-events}")
    private String streamKey;

    // Autowired PurchaseEventConsumer (Consumer bên trên)
    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        var options = StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .pollTimeout(Duration.ofSeconds(1))
                            .targetType(Product.class)
                            .build();
        var listenerContainer = StreamMessageListenerContainer
                                    .create(redisConnectionFactory, options);
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }

}

Cơ sở hạ tầng hóa ứng dụng (Dockerizing Infrastructure)

Dockerfile
Chúng ta sẽ tạo nhiều Consumer để xử lý các sự kiện mua hàng. Vì vậy, chúng ta sẽ tạo ra dockerfile.

# Use JRE11 slim
FROM openjdk:11.0-jre-slim

# Add the app jar
ADD target/*.jar redis-stream.jar

ENTRYPOINT java -jar redis-stream.jar

docker-compose file

version: '3'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
  redis-commander:
    image: rediscommander/redis-commander:latest
    depends_on:
      - redis
    environment:
      - REDIS_HOSTS=redis:redis
    ports:
      - 8081:8081
  producer:
    build: ./redis-stream-producer
    image: docker/redis-stream-producer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
      - PUBLISH_RATE=1000
  consumer:
    build: ./redis-stream-consumer
    image: docker/redis-stream-consumer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis

Redis Stream – Setup

Đầu tiên là chạy Redis và Redis-commander lên trước

docker-compose up redis redis-commander

Truy cập vào trình duyệt chúng ta sẽ thấy dịch vụ Redis đã được chạy

Chúng ta có thể tạo một stream như hình dưới đây. Đây là tất cả các lệnh redis liên quan đến stream: Redis Stream Command

XADD purchase-events * dummy-key dummy-value

Chúng ta tạo ra một consumer-group bằng lệnh sau:

XGROUP CREATE purchase-events purchase-events

Chạy Producer application

docker-compose up producer

Sau khi start producer nó sẽ bắt đầu gửi sự kiện định kỳ:

producer_1         | 1585682873612-0
producer_1         | 1585682873812-0
producer_1         | 1585682874013-0
producer_1         | 1585682874215-0
producer_1         | 1585682874413-0
producer_1         | 1585682874613-0
producer_1         | 1585682874812-0
producer_1         | 1585682875012-0
producer_1         | Total Events :: 51

Chạy Consumer application

docker-compose up --scale consumer=3

Chúng ta có thể thấy Consumer sử dụng tất cả các sự kiện mua hàng. Tải được phân phối cho tất cả Consumer trong consumger-group. Ở đây người consumer_2 cho thấy rằng nó xử lý nhiều sự kiện hơn vì nó bắt đầu trước những consumer khác.

producer_1         | 1585682887612-0
consumer_2         | 7b6c828647b0 - consumed :Product(name=how to win friends and influence, price=13.0, category=BOOKS)
producer_1         | 1585682887813-0
consumer_3         | 83699cab10bd - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
producer_1         | 1585682888012-0
consumer_1         | cdb3357593e6 - consumed :Product(name=headphone, price=133.25, category=ELECTRONICS)
producer_1         | 1585682888212-0
consumer_2         | 7b6c828647b0 - consumed :Product(name=oven, price=500.0, category=APPLIANCES)
consumer_1         | Total Consumed :: 18
consumer_2         | Total Consumed :: 84
producer_1         | 1585682888412-0
consumer_3         | 83699cab10bd - consumed :Product(name=makeup mirror, price=17.5, category=COSMETICS)
producer_1         | 1585682888612-0
consumer_1         | cdb3357593e6 - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
consumer_3         | Total Consumed :: 16

Truy cập redis-commander và tìm sorted set – revenue. Chúng tôi có thể thấy doanh thu các sản phẩm theo danh mục diễn ra trong thời gian thực:

Tổng kết

Vậy là chúng ta vừa triển khai thành công xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot. Hi vọng bài viết hữu ích với mọi người.

Nguồn:https://thenewstack.wordpress.com/2021/11/25/redis-redis-stream-with-spring-boot/

Follow me: thenewstack.wordpress.com

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ