Giới Thiệu
Tiếp tục với series Java Practice, hôm nay chúng ta tiếp tục tìm hiểu về Temporal một thư viện workflow còn khá mới mẻ và triển khai nó trên Quarkus.
Bắt đầu thôi.
Temporal là gì?
Đầu tiên chắc chắn bạn đã hiểu về workflow!
Workflow (luồng công việc) là một sơ đồ miêu tả thứ tự thực hiện từng công việc, từng sự kiện.
Sơ đồ này giúp cho nhà quản trị thấy được chính xác công việc được thực hiện như thế nào hay có thể dùng nó để thiết kế một trình tự công việc khoa học và mang lại hiệu quả cao.
Theo Temporal.io đề cập định nghĩa như sau:
A Temporal Application is a set of Temporal Workflow Executions.
Each Temporal Workflow Execution has exclusive access to its local state, executes concurrently to all other Workflow Executions,
and communicates with other Workflow Executions and the environment via message passing
Một cách dễ hiểu thì Temporal là một luồng chứa các công việc cần thực hiện và yêu cầu bạn cần thực hiện tuần tự công việc của mình, Trong Temporal một luồng công việc có thể kéo dài nhiều ngày để xử lý và các khoảng GAP Time giữa các công việc trong luồng đôi khi kéo dài hàng giờ đồng hồ nhưng không tiêu tốn tài nguyên máy chủ.
Ví dụ:
Một luồng đơn hàng khi khách hàng đặt có thể kéo dài tới 10 ngày và trải qua nhiều công đoạn công việc khác nhau từ xác nhận đơn hàng -> đóng gói -> bàn giao vận chuyển -> giao hàng thành công
.
Đây là một quy trình sẽ lập đi lập lại và nó yêu cầu thực hiện tuần tự từng bước (bỏ qua các ngoại lệ về tồn kho và giao hàng thất bại).
Trên thực tế một đơn hàng có rất nhiều trạng thái và trường hợp có thể xảy ra, và điều này là không hề dễ dàng để quản lý thì Temporal ra đời như một vị cứu tinh, giúp giải quyết các vấn đề về luồng công việc đơn giản hơn.
Temporal đã được đưa vô Hoàng Phúc giúp việc xử lý luồng đơn hàng bớt khó khăn hơn rất nhiều.
Temporal có 4 thành phần quan trọng để giúp nó vận hành
- Macthing Service: Service này dùng để lưu trữ các Task Queues
- Worker Service: Service này sẽ triển khai các quy trình làm việc(workflow)
- History Service: Nó ghi nhớ lại quá trình làm việc, lưu trữ các mutable state, queues
- Frontend Service: hỗ trợ về rate limiting, routing và authorizing
Trong bài viết này chúng ta sẽ không đi sâu về Temporal Cluster mà chỉ tập trung vào phần Worker Service
thôi nhé
Worker Service
Worker Service sẽ có 2 thành phần chính là workflow
và activity
- workflow sẽ là các bước tiến trình của công việc, trong ví dụ trên thì mỗi trạng thái là một truy trình công việc.
- activity là các quy trình con của workflow.
Cài đặt Temporal
Trong bài viết này mình sẽ triển khai Temporal trên Docker.
Các bạn tạo file docker-compose.yml
với nội dung bên dưới
version: "3.5"
services:
elasticsearch:
container_name: temporal-elasticsearch
environment:
- cluster.routing.allocation.disk.threshold_enabled=true
- cluster.routing.allocation.disk.watermark.low=512mb
- cluster.routing.allocation.disk.watermark.high=256mb
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms100m -Xmx100m
image: elasticsearch:7.16.2
networks:
- temporal-network
ports:
- 9200:9200
postgresql:
container_name: temporal-postgresql
environment:
POSTGRES_PASSWORD: temporal
POSTGRES_USER: temporal
image: postgres:13
networks:
- temporal-network
ports:
- 5432:5432
temporal:
container_name: temporal
depends_on:
- postgresql
- elasticsearch
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development_es.yaml
- ENABLE_ES=true
- ES_SEEDS=elasticsearch
- ES_VERSION=v7
image: temporalio/auto-setup:1.15.0
networks:
- temporal-network
ports:
- 7233:7233
volumes:
- ./dynamicconfig:/etc/temporal/config/dynamicconfig
temporal-admin-tools:
container_name: temporal-admin-tools
depends_on:
- temporal
environment:
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.15.0
networks:
- temporal-network
stdin_open: true
tty: true
temporal-web:
container_name: temporal-web
depends_on:
- temporal
environment:
- TEMPORAL_GRPC_ENDPOINT=temporal:7233
- TEMPORAL_PERMIT_WRITE_API=true
image: temporalio/web:1.13.0
networks:
- temporal-network
ports:
- 8088:8088
networks:
temporal-network:
driver: bridge
name: temporal-network
Đợi docker pull image về và setup xong, các bạn truy cập trình duyệt http://localhost:8088
và thấy giao diện như hình là ok rồi nhé.
Temporal với Quarkus
Trong bài viết này mình sẽ triển khai ví dụ trên Quarkus nhé, đối với các bạn sử dụng Spring Boot cũng sẽ triển khai tương tự như cấu trúc này.
Bạn có thể xem qua code mình đã dùng trong bài viết này tại: https://github.com/sackaboy/temporal-order-example
Thêm Dependency vô file POM:
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.5.0</version>
</dependency>
Tạo file WorkflowObservable.java
để khởi tạo kết nối tới Temporal
@ApplicationScoped
public class WorkflowObservable {
private static final Logger LOG = Logger.getLogger(WorkflowObservable.class);
private WorkflowClient client;
private WorkerFactory factory;
void onStart(@Observes StartupEvent event){
LOG.info("****************** On start: Run workflow ****************");
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
client = WorkflowClient.newInstance(service);
factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(OrderProduct.QUEUE_NAME);
worker.registerWorkflowImplementationTypes(OrderProductImpl.class);
worker.registerActivitiesImplementations(new ActivityOrderNewProductImpl());
LOG.info("Start WorkflowService ");
factory.start();
}
void onStop(@Observes ShutdownEvent event){
factory.shutdown();
LOG.info("****************** On stop: stop workflow ****************");
}
public WorkflowClient getClient(){
return client;
}
}
Đoạn code trên minh đăng ký với Temporal Workflow là class OrderProduct
, Workflow Implement là OrderProductImpl
và Activity Implement l class ActivityOrderNewProductImpl
, Trong Workflow nó sẽ tiến hành tìm WorkflowMethod
và các QueryMethod
, SignalMethod
nếu có.
Activity được coi là các quy trình con của Workflow nên Temporal sẽ import tất cả phương thức được khai báo trong Activity.
Trong file OrderProduct.java
@WorkflowInterface
public interface OrderProduct {
public static final String QUEUE_NAME = "Customer_Order";
@WorkflowMethod
Invoice orderNewProduct(Invoice invoice);
@QueryMethod
String getStatus();
@SignalMethod
void signalOrderAccepted();
@SignalMethod
void signalOrderPickedUp();
@SignalMethod
void signalDelivered();
}
Tiếp tới file ActivityOrderNewProductImpl
sẽ implements class OrderProduct để triển khai các quy trình con(Activity).
public class OrderProductImpl implements OrderProduct {
private final RetryOptions retryoptions = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofSeconds(5))
.setBackoffCoefficient(2)
.setMaximumAttempts(10)
.build();
private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder()
// Timeout options specify when to automatically timeout Activities if the process is taking too long.
.setStartToCloseTimeout(Duration.ofSeconds(60))
// Optionally provide customized RetryOptions.
// Temporal retries failures by default, this is simply an example.
.setRetryOptions(retryoptions)
.build();
private final ActivityOrderNewProduct activityOrderNewProduct = Workflow.newActivityStub(ActivityOrderNewProduct.class,defaultActivityOptions);
public boolean isOrderConfirmed = false;
public boolean isOrderPickedUp = false;
public boolean isOrderDelivered = false;
private String status = "";
private static final Logger LOG = Logger.getLogger(OrderProductImpl.class);
@Override
public Invoice orderNewProduct(Invoice invoice) {
try{
activityOrderNewProduct.placeOrder(invoice.getCustomer());
status = "waiting confirm";
if(activityOrderNewProduct.checkInventoryQuantity(
invoice.getInvoiceDetailList().get(0).getProduct(),
invoice.getInvoiceDetailList().get(0).getQuantity())
){
LOG.info("***** Waiting for Hoang Phuc International to confirm your order");
activityOrderNewProduct.assignEmpConfirm();
Workflow.await(() -> isOrderConfirmed);
LOG.info("isOrderConfirmed: "+isOrderConfirmed);
invoice.setStatus("confirmed");
status = invoice.getStatus();
LOG.info("***** Please wait till we assign a delivery executive");
Workflow.await(() -> isOrderPickedUp);
invoice.setStatus( "picked up");
status = invoice.getStatus();
LOG.info("isOrderPickedUp: "+isOrderPickedUp);
Workflow.await(() -> isOrderDelivered);
invoice.setStatus( "delivered");
status = invoice.getStatus();
}else{
LOG.info("***** Placed Order Not Successfully !");
invoice.setStatus( "cancelled");
status = invoice.getStatus();
activityOrderNewProduct.notifyCustomer(invoice.getCustomer());
}
return invoice;
}catch (Exception e){
invoice.setStatus( "cancelled cause error");
status = invoice.getStatus();
}
return invoice;
}
@Override
public String getStatus() {
return status;
}
@Override
public void signalOrderAccepted() {
activityOrderNewProduct.setOrderAccepted();
this.isOrderConfirmed = false;
}
@Override
public void signalOrderPickedUp() {
activityOrderNewProduct.setOrderPickedUp();
this.isOrderPickedUp = true;
}
@Override
public void signalDelivered() {
activityOrderNewProduct.setOrderDelivered();
this.isOrderDelivered = true;
}
}
Tiếp theo chúng ta sẽ khai báo với Temporal các Activity, Trước tiên mình sẽ khai báo 1 lớp Interface để khai báo Activity
@ActivityInterface
public interface ActivityOrderNewProduct {
void placeOrder(Customer customer);
Boolean checkInventoryQuantity(Product product, int quantity);
void assignEmpConfirm() throws IOException;
void notifyEmployee() throws IOException;
void notifyCustomer(Customer customer);
void notifyDeliver(Employee employee);
void setOrderAccepted();
void setOrderPickedUp();
void setOrderDelivered();
}
và tiếp tục là class ActivityOrderNewProductImpl
để implements lại class ActivityOrderNewProduct.
public class ActivityOrderNewProductImpl implements ActivityOrderNewProduct {
public ActivityOrderNewProductImpl(){
}
private static final Logger LOG = Logger.getLogger(ActivityOrderNewProductImpl.class);
@Override
public void placeOrder(Customer customer) {
LOG.info("***** Customer order new");
}
@Override
public Boolean checkInventoryQuantity(Product product, int quantity) {
if(product.getQuantity() >= quantity){
LOG.info("***** Inventory enough to provide");
return true;
}
LOG.info("***** Inventory not enough to provide");
return false;
}
@Override
public void assignEmpConfirm() throws IOException {
//Get Employee
LOG.info("***** Order just assigned to: ");
this.notifyEmployee();
}
@Override
public void notifyEmployee() throws IOException {
LOG.info("***** Notify ZNS to employee: " );
}
@Override
public void notifyCustomer(Customer customer) {
LOG.info("***** Notify to customer : " + customer.getName());
}
@Override
public void notifyDeliver(Employee employee) {
LOG.info("***** Notify to deliver : "+employee.getName());
}
@Override
public void setOrderAccepted() {
LOG.info("***** Employee has confirmed your order");
}
@Override
public void setOrderPickedUp() {
LOG.info("***** Order has been picked up");
}
@Override
public void setOrderDelivered() {
LOG.info("***** Order Delivered");
}
}
Trong file WorkflowResource.java bạn mình tạo ra 1 hàm để thực thi đẩy 1 workflow lên Temporal.
@POST
@Path("/placeOrder")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Invoice placeNewOrder(PageLoadInvoice pageLoadInvoice) {
int productOrderQuantity = 2;
Invoice invoice = invoiceBusiness.createNew(pageLoadInvoice.cusID);
InvoiceDetail invoiceDetail = invoiceDetailBusiness.createNew(invoice.getId(),pageLoadInvoice.prodID, productOrderQuantity);
invoice.addInvoiceDetail(invoiceDetail);
OrderProduct workflow =
workflowObservable.getClient().newWorkflowStub(
OrderProduct.class, WorkflowOptions.newBuilder()
.setWorkflowId("Customer_Order_" + Math.abs(new Random().nextInt()))
.setTaskQueue(OrderProduct.QUEUE_NAME).build());
WorkflowClient.start(workflow::orderNewProduct,invoice);
return invoice;
}
Trong hàm này các bạn có thể thấy Workflow được chỉ định sẽ start workflow này bằng hàm oderNewProduct
nằm trong interface OrderProduct
.
Lúc này oderProductImpl
implements oderNewProduct
sẽ được gọi ra và xử lý.
Bạn có thể thấy hàm orderNewProduct trong oderProductImpl
có nhiệm vụ gọi đến các Activity và khi một Activity được thực hiện, Temporal sẽ ghi nhớ quá trình này là tiếp tục thực hiện các Activity khác sau 1 thời gian nghỉ.
Trong oderProductImpl
bạn sẽ thấy có rất nhiều đoạn Workflow.await(Supplier<Boolean> unblockCondition)
, khi Temporal chạy đến hàm này nó sẽ đợi cho tới khi Condition = true. Trong Temporal cơ chế nghỉ này sẽ không tiêu tốn tài nguyên máy chủ.
Trong ví dụ trên của mình Workflow.await
sẽ cần phải nhận được các SignalMethod
từ bên ngoài gọi vô để tiếp tục thực hiện.
Sau đó các bạn thực hiện gọi hàm này
curl -X 'POST'
'http://localhost:8080/workflow/placeOrder'
-H 'accept: application/json'
-H 'Content-Type: application/json'
-d '{
"prodID": 1,
"cusID": 1
}'
Lúc này 1 Workflow mới đã được khởi tạo với trạng thái Running.
Khi một đơn hàng mới tạo ra mình sẽ gắn cho nó trạng thái bắt đầu là waiting confirm
Tiếp đó mình sẽ thực hiện gọi tiếp hàm sau:
curl -X 'GET'
'http://localhost:8080/workflow/confirmed/271924149'
-H 'accept: application/json'
Hàm này sẽ thực hiện các nhiệm vụ của nó là đưa trạng thái đơn hàng thành confirmed
.
Tương tự với hàm pickedup, delivered cũng sẽ đưa trạng thái đơn hàng về đúng nhu cầu và kết thúc 1 workflow khi nhận được trạng thái delivered.
Ngoài việc triển khai các Workflow bằng code, bạn hoàn toàn có
thể khai báo các workflow của mình trong file Json. Điều này là vô cùng tiện lợi khi nghiệm của của các ứng dụng phức tạp và có sự thay đổi nhiều. Nếu nhận được sự ủng hộ của mọi người về Temporal thì mình sẽ viết 1 bài khác hướng dẫn sau
Kết thúc
Temporal là một kiến trúc rất hay, tuy nhiên trong 1 bài viết ngắn ngủi cũng tương đối khó để mình trình bài hết về Temporal cũng như giúp bạn hiểu rõ được nó.
Hẹn các bạn trong bài viết khác trong series Java practice.
Bạn có thể xem qua code mình đã dùng trong bài viết này tại: https://github.com/sackaboy/temporal-order-example
Nguồn: viblo.asia