Apache Kafka – Producer – Gửi message đến Kafka bằng kafka-python

Overview Understand how to produce message and send to the Kafka topic Architecture Producer has many types and sources: message from Credit Card transactions, message from Facebook, Email or any systems When the producer send the message to kafka, kafka sau khi nhận message và randomly phân bố message đó về từng partition. Vậy nên

Overview

Understand how to produce message and send to the Kafka topic

Architecture

Producer has many types and sources: message from Credit Card transactions, message from Facebook, Email or any systems

When the producer send the message to kafka, kafka sau khi nhận message và randomly phân bố message đó về từng partition. Vậy nên Producer chỉ cần quan tâm việc:

  • Boostrap Server
  • Topic
  • Value_serializer : cách, định dạng mà message được gửi đến
  • client_id : là id mà client được cấp và producer có id này mới send được message to kafka topic
  • acks : có 3 dạng (0, 1, ‘all’), khi gửi message đến kafka, procedure yêu cầu kafka xác nhận cho mình để tiến hành process tiếp tục các message khác. defaults to acks=1
  1. acks = 0: Producer sẽ không chờ việc Kafka xác nhận đã hoàn thành việc nhận dữ liệu. Mà mỗi lần có message, Producer sẽ tự động add message vào menmory. Do vậy, trong một số trường hợp dữ liệu sẽ bị mất và Kafka không guarantee cho việc này.
  2. acks = 1: Producer sẽ chỉ chờ cho việc message được write xuống leader xong mà không quan tâm việc message được replicate sang những follower khác. Ngay sau đó, Producer sẽ tiếp tục gửi một message khác đến. Với cơ chế này thì trong 1 vài trường hợp message sẽ bị lost ở consumer, do có lỗi tại leader
  3. acks = all. Producer sẽ chờ cho toàn bộ quá trình leader và follower được write xuống thì mới process một message khác. Do vậy Kafka sẽ đảm bảo việc message sẽ được ko lost dữ liệu. Nhưng sẽ xảy ra trường hợp dữ liệu bị ngẽn tại Producer

Code example

Requirement

  • Python 3.6 , 3.7, 3.8
  • pip install kafka-python – required
  • pip install Faker – optional : this Lib to ramdomly create dummy data

Code example

  1. Please visit the previous document to know how to set up kafka, kafka CLI, Kafka UI.
  2. Produce the dummy data
from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time

faker = Faker()defget_register():return{'name': faker.name(),'year': faker.year()}
  1. Send data to Kafka

As above we know that 3 points must have to send the message to kafka is:

  • boostrap server or broker: the ip/host of broker
  • topic name
  • value_serializer : message and message type

Code for sending message:

from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time

faker = Faker()defget_register():return{'name': faker.name(),'year': faker.year()}defjson_serializer(data):return json.dumps(data).encode('utf-8')

producer = KafkaProducer(
    bootstrap_servers =['localhost:9092'],# server name
    value_serializer = json_serializer # function callable)while1==1:
    user = get_register()print(user)
    producer.send('second_topic',user
    )
    time.sleep(3)

Focus on only one partition

Set up việc gửi message chỉ đến 1 given partition in list partition of kafka

from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time

faker = Faker()defget_register():return{'name': faker.name(),'add': faker.year()}defget_partitioner(key_bytes, all_partitions, available_partitions):return0defjson_serializer(data):return json.dumps(data).encode('utf-8')

producer = KafkaProducer(
    bootstrap_servers =['localhost:9092'],# server name
    value_serializer = json_serializer,# function callable# partitioner = get_partitioner, # function return 0 >>> only partition_0 can received messages)while1==1:
    user = get_register()print(user)
    producer.send('second_topic',user
    )
    time.sleep(3)

Nguồn: viblo.asia

Bài viết liên quan

Thay đổi Package Name của Android Studio dể dàng với plugin APR

Nếu bạn đang gặp khó khăn hoặc bế tắc trong việc thay đổi package name trong And

Lỗi không Update Meta_Value Khi thay thế hình ảnh cũ bằng hình ảnh mới trong WordPress

Mã dưới đây hoạt động tốt có 1 lỗi không update được postmeta ” meta_key=

Bài 1 – React Native DevOps các khái niệm và các cài đặt căn bản

Hướng dẫn setup jenkins agent để bắt đầu build mobile bằng jenkins cho devloper an t

Chuyển đổi từ monolith sang microservices qua ví dụ

1. Why microservices? Microservices là kiến trúc hệ thống phần mềm hướng dịch vụ,