Data Change Capture (CDC) với Debezium

Mở đầu Làm việc với hệ thống database từ trước đến nay vẫn luôn là công việc khó khăn và bạc đầu, dạo gần đây mình có cơ hội làm việc với một giải pháp mới cho database sử dụng để phát hiện ra sự thay đổi từ một database và sao chép dữ liệu

Mở đầu

Làm việc với hệ thống database từ trước đến nay vẫn luôn là công việc khó khăn và bạc đầu, dạo gần đây mình có cơ hội làm việc với một giải pháp mới cho database sử dụng để phát hiện ra sự thay đổi từ một database và sao chép dữ liệu đó sang một database khác cùng loại hoặc khác loại. Để xử lý bài toán này mình sử dụng giải pháp có tên như title – Change Data Capture hay nói gắn gọn là CDC.

image.png

Change Data Capture là gì?

Đúng như cái tên của nó là bắt sự thay đổi dữ liệu, đây là kỹ thuật sử dụng để chúng ta bắt được những sự thay đổi về dữ liệu chứa trong database. Việc bắt được sự thay đổi về dữ liệu sẽ giúp chúng ta xử lý được kha khá các bài toán trong việc xử lý dữ liệu, những bài toán này thế nào cũng ta sẽ tìm hiểu trong phần tiếp theo.

Để mà có thể bắt được sự thay đổi dữ liệu này thì có rất nhiều các khác nhau, nguyên thủy nhất ta có thể sử dụng cơ chế TRIGGER trong các database đã hỗ trợ sẵn để bắt các ACTION về update, insert, delete,… Hoặc nhẹ nhàng hơn chúng ta có thể sử dụng các công cụ để làm việc này, điển hình có công cụ Debezium đang nổi bật nhất.

Lợi ích của CDC

Lợi ích đầu tiên mà chắc ai cũng thấy đó là việc sao chép dữ liệu sang các hệ thống khác, nếu nói về lợi ích này có thể 1 số bạn sẽ nói rằng: Các hệ thống database đều hỗ trợ các cơ chế replica rồi thì sử dụng luôn chứ cần gì phải sử dụng một tool bên ngoài cho rức đầu mà kém ổn định? Okay, đúng! Nếu bạn chỉ cần sao chép dữ liệu từ các database cùng loại (MySQL => MySQL, Mongo => Mongo) thì sử dụng luôn tính năng của database là tốt nhất. Tuy nhiên giờ nếu bạn muốn sao chép dữ liệu từ MySQL qua MongoDB hay từ MySQL qua PostgreSQL thì sẽ không có cơ chế đó. Trong bài toán này CDC sẽ đứng giữa để phát hiện sự thay đổi ở Database cần theo dõi và xử lý dữ liệu, sao đó có thể dùng code để xử lý và đẩy dữ liệu và hệ thống cần sao chép dữ liệu.

image.png

Một lợi ích khác cũng không kém phần quan trọng đó là khả năng Backup dữ liệu. Các sự kiện thay đổi dữ liệu sẽ được lưu trữ lại chính vì vậy nếu có chăng không may database của bạn bị drop vào lúc 9h sáng, bạn có thể lấy bản backup lúc 3h sáng và apply lại những thay đổi đã được lưu lại từ 3h đến 9h. Theo lý thuyết nếu bạn không miss bất cứ event nào thì dữ liệu của bạn sẽ được phục hồi đầy đủ như trước lúc bị drop. Quá đã đúng không 😃)))

image.png

Tiếp tục với lợi ịch đầu tiên, sau khi ta sao chép dữ liệu sang hệ thống khác ta có thể sử dụng hệ thống này cho việc testing thay vì tương tác trực tiếp trên hệ thống database thật. Không hiếm có những chuyện trong quá trình test Developers chạy những câu query tốn hàng phút để xử lý, thậm chí tệ hơn có thể gây ra lock hệ thống. Vấn đề này nhẹ thì gây giảm hiệu năng hệ thống, nặng thì gây crash luôn. CDC cũng là một cách giúp chúng ta giảm thiểu đi các trường hợp thế này sẽ xảy ra.

image.png

Ngoài ra CDC còn hỗ trợ cho một số bài toán đặc thù của từng hệ thống hay xử lý dữ liệu Big Data, bạn nào từng ứng dụng CDC vào các bài toán này rồi chia sẻ với mình ở dưới nhé.

