Laravel 13 队列高级应用完全指南
Laravel 队列系统是处理后台任务的强大工具。本文将深入探讨 Laravel 13 队列的高级用法和最佳实践。
队列连接配置
多连接配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| return [
    // Name of the connection used by default; 'redis' when QUEUE_CONNECTION is not set.
    'default' => env('QUEUE_CONNECTION', 'redis'),

    // Available connections, keyed by connection name.
    'connections' => [
        'sync' => [
            'driver' => 'sync',
        ],

        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => env('REDIS_QUEUE', 'default'),
            'retry_after' => 90,
            'block_for' => 5,
        ],

        // Credentials, queue URL prefix/suffix and region all come from the environment.
        'sqs' => [
            'driver' => 'sqs',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'default'),
            'suffix' => env('SQS_SUFFIX'),
            'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
        ],
    ],
];
|
高级任务分发
条件链式分发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| use App\Jobs\ProcessVideo; use App\Jobs\OptimizeVideo; use App\Jobs\PublishVideo; use Illuminate\Support\Facades\Bus;
// Process and optimise the video, then hand it to either the review job or the
// publish job depending on $video->needsReview().
Bus::chain([
    new ProcessVideo($video),
    new OptimizeVideo($video),
    function () use ($video) {
        // The value returned by a chained closure is discarded, so the follow-up
        // job has to be dispatched explicitly; returning it would queue nothing.
        // NOTE(review): ReviewVideo is not imported by this snippet; it is assumed
        // to live in App\Jobs next to the other video jobs — adjust if it does not.
        $followUp = $video->needsReview()
            ? new \App\Jobs\ReviewVideo($video)
            : new PublishVideo($video);

        dispatch($followUp);
    },
])->dispatch();
|
批量任务高级用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| use App\Jobs\ProcessCsvRow; use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Collection;
// Split the CSV rows into chunks of 100 and process every chunk as one job of a
// named batch on the "processing" queue of the redis connection.
$batch = Bus::batch(
    Collection::make($csvData)
        ->chunk(100)
        // chunk() yields Collection instances that keep the original keys, while
        // ProcessCsvRow declares `array $rows`: re-index and unwrap each chunk.
        ->map(fn ($chunk) => new ProcessCsvRow($chunk->values()->all()))
        // all() returns the job objects untouched.
        ->all()
)->then(function (Batch $batch) {
    // NOTE(review): `$batch->user` does not look like a property the Batch object
    // provides — confirm where the recipient should come from (e.g. capture it
    // with `use (...)` from the dispatching code).
    Mail::to($batch->user)->send(new ProcessingComplete());
})->catch(function (Batch $batch, Throwable $e) {
    Notification::route('mail', 'admin@example.com')
        ->notify(new BatchFailed($batch, $e));
})->finally(function (Batch $batch) {
    Log::info("Batch {$batch->id} completed", [
        'processed' => $batch->processedJobs(),
        'failed' => $batch->failedJobs,
    ]);
})->onConnection('redis')
    ->onQueue('processing')
    ->allowFailures()
    ->name('Process CSV Import')
    ->dispatch();
|
批量任务进度追踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| <?php
namespace App\Jobs;
use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Support\Facades\Bus;
/**
 * Batch job that processes one chunk of CSV rows and then publishes the
 * progress of its batch to the cache.
 */
