[Golang] Channel trong golang và use case – part II(worker pool pattern)

Mở đầu Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về buffer channel trong GO. Let’s go, guys! Buffered Channel Buffered channel là một channel trong Golang và có khả năng lưu trữ được dữ liệu bên trong nó. Buffered channel không cần receiver goroutiner như unbuffered channel . Tuy nhiên,

Mở đầu

  • Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về buffer channel trong GO. Let’s go, guys!

Buffered Channel

  • Buffered channel là một channel trong Golang và có khả năng lưu trữ được dữ liệu bên trong nó.
  • Buffered channel không cần receiver goroutiner như unbuffered channel . Tuy nhiên, nếu sức chứa của buffered channel vượt qua mức giới hạn, nó sẽ block goroutine hiện tại và cũng cần goroutine khác đến lấy dử liệu ra.
  • Cách buffered channel lưu trữ dữ liệu cũng giông như queue(FIFO).

    image.png

  • Channel được define với “capicity” = 1 thì vẫn được xem là buffered channel.
    func main() {
        bufferedChannel := make(chan int, 1)
        bufferedChannel <- 1 // no deadlock in here
    }
    

Play with buffered channel

  • Send and recevie value
    func SendReceiveValueToBufferChannel() {
        bufferChannel := make(chan int, 10)
    
        /* Send */
        bufferChannel <- 1
        bufferChannel <- 2
        bufferChannel <- 3
    
        /* Receive */
        fmt.Printf("value in channel: %v n", <-bufferChannel)
        fmt.Printf("value in channel: %v n", <-bufferChannel)
        fmt.Printf("value in channel: %v n", <-bufferChannel)
    }
    
    func main() {
        definition.SendReceiveValueToBufferChannel()
    }
    result:
        value in channel: 1 
        value in channel: 2 
        value in channel: 3 
    
  • Send and recevie value with for-range
    func SendReceiveValueToBufferChannelWithForRange() {
        bufferChannel := make(chan int, 10)
    
        /* Send */
        bufferChannel <- 1
        bufferChannel <- 2
        bufferChannel <- 3
    
        /* Receive with for-range */
        for value := range bufferChannel {
            fmt.Printf("value in channel: %v n", value)
        }
    }
    
    func main() {
        definition.SendReceiveValueToBufferChannelWithForRange()
    }
    
    result:
        value in channel: 1 
        value in channel: 2 
        value in channel: 3 
        fatal error: all goroutines are asleep - deadlock!
    

    Mặc định, for-range loop sẽ tiếp tục nhân giá trị từ channel cho đến khi nó closed. Tuy nhiên, lúc này cũng ko có một sender goroutine nào sẽ push dữ liệu vào channel nữa và “for-range loop” vẫn không nhận biết được rằng ko còn sender goroutine nào, dẫn đến deadlock xuất hiện.
    Trong trường hợp này, chúng ta có thể close channel trước vòng lặp for-range. Nó sẽ giúp thông báo rằng channel đã closed và ngừng việc request get value từ channel.

    bufferChannel := make(chan int, 10)
    
    /* Send */
    bufferChannel <- 1
    bufferChannel <- 2
    bufferChannel <- 3
    
    close(bufferChannel)
    
    /* Receive */
    for value := range bufferChannel {
        fmt.Printf("value in channel: %v n", value)
    }
    result:
        value in channel: 1 
        value in channel: 2 
        value in channel: 3 
    
  • Send and recevie value with “value, found := <-bufferChannel
    bufferChannel := make(chan int, 10)
    
    /* Send */
    bufferChannel <- 1
    bufferChannel <- 2
    bufferChannel <- 3
    
    /* Receive 2 */
    for true {
        value, found := <-bufferChannel
        if found {
            fmt.Printf("value in channel: %v n", value)
        } else {
            break
        }
    }
    