Debezium – CDC Tool

Nói lý thuyết hoài mà không có tý ví dụ để xem thì có ích gì đâu, chính vì thế mình sẽ giới thiệu công cụ mà đã có thời gian làm việc và thấy khá ngon, công cụ này là Debezium. Debezium cốt lỗi của nó sử dụng Kafka để tạo ra các messages tương ứng với các events thay đổi dữ liệu. Debezium sử dụng các Connector để kết nối đến các hệ thống database và bắt ra sự thay đổi, hiện tại Debezium 1.9 hỗ trợ MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, DB2, Cassandra, Vitess. Bạn có thể xem hướng dẫn cho từng connector tại document chính thức: https://debezium.io/documentation/reference/2.0/connectors/index.html

Trong phần này mình sẽ mô tả các bước cài đặt của MySQL, loại database chắc mọi người làm việc nhiều nhất.

Đối với MySQL, Debezium sẽ dựa vào binlog để có thể phát hiện ra sự thay đổi về dữ liệu, chính vì vậy đối với hệ thống cần monitor thì bạn cần enable tính năng binlog này và chắc chắn user dùng để kết nối đến cần có quyền đọc binlog.

Mình làm việc với Kubernetes nhiều nên công cụ này mình sẽ hướng dẫn mọi người dựng trên K8s, đối với các môi trường khác như VM hay Docker thì về cơ bản cũng có các thành phần tương tự.

Debezium khi chạy trên K8s sẽ sử dụng Strimzi Operator (đây là một Operator cho Kafka). Đầu tiên ta tạo một namespace riêng cho ứng dụng này:

kubectl create ns debezium-example

Sau đó ta cần cài đặt Strimzi Operator

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

Tạo Secret cho database demo

cat << EOF | kubectl create -n debezium-example -f
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF

Tạo User và phân quyền cho Debezium

cat << EOF | kubectl create -n debezium-example -f
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF
$ cat << EOF | kubectl create -n debezium-example -f
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

Giờ đến phần quan trọng, chúng ta sẽ khởi tạo một cụm Kafka phục vụ cho việc lưu trữ changes event. Cấu hình dưới sẽ tạo 1 pod kafka tương ứng 1 broker và 1 pod zookeeper.

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

Tiếp theo ta triển khai một database MySQL để test thử, user và pass của DB này là mysqluser – msqlpw

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:1.9
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF

Giờ ta sẽ triển khai các thành phần với vai trò kết nối đến MySQL và detect changes. Đầu tiên ta cần tạo KafkaConnect để thực hiện công việc detect changes:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.110.154.103/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

Sau đó ta triển khai thêm các KafkaConnector để kết nối đến MySQL được gắn vào KafkaConnect đã tạo ở trên

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
EOF

Như vậy là đã setup xong và giờ ta có thể giám sát được sự thay đổi trong database. Ta chạy câu lệnh này để listen messages trong kafka

kubectl run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

Mở một terminal khác, Giờ ta sẽ truy cập vào DB và tiến hành thêm một bản ghi để test thử:

kubectl run -n debezium-example -it --rm --image=mysql:8.0 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

Add thêm một bản ghi:

sql> update customers set first_name="Sally Marie" where id=1001;

Nếu các bạn nhìn thấy có message dạng JSON như sau thì việc setup đã thành công:

{
...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "transaction": null
  }
}

Message thông báo ra sẽ có 3 mục chính gồm source (nguồn chứa dữ liệu, ví dụ file binlog bao nhiêu), before (dữ liệu trước khi thay đổi) và after (dữ liệu sau khi thay đổi).

Kết

Trong quá trình cài mình gặp khá nhiều vấn đề :v, một phần do chưa có nhiều kinh nghiệm với Kafka. Nếu các bạn cài đặt gặp lỗi thì bạn không cô đơn đâu 😃))) Lỗi gì bạn có thể comment để mình hỗ trợ. Hy vọng bài viết này đã giúp bạn có thêm 1 vài kiến thức về Change Data Capture.

Một số source mình tham khảo:
https://luminousmen.com/post/change-data-capturehttps://www.striim.com/tutorial/streaming-data-integration-using-cdc-to-stream-database-changes/

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