class ProcessCsvRow implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * @param array       $rows    Rows of the chunk handled by this job.
     * @param string|null $batchId Accepted for backward compatibility and ignored.
     *                             The Batchable trait already declares its own
     *                             `$batchId` property (filled in by the framework
     *                             when the job joins a batch); promoting a second,
     *                             differently-declared property with the same name
     *                             is a fatal trait-composition error.
     */
    public function __construct(
        protected array $rows,
        ?string $batchId = null,
    ) {}

    /**
     * Process every row of the chunk unless the batch has been cancelled.
     */
    public function handle(): void
    {
        if ($this->batch()?->cancelled()) {
            return;
        }

        foreach ($this->rows as $row) {
            $this->processRow($row);
        }

        $this->updateProgress();
    }

    /**
     * Store total / processed / failed counters and a completion percentage
     * under "batch:{id}:progress".
     */
    protected function updateProgress(): void
    {
        $batch = $this->batch();

        // handle() tolerates a missing batch (`?->`), so this method must too;
        // the totalJobs check avoids a division by zero.
        if ($batch === null || $batch->totalJobs < 1) {
            return;
        }

        $processed = $batch->processedJobs();

        // Fully qualified: `Cache` is not imported in this namespace.
        \Illuminate\Support\Facades\Cache::put("batch:{$batch->id}:progress", [
            'total' => $batch->totalJobs,
            'processed' => $processed,
            'failed' => $batch->failedJobs,
            'percentage' => round(($processed / $batch->totalJobs) * 100, 2),
        ]);
    }

    /**
     * Handle a single CSV row. Intentionally empty in this example.
     */
    protected function processRow(array $row): void
    {
    }
}
|
任务中间件高级用法
限流中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
/**
 * Job middleware that lets at most `$limit` jobs per `$seconds` seconds through
 * a Redis throttle identified by `$key`; jobs that do not get a slot are
 * released back to the queue for `$releaseAfter` seconds.
 */
class RateLimited
{
    public function __construct(
        protected string $key,
        protected int $limit = 10,
        protected int $seconds = 1,
        protected int $releaseAfter = 10
    ) {}

    /**
     * @param mixed    $job  Job being processed; must expose release().
     * @param callable $next Next step of the middleware pipeline.
     */
    public function handle($job, $next): void
    {
        $throttle = Redis::throttle($this->key)
            ->block(0)
            ->allow($this->limit)
            ->every($this->seconds);

        // Slot obtained: continue down the pipeline.
        $runJob = function () use ($job, $next) {
            $next($job);
        };

        // No slot available: put the job back on the queue for later.
        $postponeJob = function () use ($job) {
            $job->release($this->releaseAfter);
        };

        $throttle->then($runJob, $postponeJob);
    }
}
|
重试中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <?php
namespace App\Jobs\Middleware;
use Throwable;
/**
 * Job middleware that releases a failing job back onto the queue with a growing
 * delay, and rethrows the failure once every configured delay has been used.
 */
class RetryWithBackoff
{
    /** Delay in seconds applied after the N-th failed attempt (index N - 1). */
    protected array $backoff = [10, 30, 60, 120, 300];

    /**
     * @param mixed    $job  Job being processed; must expose attempts() and release().
     * @param callable $next Next step of the middleware pipeline.
     *
     * @throws Throwable The original failure, once the backoff schedule is exhausted.
     */
    public function handle($job, $next): void
    {
        try {
            $next($job);
        } catch (Throwable $e) {
            $attempts = $job->attempts();

            // Rethrow only after every delay has been used. The previous `>=`
            // comparison gave up one attempt early, so the final delay (and the
            // fallback behind it) could never be reached.
            if ($this->backoff === [] || $attempts > count($this->backoff)) {
                throw $e;
            }

            // Fall back to the last delay if the attempt counter is below 1.
            $delay = $this->backoff[$attempts - 1]
                ?? $this->backoff[array_key_last($this->backoff)];

            $job->release($delay);
        }
    }
}
|
分布式锁中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| <?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Cache;
/**
 * Job middleware that runs the job only while holding a cache lock named
 * "lock:{key}"; when the lock is already held the job is released back to the
 * queue for 10 seconds.
 */
class DistributedLock
{
    /**
     * @param string $key Identifier of the resource to protect.
     * @param int    $ttl Seconds after which the lock expires on its own.
     */
    public function __construct(
        protected string $key,
        protected int $ttl = 300
    ) {}

