Laravel 13 队列优先级完全指南
队列优先级管理是优化任务处理效率的关键。本文将深入探讨 Laravel 13 中队列优先级的各种配置和管理方法。
队列优先级基础
定义队列
1 2 3 4 5 6 7 8 9 10
| 'connections' => [
    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        // Name of the queue used when a job is dispatched without onQueue().
        // This must be a single string, not a list of queue names: the
        // high/default/low processing order is chosen per worker with
        // `queue:work --queue=high,default,low`, not in this file.
        'queue' => 'default',
        'retry_after' => 90,
        'block_for' => null,
    ],
],
|
任务分发到队列
1 2 3 4 5 6 7 8 9 10 11 12
| use App\Jobs\ProcessPayment; use App\Jobs\SendNotification; use App\Jobs\CleanupData;
// Payment processing is sent to the "high" queue.
ProcessPayment::dispatch($payment)->onQueue('high');
// Notifications are sent to the "default" queue.
SendNotification::dispatch($user)->onQueue('default');
// Cleanup work is sent to the "low" queue.
CleanupData::dispatch()->onQueue('low');
|
Worker 优先级配置
启动 Worker
1 2 3 4 5 6 7 8
| php artisan queue:work --queue=high,default,low  # one worker for all three queues, polled in the order listed
php artisan queue:work --queue=high  # worker restricted to the "high" queue
php artisan queue:work redis --queue=high,default  # "redis" connection; "high" and "default" only, never "low"
|
Supervisor 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| [program:laravel-worker-high]
; Five workers dedicated to the "high" queue.
command=php /var/www/app/artisan queue:work redis --queue=high --sleep=3 --tries=3
; With numprocs > 1 Supervisor requires %(process_num) in process_name so
; that every process in the group gets a unique name.
process_name=%(program_name)s_%(process_num)02d
numprocs=5
autostart=true
autorestart=true

[program:laravel-worker-default]
; Three workers dedicated to the "default" queue.
command=php /var/www/app/artisan queue:work redis --queue=default --sleep=3 --tries=3
process_name=%(program_name)s_%(process_num)02d
numprocs=3
autostart=true
autorestart=true

[program:laravel-worker-low]
; Two workers dedicated to the "low" queue.
command=php /var/www/app/artisan queue:work redis --queue=low --sleep=3 --tries=3
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
|
动态优先级
优先级服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| <?php
namespace App\Services;
use Illuminate\Support\Facades\Redis;
/**
 * Maps symbolic priority labels onto the high/default/low queues and derives
 * simple statistics and worker suggestions from the queue backlog.
 */
class QueuePriorityService
{
    /**
     * Relative weight per label; looked up by queue name in getQueueStats().
     *
     * @var array<string, int>
     */
    protected array $priorities = [
        'critical' => 100,
        'high' => 75,
        'default' => 50,
        'low' => 25,
    ];

    /**
     * Dispatch a job onto the queue that corresponds to the given priority label.
     *
     * @param mixed  $job      Job instance handed to the dispatch() helper.
     * @param string $priority One of critical/high/default/low; anything else
     *                         ends up on the "default" queue.
     */
    public function dispatchWithPriority($job, string $priority = 'default'): void
    {
        $queue = $this->resolveQueue($priority);

        dispatch($job)->onQueue($queue);
    }

    /**
     * Translate a priority label into a queue name.
     *
     * "critical" shares the "high" queue; unknown labels map to "default".
     */
    public function resolveQueue(string $priority): string
    {
        return match ($priority) {
            'critical', 'high' => 'high',
            'low' => 'low',
            default => 'default',
        };
    }

    /**
     * Report the backlog and weight of each queue.
     *
     * Size is the length of the Redis list "queues:{name}".
     *
     * @return array<string, array{size: int, priority: int}>
     */
    public function getQueueStats(): array
    {
        $stats = [];

        foreach (['high', 'default', 'low'] as $queue) {
            $stats[$queue] = [
                'size' => (int) Redis::llen("queues:{$queue}"),
                'priority' => $this->priorities[$queue] ?? 50,
            ];
        }

        return $stats;
    }

    /**
     * Suggest how to split a pool of 10 workers across the queues.
     *
     * Each queue is weighted by priority x backlog. Every queue keeps at least
     * one worker and the suggestion always adds up to the full pool; the
     * previous formula did not normalise the weights and could account for as
     * few as 4 of the 10 workers.
     *
     * @return array<string, int> Worker count per queue name.
     */
    public function suggestWorkerDistribution(): array
    {
        $stats = $this->getQueueStats();
        $totalWorkers = 10;

        $totalJobs = array_sum(array_column($stats, 'size'));

        // Nothing is waiting: fall back to a fixed split.
        if ($totalJobs <= 0) {
            return ['high' => 2, 'default' => 5, 'low' => 3];
        }

        $weights = [];
        foreach ($stats as $queue => $data) {
            $weights[$queue] = $data['priority'] * $data['size'];
        }

        return $this->allocateWorkers($weights, $totalWorkers);
    }

