Trong bài viết này, chúng ta sẽ cùng tìm hiểu về Redis Stream với Spring Boot để demo cách thực hiện xử lý luồng thời gian thực.
Redis ban đầu được biết đến như một Remote Dictionary Server được sử dụng để lưu thông tin vào bộ nhớ đệm (caching). Cùng với tính năng Master/ReadReplication & Pub/Sub, hiện tại Redis cũng đã hỗ trợ thêm tính năng Stream.
Một số bài viết cùng chủ đề mọi người có thể sẽ quan tâm:
Trước đây, chúng ta thường thu thập dữ liệu, lưu trữ trong cơ sở dữ liệu và xử lý dữ liệu hàng đêm. Nó được gọi là xử lý batch processing! Trong kỷ nguyên Microservices này, chúng ta nhận được luồng dữ liệu liên tục, đôi khi việc trì hoãn việc xử lý dữ liệu này có thể gây ảnh hưởng nghiêm trọng đến hoạt động kinh doanh của chúng ta. Ví dụ: Hãy xem xét một ứng dụng như Netflix / YouTube. Dựa trên bộ phim hoặc video chúng ta xem, các ứng dụng này hiển thị các đề xuất về những video hay bộ phim cùng chủ đề chúng ta đang xem ngay lập tức. Nó cung cấp trải nghiệm người dùng tốt hơn nhiều và giúp cho việc kinh doanh từ đó tốt hơn.
Stream processing là quá trình xử lý dữ liệu liên tục theo thời gian thực. Hãy xem cách chúng ta có thể xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot.
Chúng ta cũng đã thảo luận về việc xử lý luồng dữ liệu thời gian thực bằng Apache Kafka. Apache Kafka đã hoạt động được 10 năm trong khi Redis còn khá mới trong lĩnh vực này. Một số tính năng của Redis Streams dường như được lấy cảm hứng từ Apache Kafka. Vấn đề với Kafka là nó rất khó cấu hình. Việc bảo trì cơ sở hạ tầng là rất khó khăn. Nhưng với Redis thì dễ dàng hơn.
Stream vs Pub/Sub
Sẽ có ai đó đặt câu hỏi rằng có cần hỗ trợ Stream nữa không khi chúng ta đã có tính năng Pub/Sub? Câu trả lời là Pub/Sub không giống như Stream, stream không phải là một giải pháp thay thế cho Pub/Sub.
Redis Stream với Spring Boot
- Publisher sẽ đẩy một số sự kiện liên quan đến các giao dịch mua bán vào Redis. Hãy gọi chúng là purchase-events stream.
- Một group-consumer quan tâm sẽ lắng nghe đến những sự kiện đó. Điều này có thể là để tính toán doanh thu hoặc xử lý thanh toán hoặc gửi email. Khi bạn cần thực hiện tất cả những điều này thì chúng ta cần một group-consumer riêng cho mỗi hành động.
- Consumer sẽ sử dụng các sự kiện và họ có thể làm bất cứ điều gì với nó. Trong trường hợp của chúng ta, chúng ta chỉ tìm giá mà người dùng đã trả và tính toán doanh thu theo danh mục loại sản phẩm. Để mọi thứ đơn giản, chúng ta sẽ ghi thông tin này dưới dạng SortedSet trong Redis.
Sample Application
Product Category
public enum Category {
APPLIANCES,
BOOKS,
COSMETICS,
ELECTRONICS,
OUTDOOR;
}
Product – DTO
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product {
private String name;
private double price;
private Category category;
}
Redis Stream – Producer
Producer apllication sẽ tiếp tục đẩy sự kiện PurchaseEvents được định cấu hình định kỳ thông qua publish.rate.
@Service
public class PurchaseEventProducer {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Value("${stream.key}")
private String streamKey;
@Autowired
private ProductRepository repository;
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Scheduled(fixedRateString= "${publish.rate}")
public void publishEvent(){
Product product = this.repository.getRandomProduct();
ObjectRecord<String, Product> record = StreamRecords.newRecord()
.ofObject(product)
.withStreamKey(streamKey);
this.redisTemplate
.opsForStream()
.add(record)
.subscribe(System.out::println);
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar(){
System.out.println(
"Total Events :: " + atomicInteger.get()
);
}
}
- Phương thức publishEvent() đẩy một số giao dịch mua sản phẩm ngẫu nhiên theo định kỳ.
- Phương thức showPublishedEventsSoFar() chỉ đơn giản là hiển thị số lượng đơn đặt hàng được đặt từ trước đến nay, nhằm mục đích ghi nhật ký.
Để giữ mọi thứ đơn giản, chúng ta không sử dụng DB mà tạo ra một danh sách sản phẩm từ code.
@Repository
public class ProductRepository {
private static final List<Product> PRODUCTS = List.of(
// appliances
new Product("oven", 500.00, Category.APPLIANCES),
new Product("dishwasher", 125.00, Category.APPLIANCES),
new Product("heater", 65.00, Category.APPLIANCES),
new Product("vacuum cleaner", 48.00, Category.APPLIANCES),
new Product("refrigerator", 1200.00, Category.APPLIANCES),
// books
new Product("how to win friends and influence", 13.00, Category.BOOKS),
new Product("ds and algorithms", 70.00, Category.BOOKS),
new Product("effective java", 41.00, Category.BOOKS),
new Product("clean architecture", 32.00, Category.BOOKS),
new Product("microservices", 16.00, Category.BOOKS),
// cosmetics
new Product("brush", 9.50, Category.COSMETICS),
new Product("face wash", 13.00, Category.COSMETICS),
new Product("makeup mirror", 17.50, Category.COSMETICS),
// electronics
new Product("sony 4k tv", 999.25, Category.ELECTRONICS),
new Product("headphone", 133.25, Category.ELECTRONICS),
new Product("macbook", 2517.25, Category.ELECTRONICS),
new Product("speaker", 65.25, Category.ELECTRONICS),
// outdoor
new Product("plants", 9.75, Category.OUTDOOR),
new Product("power tools", 73.50, Category.OUTDOOR),
new Product("pools", 111.75, Category.OUTDOOR)
);
public Product getRandomProduct(){
int random = ThreadLocalRandom.current().nextInt(0, 20);
return PRODUCTS.get(random);
}
}
File application.peroperties
stream.key=purchase-events
publish.rate=1000
Redis Stream – Consumer
Có Producer rồi. Chúng ta hãy tạo một Consumer. Để sử dụng Redis Streams, chúng ta cần triển khai interface StreamListener
.
@Service
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Override
@SneakyThrows
public void onMessage(ObjectRecord<String, Product> record) {
System.out.println(
InetAddress.getLocalHost().getHostName() + " - consumed :" +
record.getValue()
);
this.redisTemplate
.opsForZSet()
.incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
.subscribe();
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar(){
System.out.println(
"Total Consumed :: " + atomicInteger.get()
);
}
}
- Đầu tiên, chúng ta hiển thị dữ liệu nhận được từ Producer.
- Sau đó, chúng ta lấy được giá phải trả cho sản phẩm và thêm nó vào doanh thu (revenue) của danh mục loại sản phẩm được sắp xếp.
- Cuối cùng là hiển thị số lượng sự kiện nhận được từ Producer theo định kỳ.
Cấu hình Redis Stream
Sau khi Consumer được tạo, chúng ta cần đăng ký Consumer ở trên vào StreamMessageListenerContainer
.
@Configuration
public class RedisStreamConfig {
@Value("${stream.key:purchase-events}")
private String streamKey;
// Autowired PurchaseEventConsumer (Consumer bên trên)
@Autowired
private StreamListener<String, ObjectRecord<String, Product>> streamListener;
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(Product.class)
.build();
var listenerContainer = StreamMessageListenerContainer
.create(redisConnectionFactory, options);
var subscription = listenerContainer.receiveAutoAck(
Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}
}
Cơ sở hạ tầng hóa ứng dụng (Dockerizing Infrastructure)
Dockerfile
Chúng ta sẽ tạo nhiều Consumer để xử lý các sự kiện mua hàng. Vì vậy, chúng ta sẽ tạo ra dockerfile.
# Use JRE11 slim
FROM openjdk:11.0-jre-slim
# Add the app jar
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar
docker-compose file
version: '3'
services:
redis:
image: redis
ports:
- 6379:6379
redis-commander:
image: rediscommander/redis-commander:latest
depends_on:
- redis
environment:
- REDIS_HOSTS=redis:redis
ports:
- 8081:8081
producer:
build: ./redis-stream-producer
image: docker/redis-stream-producer
depends_on:
- redis
environment:
- SPRING_REDIS_HOST=redis
- PUBLISH_RATE=1000
consumer:
build: ./redis-stream-consumer
image: docker/redis-stream-consumer
depends_on:
- redis
environment:
- SPRING_REDIS_HOST=redis
Redis Stream – Setup
Đầu tiên là chạy Redis và Redis-commander lên trước
docker-compose up redis redis-commander
Truy cập vào trình duyệt chúng ta sẽ thấy dịch vụ Redis đã được chạy
Chúng ta có thể tạo một stream như hình dưới đây. Đây là tất cả các lệnh redis liên quan đến stream: Redis Stream Command
XADD purchase-events * dummy-key dummy-value
Chúng ta tạo ra một consumer-group bằng lệnh sau:
XGROUP CREATE purchase-events purchase-events
Chạy Producer application
docker-compose up producer
Sau khi start producer nó sẽ bắt đầu gửi sự kiện định kỳ:
producer_1 | 1585682873612-0
producer_1 | 1585682873812-0
producer_1 | 1585682874013-0
producer_1 | 1585682874215-0
producer_1 | 1585682874413-0
producer_1 | 1585682874613-0
producer_1 | 1585682874812-0
producer_1 | 1585682875012-0
producer_1 | Total Events :: 51
Chạy Consumer application
docker-compose up --scale consumer=3
Chúng ta có thể thấy Consumer sử dụng tất cả các sự kiện mua hàng. Tải được phân phối cho tất cả Consumer trong consumger-group. Ở đây người consumer_2 cho thấy rằng nó xử lý nhiều sự kiện hơn vì nó bắt đầu trước những consumer khác.
producer_1 | 1585682887612-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=how to win friends and influence, price=13.0, category=BOOKS)
producer_1 | 1585682887813-0
consumer_3 | 83699cab10bd - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
producer_1 | 1585682888012-0
consumer_1 | cdb3357593e6 - consumed :Product(name=headphone, price=133.25, category=ELECTRONICS)
producer_1 | 1585682888212-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=oven, price=500.0, category=APPLIANCES)
consumer_1 | Total Consumed :: 18
consumer_2 | Total Consumed :: 84
producer_1 | 1585682888412-0
consumer_3 | 83699cab10bd - consumed :Product(name=makeup mirror, price=17.5, category=COSMETICS)
producer_1 | 1585682888612-0
consumer_1 | cdb3357593e6 - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
consumer_3 | Total Consumed :: 16
Truy cập redis-commander và tìm sorted set – revenue. Chúng tôi có thể thấy doanh thu các sản phẩm theo danh mục diễn ra trong thời gian thực:
Tổng kết
Vậy là chúng ta vừa triển khai thành công xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot. Hi vọng bài viết hữu ích với mọi người.
Nguồn:https://thenewstack.wordpress.com/2021/11/25/redis-redis-stream-with-spring-boot/
Follow me: thenewstack.wordpress.com
Nguồn: viblo.asia