    /**
     * @param mixed    $job  Job being processed; must expose release().
     * @param callable $next Next step of the middleware pipeline.
     */
    public function handle($job, $next): void
    {
        // Use the cache's atomic lock instead of a hand-rolled add()/get()/forget()
        // sequence: the lock keeps its own owner token and releases only if this
        // owner still holds it, in one step. The manual check-then-forget could
        // delete a lock another worker acquired after the TTL expired, and
        // uniqid() is not a collision-safe owner token.
        $lock = Cache::lock("lock:{$this->key}", $this->ttl);

        if (! $lock->get()) {
            $job->release(10);

            return;
        }

        try {
            $next($job);
        } finally {
            $lock->release();
        }
    }
}
|
队列优先级
动态优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| <?php
namespace App\Services;
use Illuminate\Support\Facades\Redis;
/**
 * Chooses a queue name ("high", "default" or "low") and dispatches jobs onto it.
 */
class QueuePriorityService
{
    /**
     * Pick a queue from the current lengths of the Redis lists:
     * "high" while it holds between 1 and 99 entries, otherwise "default" if it
     * is non-empty, otherwise "low".
     */
    public function getPriorityQueue(): string
    {
        $highQueueLength = Redis::llen('queues:high');

        if ($highQueueLength > 0 && $highQueueLength < 100) {
            return 'high';
        }

        // Read lazily: this length only matters when "high" was not chosen. The
        // length of "queues:low" never influenced the result, so it is no longer
        // fetched at all.
        if (Redis::llen('queues:default') > 0) {
            return 'default';
        }

        return 'low';
    }

    /**
     * Dispatch a job on the given queue.
     *
     * @param mixed  $job      Job to dispatch.
     * @param string $priority Queue name, or "auto" to ask the job itself.
     */
    public function dispatchWithPriority($job, string $priority = 'auto'): void
    {
        if ($priority === 'auto') {
            $priority = $this->calculatePriority($job);
        }

        dispatch($job)->onQueue($priority);
    }

    /**
     * Use the job's own getPriority() when it has one, "default" otherwise.
     */
    protected function calculatePriority($job): string
    {
        if (method_exists($job, 'getPriority')) {
            return $job->getPriority();
        }

        return 'default';
    }
}
|
失败处理
自定义失败处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| <?php
namespace App\Jobs;
use App\Models\FailedJob; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\InteractsWithQueue; use Throwable;
/**
 * Base class for queued jobs that store a FailedJob record when they fail and
 * then give the concrete job a chance to react.
 */
abstract class BaseJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable;

    /**
     * Persist the failure, then call the onFailure() hook.
     *
     * @param Throwable|null $exception The failure, if one is available.
     */
    public function failed(?Throwable $exception): void
    {
        $queueName = $this->queue ?? 'default';

        // The payload stores the concrete class name plus the data supplied by it.
        $encodedPayload = json_encode([
            'job' => static::class,
            'data' => $this->getJobData(),
        ]);

        $attributes = [
            'queue' => $queueName,
            'payload' => $encodedPayload,
            'exception' => $exception?->getMessage(),
            'failed_at' => now(),
        ];

        FailedJob::create($attributes);

        $this->onFailure($exception);
    }

    /**
     * Hook for subclasses; does nothing by default.
     */
    protected function onFailure(?Throwable $exception): void
    {
    }

    /**
     * Data of the concrete job to include in the stored payload.
     */
    abstract protected function getJobData(): array;
}
|
自动重试策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| <?php
namespace App\Jobs;
use App\Jobs\Middleware\RetryWithBackoff; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\InteractsWithQueue;
/**
 * Base job that chooses a retry schedule according to the kind of exception
 * thrown by execute().
 *
 * Declared abstract because execute() is abstract: a non-abstract class that
 * contains an abstract method is a fatal error.
 */
