[Laravel] Cách vận hành job queue

Giới thiệu Đã bao giờ bạn tự hỏi, queue laravel thật sự hoạt động như thế nào chưa , vừa rồi có dịp tìm hiểu nên mình cũng muốn giới thiệu chi tiết tới mọi người luôn Nhóm để chạy queue:work WorkCommand Là class sẽ đăng kí comman chạy cho laravel Worker 1 đơn vị

Giới thiệu

Đã bao giờ bạn tự hỏi, queue laravel thật sự hoạt động như thế nào chưa 😄, vừa rồi có dịp tìm hiểu nên mình cũng muốn giới thiệu chi tiết tới mọi người luôn

Nhóm để chạy queue:work

WorkCommand

Là class sẽ đăng kí comman chạy cho laravel

Worker

1 đơn vị xử lí, nơi tạo vòng lặp xử lí, bóc tách job ra khỏi queue để xử lí (có thể hiểu giống như 1 process của CPU)

    /**
     * Listen to the given queue in a loop.
     *
     * @param  string  $connectionName
     * @param  string  $queue
     * @param  IlluminateQueueWorkerOptions  $options
     * @return int
     */
    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];

        while (true) {
            // Before reserving any jobs, we will make sure this queue is not paused and
            // if it is we will just pause this worker for a given amount of time and
            // make sure we do not need to kill this worker process off completely.
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $status = $this->pauseWorker($options, $lastRestart);

                if (! is_null($status)) {
                    return $this->stop($status);
                }

                continue;
            }

            // First, we will attempt to get the next job off of the queue. We will also
            // register the timeout handler and reset the alarm for this job so it is
            // not stuck in a frozen state forever. Then, we can fire off this job.
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            // If the daemon should run (not in maintenance mode, etc.), then we can run
            // fire off this job for processing. Otherwise, we will need to sleep the
            // worker so no more jobs are processed until they should be processed.
            if ($job) {
                $jobsProcessed++;

                $this->runJob($job, $connectionName, $options);

                if ($options->rest > 0) {
                    $this->sleep($options->rest);
                }
            } else {
                $this->sleep($options->sleep);
            }

            if ($this->supportsAsyncSignals()) {
                $this->resetTimeoutHandler();
            }

            // Finally, we will check to see if we have exceeded our memory limits or if
            // the queue should restart based on other indications. If so, we'll stop
            // this worker and let whatever is "monitoring" it restart the process.
            $status = $this->stopIfNecessary(
                $options, $lastRestart, $startTime, $jobsProcessed, $job
            );

            if (! is_null($status)) {
                return $this->stop($status);
            }
        }
    }
  • Trong woker tạo một vòng lặp liên tục
  • Nếu queue bị pause thì stop job lại
  • Lấy job tiếp theo từ database job (có thể là database, redis, …)
  • Nếu có job sẽ tiến hành xử lí
  • Xử lí xong job, sleep 1 khoảng thời gian, rồi tiến hành chạy lại vòng lặp lấy job ra chạy tiếp

Queue

Nơi tạo load define job, payload, event, setting connection để lưu trữ queue

- createPayload
- createObjectPayload
- getDisplayName
- getJobBackoff
- getJobExpiration
- createStringPayload
- pushOn
- laterOn
- bulk

Job

Đơn vị nhỏ nhất chứa công việc cần được xử lí, sẽ được đẩy vào queue và xử lí dần dần

abstract class Job
public function uuid()
public function fire()
public function delete()
public function isDeleted()
public function release($delay = 0)
protected function failed($e)

Nhóm của queue:listen

Thường được dùng để chạy command cho việc phát triển ở local, giống queue:work phía trên nhưng command này giúp lắng nghe thay đổi của source code để reload lại queue để apply code mới của chúng ta mỗi lần chúng ta edit

ListenCommand

Nơi define command cho queue:listen

class Listener

Nơi run process cho command queue:listen, như ta thấy cũng giống trên kia, nó chạy bằng 1 vòng while (true)

    /**
     * Listen to the given queue connection.
     *
     * @param  string  $connection
     * @param  string  $queue
     * @param  IlluminateQueueListenerOptions  $options
     * @return void
     */
    public function listen($connection, $queue, ListenerOptions $options)
    {
        $process = $this->makeProcess($connection, $queue, $options);

        while (true) {
            $this->runProcess($process, $options->memory);
        }
    }

Cách supervisor mở các queue để chạy

Dùng supervisor, ta có thể mở số lượng woker tuỳ ý, phù hợp với yêu cầu business và cấu hình server, thông thường đối với 1 server tầm trung mình nghĩ mở tầm 8 => 10 woker là đủ

Mình đã mô tả sơ qua cách vận hành của job và worker trong laravel rồi nhé, hy vọng sẽ giúp ích được cho các bạn 😀

Nguồn: viblo.asia

Bài viết liên quan

WebP là gì? Hướng dẫn cách để chuyển hình ảnh jpg, png qua webp

WebP là gì? WebP là một định dạng ảnh hiện đại, được phát triển bởi Google

Điểm khác biệt giữa IPv4 và IPv6 là gì?

IPv4 và IPv6 là hai phiên bản của hệ thống địa chỉ Giao thức Internet (IP). IP l

Check nameservers của tên miền xem website trỏ đúng chưa

Tìm hiểu cách check nameservers của tên miền để xác định tên miền đó đang dùn

Mình đang dùng Google Domains để check tên miền hàng ngày

Từ khi thông báo dịch vụ Google Domains bỏ mác Beta, mình mới để ý và bắt đầ