    /**
     * Split $totalWorkers proportionally to $weights (largest-remainder method),
     * after reserving one worker for every queue.
     *
     * @param array<string, int|float> $weights
     *
     * @return array<string, int>
     */
    private function allocateWorkers(array $weights, int $totalWorkers): array
    {
        $distribution = array_fill_keys(array_keys($weights), 1);

        $spare = $totalWorkers - count($weights);
        $totalWeight = array_sum($weights);

        if ($spare <= 0 || $totalWeight <= 0) {
            return $distribution;
        }

        $assigned = 0;
        $remainders = [];

        foreach ($weights as $queue => $weight) {
            $exact = $spare * $weight / $totalWeight;
            $whole = (int) floor($exact);

            $distribution[$queue] += $whole;
            $assigned += $whole;
            $remainders[$queue] = $exact - $whole;
        }

        // Hand out what flooring left over, largest fraction first. arsort() is
        // stable, so ties keep the original queue order.
        arsort($remainders);
        foreach (array_slice(array_keys($remainders), 0, $spare - $assigned) as $queue) {
            $distribution[$queue]++;
        }

        return $distribution;
    }
}
|
动态队列选择
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| <?php
namespace App\Jobs;
use App\Services\QueuePriorityService;
/**
 * Lets a job choose its queue from its own urgency and weight flags.
 */
trait DynamicPriority
{
    /**
     * Pick the queue for this job.
     *
     * Urgency is checked first and wins over heaviness; a job that is neither
     * is resolved with the "default" label.
     */
    protected function determineQueue(): string
    {
        $resolver = app(QueuePriorityService::class);

        $label = match (true) {
            $this->isUrgent() => 'critical',
            $this->isHeavy() => 'low',
            default => 'default',
        };

        return $resolver->resolveQueue($label);
    }

    /**
     * Whether the job should be resolved with the "critical" label.
     * Returns false unless the using class overrides it.
     */
    protected function isUrgent(): bool
    {
        return false;
    }

    /**
     * Whether the job should be resolved with the "low" label.
     * Returns false unless the using class overrides it.
     */
    protected function isHeavy(): bool
    {
        return false;
    }
}
|
优先级中间件
优先级调整中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| <?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Cache;
/**
 * Job middleware that moves a job which has been attempted three or more
 * times onto the "high" queue.
 */
class PriorityAdjustment
{
    /**
     * @param mixed    $job  The job object passing through the middleware; it is
     *                       expected to expose attempts() and a `job` property
     *                       holding the underlying queue job.
     * @param callable $next Next step of the middleware pipeline.
     */
    public function handle($job, $next): void
    {
        // Without an underlying queue job there is no queue to inspect, so the
        // job simply runs.
        $queue = ($job->job ?? null)?->getQueue();

        if ($queue === null) {
            $next($job);

            return;
        }

        if ($this->shouldElevate($job, $this->getPriority($queue))) {
            // The job continues on the "high" queue; running it here as well
            // would execute it twice.
            $this->elevatePriority($job);

            return;
        }

        $next($job);
    }

    /**
     * Numeric priority of a queue name; unknown queues count as "default" (50).
     */
    protected function getPriority(string $queue): int
    {
        return match ($queue) {
            'high' => 100,
            'default' => 50,
            'low' => 25,
            default => 50,
        };
    }

    /**
     * A job is elevated once it has reached 3 attempts and is not already on
     * the highest-priority queue.
     */
    protected function shouldElevate($job, int $priority): bool
    {
        $attempts = $job->attempts();

        return $attempts >= 3 && $priority < 100;
    }

    /**
     * Move the job to the "high" queue.
     *
     * A job that a worker has already picked up cannot be re-queued by changing
     * a property on it, so a copy is pushed onto "high" and the current one is
     * deleted afterwards (if dispatching throws, the original stays queued).
     *
     * NOTE(review): the copy starts with a fresh attempt count, and jobs that
     * belong to a chain or batch may need extra handling because the original
     * is removed here — confirm before using this with chained/batched jobs.
     */
    protected function elevatePriority($job): void
    {
        $copy = clone $job;
        // Detach the worker-bound handle so it is not serialized with the copy.
        $copy->job = null;

        dispatch($copy->onQueue('high'));

        $job->job->delete();
    }
}
|
队列路由
路由服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| <?php
namespace App\Services;
/**
 * Registry that maps job class names to queue names.
 */
class QueueRoutingService
{
    /** @var array<string, string> Queue name keyed by job class name. */
    protected array $routes = [];