abstract class SmartRetryJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable;

    /** Retry rules per failure type: attempt limit and delay (seconds) per attempt. */
    protected array $retryRules = [
        'connection' => [
            'max_attempts' => 5,
            'backoff' => [5, 15, 30, 60, 120],
        ],
        'rate_limit' => [
            'max_attempts' => 3,
            'backoff' => [60, 120, 300],
        ],
        'default' => [
            'max_attempts' => 3,
            'backoff' => [10, 30, 60],
        ],
    ];

    /**
     * Middleware the job passes through.
     *
     * Note that handle() below catches every \Exception itself, so this
     * middleware only ever sees throwables that are not \Exception instances.
     */
    public function middleware(): array
    {
        return [
            new RetryWithBackoff(),
        ];
    }

    /**
     * Run execute() and route any exception to the matching retry rule.
     */
    public function handle(): void
    {
        try {
            $this->execute();
        // NOTE(review): ConnectionException and RateLimitException are not imported,
        // so they resolve inside this namespace — confirm the intended classes.
        } catch (ConnectionException $e) {
            $this->handleRetry('connection', $e);
        } catch (RateLimitException $e) {
            $this->handleRetry('rate_limit', $e);
        } catch (\Exception $e) {
            // Fully qualified: a bare `Exception` would resolve to a class in this
            // namespace and the catch-all would never match.
            $this->handleRetry('default', $e);
        }
    }

    /**
     * Fail the job once the rule's attempt limit is reached, otherwise release
     * it with the delay configured for the current attempt.
     *
     * @param string     $type Key into $retryRules.
     * @param \Exception $e    The exception that triggered the retry.
     */
    protected function handleRetry(string $type, \Exception $e): void
    {
        $rules = $this->retryRules[$type];
        $attempts = $this->attempts();

        if ($attempts >= $rules['max_attempts']) {
            $this->fail($e);

            return;
        }

        $delay = $rules['backoff'][$attempts - 1] ?? end($rules['backoff']);

        $this->release($delay);
    }

    /**
     * The actual work of the job.
     */
    abstract protected function execute(): void;
}
|
队列监控
队列健康检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| <?php
namespace App\Services;
use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\Queue;
/**
 * Reports size, recent failures and a derived status for the "high", "default"
 * and "low" queues, plus an overall status.
 */
class QueueHealthService
{
    /**
     * @return array{status: string, queues: array<string, array>, workers: int}
     */
    public function getHealthStatus(): array
    {
        $queues = ['high', 'default', 'low'];
        $status = [];

        foreach ($queues as $queue) {
            $status[$queue] = $this->checkQueue($queue);
        }

        return [
            'status' => $this->determineOverallStatus($status),
            'queues' => $status,
            'workers' => $this->getWorkerCount(),
        ];
    }

    /**
     * @return array{size: int, failed: int, status: string}
     */
    protected function checkQueue(string $queue): array
    {
        $size = Queue::size($queue);
        $failed = $this->getFailedCount($queue);

        return [
            'size' => $size,
            'failed' => $failed,
            'status' => $this->determineQueueStatus($size, $failed),
        ];
    }

    /**
     * "critical" above 100 failures, "warning" above 1000 pending jobs or above
     * 10 failures, "healthy" otherwise.
     */
    protected function determineQueueStatus(int $size, int $failed): string
    {
        if ($failed > 100) {
            return 'critical';
        }

        if ($size > 1000 || $failed > 10) {
            return 'warning';
        }

        return 'healthy';
    }

    /**
     * Worst status among the per-queue results: "critical" beats "warning",
     * which beats "healthy". This method was called by getHealthStatus() but
     * was never defined.
     *
     * @param array<string, array{status?: string}> $status Per-queue results.
     */
    protected function determineOverallStatus(array $status): string
    {
        $levels = array_column($status, 'status');

        return match (true) {
            in_array('critical', $levels, true) => 'critical',
            in_array('warning', $levels, true) => 'warning',
            default => 'healthy',
        };
    }

    /**
     * Rows in failed_jobs for the queue whose failed_at is within the last hour.
     */
    protected function getFailedCount(string $queue): int
    {
        // Fully qualified: `DB` is not imported in this namespace.
        return \Illuminate\Support\Facades\DB::table('failed_jobs')
            ->where('queue', $queue)
            ->where('failed_at', '>=', now()->subHour())
            ->count();
    }

