[Golang] Channel trong golang và use case – part IIII (pubsub pattern)

Mở đầu Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let’s go, guys! Pubsub Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki : In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do

Mở đầu

  • Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let’s go, guys!

Pubsub

pub-sub-messaging.png

  • Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
    In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be.

  • Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó sẽ được đảm nhiệm phân loại và gửi message.

  • Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases …

Use case 3: Build pubsub service with buffered channel

  • Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:
    type Message struct {
        topic   string
        content interface{}
    }
    
    type MessageChannel chan Message
    
    func main() {
        maxMessage := 10000
        topic := "update-user"
        
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
    }
    

    messageQueue: chứa danh sách các message sẽ xủ lí.

    MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.

    mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.

  • Nguyên lý vận hành:
    messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.
    ...
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel)
        go run(messageQueue, mapTopicMessage)
        
    }
    
    func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) {
        for {
            message := <-messageQueue
    
            listMessageChannel, ok := mapTopicMessage[message.topic]
            if ok {
                for _, messageChannel := range listMessageChannel {
                    messageChannel <- message
                }
            }
    
        }
    }
    
  • Publish message
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel)
        go run(messageQueue, mapTopicMessage)
        
        // publish
        publish(messageQueue, topic, "user-name is update to Hung")
        
        time.Sleep(time.Second * 10)
    }
    
    func publish(messageQueue chan Message, topic string, content string) {
        message := Message{
            topic:   topic,
            content: content,
        }
        messageQueue <- message
        fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)
    }
    
    result: 
        08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Register Subscription
    func main() {
        ...
        // subcribe
        sub1 := registerSubscription(mapTopicMessage, topic)
        ...
    }
    
    func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel {
        newMessageChannel := make(MessageChannel)
    
        value, ok := mapTopicMessage[topic]
        if ok {
            value = append(value, newMessageChannel)
            mapTopicMessage[topic] = value
        } else {
            mapTopicMessage[topic] = []MessageChannel{newMessageChannel}
        }
    
        return newMessageChannel
    }
    

    khi có một “register subcription” request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message.

  • Subcribe
    func subcribe(messageChannel MessageChannel) {
         go func() {
             for {
                 message := <-messageChannel
                 fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)
             }
         }()
     }
    
  • Running and see what happen!
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
        go run(messageQueue, mapTopicMessage)
    
        // register subcriptions
        sub1 := registerSubscription(mapTopicMessage, topic)
    
        // publish
        publish(messageQueue, topic, "user-name is update to Hung")
    
        subcribe(sub1)
    
        time.Sleep(time.Second * 10)
    }
    result:
        09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
        09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Add more subcriber
    func main() {
         ...
         // register subcriptions
         sub1 := registerSubscription(mapTopicMessage, topic)
         sub2 := registerSubscription(mapTopicMessage, topic)
         sub3 := registerSubscription(mapTopicMessage, topic)
         
         // publish
         publish(messageQueue, topic, "user-name is update to Hung")
    
         subcribe(sub1)
         subcribe(sub2)
         subcribe(sub3)
         
         time.Sleep(time.Second * 10)
    }
    result:
         09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    

Xây dựng một pubsub hoàn chỉnh

  • to be continued …

Tạm kết

  • to be continued …

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