    /**
     * Register the queue for a job class. Returns $this for chaining.
     */
    public function register(string $jobClass, string $queue): self
    {
        $this->routes[$jobClass] = $queue;

        return $this;
    }

    /**
     * Queue registered for the class, or "default" when none is registered.
     */
    public function getQueue(string $jobClass): string
    {
        return $this->routes[$jobClass] ?? 'default';
    }

    /**
     * Resolve the queue for a job instance.
     *
     * A job that exposes getQueue() and returns a non-empty string from it
     * decides for itself. Otherwise the registry is consulted; this now also
     * covers getQueue() returning null or '', which previously either violated
     * the string return type or produced an empty queue name.
     */
    public function route(object $job): string
    {
        if (method_exists($job, 'getQueue')) {
            $explicit = $job->getQueue();

            if (is_string($explicit) && $explicit !== '') {
                return $explicit;
            }
        }

        return $this->getQueue($job::class);
    }
}
|
基于属性的路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <?php
namespace App\Attributes;
use Attribute;
/**
 * Class-level attribute carrying a queue name and a numeric priority.
 *
 * Defaults to the "default" queue with priority 50.
 */
#[Attribute(Attribute::TARGET_CLASS)]
class QueuePriority
{
    public function __construct(
        public string $queue = 'default',
        public int $priority = 50,
    ) {
    }
}
use App\Attributes\QueuePriority;
/**
 * Example class annotated with the "high" queue and priority 100.
 */
#[QueuePriority(queue: 'high', priority: 100)]
class ProcessPayment
{
}
|
优先级监控
监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| <?php
namespace App\Services;
use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Cache;
/**
 * Collects per-queue figures for the high/default/low queues.
 *
 * Pending counts and job ages are read from the `jobs` database table; wait
 * times and throughput are read from the cache keys `queue:wait_times` and
 * `queue:throughput`, which this class only reads.
 * NOTE(review): the `jobs` table is presumably the one used by the database
 * queue driver — confirm it matches the connection being monitored.
 */
class QueueMonitorService
{
    /**
     * @return array{queues: array, wait_times: array, throughput: array}
     */
    public function getPriorityMetrics(): array
    {
        return [
            'queues' => $this->getQueueMetrics(),
            'wait_times' => $this->getWaitTimes(),
            'throughput' => $this->getThroughput(),
        ];
    }

    /**
     * Pending job count and age of the oldest job, per queue.
     *
     * @return array<string, array{pending: int, oldest_job_age: int}>
     */
    protected function getQueueMetrics(): array
    {
        $queues = ['high', 'default', 'low'];
        $metrics = [];

        foreach ($queues as $queue) {
            $metrics[$queue] = [
                'pending' => DB::table('jobs')->where('queue', $queue)->count(),
                'oldest_job_age' => $this->getOldestJobAge($queue),
            ];
        }

        return $metrics;
    }

    /**
     * Cached wait times per queue; zeros when nothing has been cached.
     */
    protected function getWaitTimes(): array
    {
        return Cache::get('queue:wait_times', [
            'high' => 0,
            'default' => 0,
            'low' => 0,
        ]);
    }

    /**
     * Cached throughput per queue; zeros when nothing has been cached.
     */
    protected function getThroughput(): array
    {
        return Cache::get('queue:throughput', [
            'high' => 0,
            'default' => 0,
            'low' => 0,
        ]);
    }

    /**
     * Age in whole seconds of the oldest job on the queue, or 0 when the queue
     * has no jobs.
     */
    protected function getOldestJobAge(string $queue): int
    {
        $oldest = DB::table('jobs')
            ->where('queue', $queue)
            ->min('created_at');

        if ($oldest === null) {
            return 0;
        }

        return $this->secondsSince($oldest, now()->getTimestamp());
    }

    /**
     * Seconds elapsed between a stored creation time and $now.
     *
     * The value is not handed to a date-diff helper directly because it may be
     * a Unix timestamp (integer or numeric string) rather than a date string,
     * and the result has to be a non-negative whole number. Unparseable values
     * and timestamps in the future yield 0.
     *
     * @param mixed $createdAt Unix timestamp or a date string strtotime() accepts.
     * @param int   $now       Current Unix timestamp.
     */
    protected function secondsSince(mixed $createdAt, int $now): int
    {
        $timestamp = is_numeric($createdAt)
            ? (int) $createdAt
            : strtotime((string) $createdAt);

        if ($timestamp === false) {
            return 0;
        }

        return max(0, $now - $timestamp);
    }
}
|
总结
本文介绍了在 Laravel 13 中管理队列优先级的几种手段(其中中间件、路由和监控为文中的自定义实现,并非框架内置功能):
- 多队列优先级配置
- Worker 优先级处理
- 动态优先级调整
- 优先级中间件
- 队列路由机制
- 优先级监控统计
合理配置队列优先级可以优化任务处理效率,确保重要任务优先执行。