    /**
     * Number of entries returned by the Redis CLIENT LIST command.
     *
     * NOTE(review): this counts every client connected to Redis, not only queue
     * workers — confirm that this is the intended metric.
     */
    protected function getWorkerCount(): int
    {
        // The reply used to be tested but never stored, leaving `$result` undefined.
        $clients = Redis::connection()->command('CLIENT', ['LIST']);

        // Depending on the client library the reply may already be parsed into rows.
        if (is_array($clients)) {
            return count($clients);
        }

        if (is_string($clients) && trim($clients) !== '') {
            return count(explode("\n", trim($clients)));
        }

        return 0;
    }
}
|
队列指标收集
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| <?php
namespace App\Services;
use Illuminate\Support\Facades\Cache; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobFailed;
/**
 * Collects per-job queue metrics in the cache under "queue:metrics:hour".
 *
 * The counters are updated with a read-modify-write on the cache, so concurrent
 * workers can overwrite each other's updates; treat the numbers as approximate.
 */
class QueueMetricsService
{
    protected string $metricsKey = 'queue:metrics';

    /**
     * Record a successfully processed job.
     */
    public function recordProcessed(JobProcessed $event): void
    {
        $this->recordMetric('processed', [
            'queue' => $event->job->getQueue(),
            'job' => $event->job->resolveName(),
            'duration' => $this->calculateDuration($event),
        ]);
    }

    /**
     * Record a failed job together with its exception message.
     */
    public function recordFailed(JobFailed $event): void
    {
        $this->recordMetric('failed', [
            'queue' => $event->job->getQueue(),
            'job' => $event->job->resolveName(),
            'exception' => $event->exception->getMessage(),
        ]);
    }

    /**
     * Metrics stored for the period, always containing `processed`, `failed`
     * and `avg_duration`.
     *
     * Only the "hour" period is ever written by this class, so any other period
     * yields the zeroed defaults.
     */
    public function getMetrics(string $period = 'hour'): array
    {
        $metrics = Cache::get("{$this->metricsKey}:{$period}", []);

        // Fill in whatever the stored entry lacks (it never contains avg_duration).
        $metrics += [
            'processed' => 0,
            'failed' => 0,
            'avg_duration' => 0,
        ];

        // Only "processed" entries carry a duration; array_column skips the rest.
        $durations = array_column($metrics['jobs'] ?? [], 'duration');

        if ($durations !== []) {
            $metrics['avg_duration'] = array_sum($durations) / count($durations);
        }

        return $metrics;
    }

    /**
     * Increment the counter for `$type` and append the job entry with a timestamp.
     */
    protected function recordMetric(string $type, array $data): void
    {
        $key = "{$this->metricsKey}:hour";
        $metrics = Cache::get($key, $this->getDefaultMetrics());

        // Tolerate a counter that is missing from a previously stored entry.
        $metrics[$type] = ($metrics[$type] ?? 0) + 1;
        $metrics['jobs'][] = array_merge($data, [
            'timestamp' => now()->toIso8601String(),
        ]);

        Cache::put($key, $metrics, 3600);
    }

    /**
     * Seconds elapsed since the job's `pushedAt` payload value, i.e. queue wait
     * plus run time; 0.0 when the payload has no usable `pushedAt`.
     */
    protected function calculateDuration(JobProcessed $event): float
    {
        // NOTE(review): `pushedAt` appears to be added by Horizon rather than by
        // the core queue payload — confirm. It is therefore read defensively.
        $pushedAt = $event->job->payload()['pushedAt'] ?? null;

        if (! is_numeric($pushedAt)) {
            return 0.0;
        }

        return microtime(true) - (float) $pushedAt;
    }

    /**
     * Shape of a fresh metrics entry.
     */
    protected function getDefaultMetrics(): array
    {
        return [
            'processed' => 0,
            'failed' => 0,
            'jobs' => [],
        ];
    }
}
|
实战案例
邮件批量发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| <?php
namespace App\Jobs;
use App\Models\Campaign; use App\Models\User; use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Support\Facades\Mail; use Illuminate\Support\Facades\Bus;
/**
 * Batch job that queues the campaign mail for one group of users and adds the
 * group size to the campaign's `sent_count`.
 */
