Laravel 13 队列优先级完全指南

队列优先级管理是优化任务处理效率的关键。本文将深入探讨 Laravel 13 中队列优先级的各种配置和管理方法。

队列优先级基础

定义队列

1
2
3
4
5
6
7
8
9
10
// config/queue.php
'connections' => [
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => ['high', 'default', 'low'],
'retry_after' => 90,
'block_for' => null,
],
],

任务分发到队列

1
2
3
4
5
6
7
8
9
10
11
12
use App\Jobs\ProcessPayment;
use App\Jobs\SendNotification;
use App\Jobs\CleanupData;

// 高优先级
ProcessPayment::dispatch($payment)->onQueue('high');

// 默认优先级
SendNotification::dispatch($user)->onQueue('default');

// 低优先级
CleanupData::dispatch()->onQueue('low');

Worker 优先级配置

启动 Worker

1
2
3
4
5
6
7
8
# 按优先级顺序处理
php artisan queue:work --queue=high,default,low

# 只处理高优先级队列
php artisan queue:work --queue=high

# 多队列处理
php artisan queue:work redis --queue=high,default

Supervisor 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[program:laravel-worker-high]
command=php /var/www/app/artisan queue:work redis --queue=high --sleep=3 --tries=3
process_name=%(program_name)s_%(process_num)02d
numprocs=5
autostart=true
autorestart=true

[program:laravel-worker-default]
command=php /var/www/app/artisan queue:work redis --queue=default --sleep=3 --tries=3
numprocs=3
autostart=true
autorestart=true

[program:laravel-worker-low]
command=php /var/www/app/artisan queue:work redis --queue=low --sleep=3 --tries=3
numprocs=2
autostart=true
autorestart=true

动态优先级

优先级服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?php

namespace App\Services;

use Illuminate\Support\Facades\Redis;

class QueuePriorityService
{
protected array $priorities = [
'critical' => 100,
'high' => 75,
'default' => 50,
'low' => 25,
];

public function dispatchWithPriority($job, string $priority = 'default'): void
{
$queue = $this->resolveQueue($priority);
dispatch($job)->onQueue($queue);
}

public function resolveQueue(string $priority): string
{
return match ($priority) {
'critical' => 'high',
'high' => 'high',
'default' => 'default',
'low' => 'low',
default => 'default',
};
}

public function getQueueStats(): array
{
$stats = [];

foreach (['high', 'default', 'low'] as $queue) {
$stats[$queue] = [
'size' => Redis::llen("queues:{$queue}"),
'priority' => $this->priorities[$queue] ?? 50,
];
}

return $stats;
}

public function suggestWorkerDistribution(): array
{
$stats = $this->getQueueStats();
$totalWorkers = 10;

$totalJobs = array_sum(array_column($stats, 'size'));

if ($totalJobs === 0) {
return ['high' => 2, 'default' => 5, 'low' => 3];
}

$distribution = [];

foreach ($stats as $queue => $data) {
$weight = $data['priority'] * ($data['size'] / $totalJobs);
$distribution[$queue] = max(1, (int) ($totalWorkers * $weight / 100));
}

return $distribution;
}
}

动态队列选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php

namespace App\Jobs;

use App\Services\QueuePriorityService;

trait DynamicPriority
{
protected function determineQueue(): string
{
$priority = app(QueuePriorityService::class);

if ($this->isUrgent()) {
return $priority->resolveQueue('critical');
}

if ($this->isHeavy()) {
return $priority->resolveQueue('low');
}

return $priority->resolveQueue('default');
}

protected function isUrgent(): bool
{
return false;
}

protected function isHeavy(): bool
{
return false;
}
}

优先级中间件

优先级调整中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Cache;

class PriorityAdjustment
{
public function handle($job, $next): void
{
$queue = $job->job->getQueue();
$priority = $this->getPriority($queue);

if ($this->shouldElevate($job, $priority)) {
$this->elevatePriority($job);
}

$next($job);
}

protected function getPriority(string $queue): int
{
return match ($queue) {
'high' => 100,
'default' => 50,
'low' => 25,
default => 50,
};
}

protected function shouldElevate($job, int $priority): bool
{
$attempts = $job->attempts();

return $attempts >= 3 && $priority < 100;
}

protected function elevatePriority($job): void
{
$job->job->setQueue('high');
}
}

队列路由

路由服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php

namespace App\Services;

class QueueRoutingService
{
protected array $routes = [];

public function register(string $jobClass, string $queue): self
{
$this->routes[$jobClass] = $queue;
return $this;
}

public function getQueue(string $jobClass): string
{
return $this->routes[$jobClass] ?? 'default';
}

public function route($job): string
{
$jobClass = get_class($job);

if (method_exists($job, 'getQueue')) {
return $job->getQueue();
}

return $this->getQueue($jobClass);
}
}

基于属性的路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Attributes;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
class QueuePriority
{
public function __construct(
public string $queue = 'default',
public int $priority = 50
) {}
}

// 使用
use App\Attributes\QueuePriority;

#[QueuePriority(queue: 'high', priority: 100)]
class ProcessPayment
{
//
}

优先级监控

监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Cache;

class QueueMonitorService
{
public function getPriorityMetrics(): array
{
return [
'queues' => $this->getQueueMetrics(),
'wait_times' => $this->getWaitTimes(),
'throughput' => $this->getThroughput(),
];
}

protected function getQueueMetrics(): array
{
$queues = ['high', 'default', 'low'];
$metrics = [];

foreach ($queues as $queue) {
$metrics[$queue] = [
'pending' => DB::table('jobs')->where('queue', $queue)->count(),
'oldest_job_age' => $this->getOldestJobAge($queue),
];
}

return $metrics;
}

protected function getWaitTimes(): array
{
return Cache::get('queue:wait_times', [
'high' => 0,
'default' => 0,
'low' => 0,
]);
}

protected function getThroughput(): array
{
return Cache::get('queue:throughput', [
'high' => 0,
'default' => 0,
'low' => 0,
]);
}

protected function getOldestJobAge(string $queue): int
{
$oldest = DB::table('jobs')
->where('queue', $queue)
->orderBy('created_at')
->first();

if (!$oldest) {
return 0;
}

return now()->diffInSeconds($oldest->created_at);
}
}

总结

Laravel 13 的队列优先级管理提供了:

  • 多队列优先级配置
  • Worker 优先级处理
  • 动态优先级调整
  • 优先级中间件
  • 队列路由机制
  • 优先级监控统计

合理配置队列优先级可以优化任务处理效率,确保重要任务优先执行。