Use case 1: Crawl giả lập 10K URLs với 5 workers/goroutines

  • Về bài toán này, chúng ta sẽ đi qua một chút về khái niệm worker pool(thread pool). Nó sẽ bao gồm công việc sẽ được sắp sếp vào queue và sau đó phân phối tới worker pool. Tiếp tục từ worker pool, các công việc sẽ tiếp tục điều phối đến worker và thực hiện nó. Các worker sẽ được xem xét làm sao cho hoạt dộng gần như cùng lúc. Từ đó, thời gian hoàn thành các công việc sẽ được cải thiện đáng kể.
  • Một cách hiểu đơn giản hơn sẽ là bạn có một N công việc, thay vì để một mình bạn làm, bạn chuyển giao nó cho một nhóm người cùng thực hiện.


    Sau đây, là một simple implementation của worker pool pattern trong golang.

    • Từ đặc tính hoạt đông như một hàng đợi, ta sẽ thiết kế danh sách các công việc hoạt động dưa trên một buffer channel.
      func main() {
          const numJobs = 10
          jobs := make(chan int, numJobs)
      
          for j := 1; j <= numJobs; j++ {
              jobs <- j
          }
      }
      
    • Tiếp đến, Goroutine là một ứng cử viên vô cùng sáng giá để ánh xạ hình ảnh worker.
      func worker(id string, jobs <-chan int) {
          for j := range jobs {
              fmt.Println("worker", id, "started  job", j)
              time.Sleep(time.Second)
              fmt.Println("worker", id, "finished job", j)
          }
      }
      
      func main() {
          const numJobs = 10
          jobs := make(chan int, numJobs)
      
          go worker("ti", jobs, results)
          go worker("teo", jobs, results)
          go worker("tun", jobs, results)
      
          for j := 1; j <= numJobs; j++ {
              jobs <- j
          }
          close(jobs)
      }
      

      3 worker “tí”, “tèo”, “tũn” sẽ thay nhau lấy công việc từ job channel và hoàn thành nó (for-range channel). Vậy có công việc nào sẽ bị thưc hiện hai lần từ các worker? Câu trả lời là không! Đặc tính hàng đợi đã giúp giải quyết diều đó, job lấy ra và đồng thời với nghĩa bị removed.
      Ngoài ra, chúng ta cũng cần sleep vài giây ở main goroutine để đợi 3 workers làm việc xong.

      func main() {
          ....
          time.Sleep(time.Second * 5)
      }
      
    • Trong trường hợp, muốn lấy job result ta có thể add thêm một job result channel.
      func worker(id string, jobs <-chan int, results chan<- int) {
         for j := range jobs {
             fmt.Println("worker", id, "started  job", j)
             time.Sleep(time.Second)
             fmt.Println("worker", id, "finished job", j)
             /* expose result */
             results <- j * 2
         }
      }
      
      func main() {
         const numJobs = 10
         jobs := make(chan int, numJobs)
         results := make(chan int, numJobs)
      
         go worker("ti", jobs, results)
         go worker("teo", jobs, results)
         go worker("tun", jobs, results)
      
         for j := 1; j <= numJobs; j++ {
             jobs <- j
         }
         close(jobs)
      
         for a := 1; a <= numJobs; a++ {
              <-results
         }
      }
      

      Ngay tại đây, mặc dù “time.Sleep” đã bị removed nhưng các worker vẫn hoạt động bình thường. Lí do, tại main goroutine chúng ta đã request lấy giá trị từ result channel và GO runtime nhận thấy vẫn có các sender goroutine đang đẩy giá trị vào result channel, từ đó không cho main goroutine kết thúc ngay mà tiếp tục đợi giá trị từ result channel.

  • Từ kiến trúc trên, ta có thể giải quyết bài toán “craw url” bằng cách add thêm 2 worker và update lai data của channel thành một chuỗi là url. Worker sẽ lấy url từ job channel và craw url đó.
    func worker(id string, jobs <-chan string, results chan<- string) {
        ...
    }
    
    func main() {
        const numJobs = 10000
        jobs := make(chan string, numJobs)
        results := make(chan string, numJobs)
    
        go worker("ti", jobs, results)
        go worker("teo", jobs, results)
        go worker("tun", jobs, results)
        go worker("luu-bi", jobs, results)
        go worker("bang-thong", jobs, results)
    
        ...
    }
    

Tạm kết

  • Tại phần implement cho usecase “craw-url“, worker pool chỉ là một phiên bản đơn giản. Bạn hoàn toàn có thể implement một phiên bản hoàn chỉnh hơn và sữ dụng nó cho mục đích của mình.
  • Thanks for reading and I hope you find this useful! See you on next part!

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