class SendCampaignEmail implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * @param Campaign $campaign Campaign being sent.
     * @param array    $userIds  Ids of the users covered by this job.
     */
    public function __construct(
        protected Campaign $campaign,
        protected array $userIds
    ) {}

    /**
     * Queue one CampaignMail per user, unless the batch has been cancelled.
     */
    public function handle(): void
    {
        $batch = $this->batch();

        if ($batch?->cancelled()) {
            return;
        }

        $recipients = User::whereIn('id', $this->userIds)->get();

        foreach ($recipients as $recipient) {
            $mail = new CampaignMail($this->campaign, $recipient);

            Mail::to($recipient->email)->queue($mail);
        }

        $this->campaign->increment('sent_count', count($recipients));
    }

    /**
     * Dispatch the campaign as a batch on the "emails" queue, one job per 100
     * target users; the campaign status becomes "completed" or "failed"
     * through the batch callbacks.
     */
    public static function dispatchCampaign(Campaign $campaign): void
    {
        $idGroups = $campaign->targetUsers()->pluck('id')->chunk(100);

        $jobs = $idGroups
            ->map(fn($group) => new static($campaign, $group->toArray()))
            ->toArray();

        $markCompleted = function () use ($campaign) {
            $campaign->update(['status' => 'completed']);
        };

        $markFailed = function () use ($campaign) {
            $campaign->update(['status' => 'failed']);
        };

        Bus::batch($jobs)
            ->then($markCompleted)
            ->catch($markFailed)
            ->name("Campaign: {$campaign->name}")
            ->onQueue('emails')
            ->dispatch();
    }
}
|
数据同步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| <?php
namespace App\Jobs;
use App\Services\SyncService; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\InteractsWithQueue;
/**
 * Long-running job that pulls an entity from a source in chunks, syncs each
 * chunk, and records the last synced id in the cache after every chunk.
 */
class SyncDataJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable;

    public int $timeout = 3600;
    public int $tries = 1;

    /** Number of records requested per fetch. */
    protected int $chunkSize = 1000;

    /** Id of the last synced record; the next fetch continues from here. */
    protected int $lastId = 0;

    /**
     * @param string $source  Source to read from.
     * @param string $entity  Entity to sync.
     * @param array  $options Optional settings; `last_id` sets the starting point.
     */
    public function __construct(
        protected string $source,
        protected string $entity,
        protected array $options = []
    ) {
        $this->lastId = (int) ($options['last_id'] ?? 0);
    }

    /**
     * Fetch and sync chunk after chunk until a stop is requested, a fetch
     * returns nothing, or a chunk comes back smaller than the chunk size.
     */
    public function handle(SyncService $syncService): void
    {
        while (! $this->shouldStop()) {
            // NOTE(review): assumes fetch() returns a plain array of rows that each
            // have an 'id' key — confirm against SyncService.
            $data = $syncService->fetch(
                $this->source,
                $this->entity,
                $this->lastId,
                $this->chunkSize
            );

            if (empty($data)) {
                break;
            }

            $syncService->sync($this->entity, $data);

            $this->lastId = end($data)['id'];
            $this->updateProgress();

            // A short chunk means the source has no further records.
            if (count($data) < $this->chunkSize) {
                break;
            }
        }
    }

    /**
     * Whether the "sync:{source}:{entity}:stop" cache flag is set.
     */
    protected function shouldStop(): bool
    {
        // Fully qualified: `Cache` is not imported in this namespace. The cast keeps
        // the declared bool return type valid whatever value is stored.
        return (bool) \Illuminate\Support\Facades\Cache::get(
            "sync:{$this->source}:{$this->entity}:stop",
            false
        );
    }

    /**
     * Store the last synced id and the current time under
     * "sync:{source}:{entity}:progress".
     */
    protected function updateProgress(): void
    {
        \Illuminate\Support\Facades\Cache::put("sync:{$this->source}:{$this->entity}:progress", [
            'last_id' => $this->lastId,
            'updated_at' => now()->toIso8601String(),
        ]);
    }
}
|
总结
Laravel 13 的队列系统提供了:
- 灵活的批量任务处理
- 强大的任务中间件
- 智能的重试策略
- 完善的失败处理
- 实时监控能力
- 分布式锁支持
掌握队列高级用法是构建高性能、可扩展应用的关键。