Laravel 13 队列高级特性深度解析
队列是 Laravel 处理耗时任务的强大工具。本文将深入探讨 Laravel 13 中队列的高级特性。
队列基础回顾
定义任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Queued job that processes a single podcast.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * @param \App\Models\Podcast $podcast The podcast this job will process.
     *
     * The model is referenced by its fully-qualified name: this file lives in
     * the App\Jobs namespace and has no import for it, so a bare `Podcast`
     * would resolve to the non-existent App\Jobs\Podcast.
     *
     * The property is public so that code outside the job (for example a
     * Queue::assertPushed() callback) can inspect which podcast was queued.
     */
    public function __construct(
        public \App\Models\Podcast $podcast,
    ) {}

    /**
     * Run the job: delegate the actual work to the podcast model.
     */
    public function handle(): void
    {
        $this->podcast->process();
    }
}
|
任务链
链式任务
1 2 3 4 5 6 7 8 9 10 11
| <?php
use App\Jobs\OptimizePodcast; use App\Jobs\ReleasePodcast; use App\Jobs\SendPodcastNotification;
// Queue ProcessPodcast as the first job, followed by the chained jobs in the
// order listed.
//
// The arguments given to dispatch() are forwarded to the constructor of the
// first job, so the podcast has to be passed here; calling dispatch() with no
// arguments would fail because ProcessPodcast requires a podcast.
//
// ProcessPodcast is fully qualified because this snippet does not import it.
\App\Jobs\ProcessPodcast::withChain([
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
    new SendPodcastNotification($podcast),
])->dispatch($podcast);
|
链式任务特性(失败处理与重试配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Podcast processing job with explicit retry, timeout and failure settings.
 *
 * NOTE(review): $this->podcast is used below but is not declared in this
 * snippet; it is presumably set by a constructor that is not shown here.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Queue worker settings read by the framework (see the Laravel queue docs
    // for the exact semantics of each property).
    public $tries = 3;

    public $maxExceptions = 2;

    public $timeout = 120;

    public $failOnTimeout = true;

    /**
     * Process the podcast, or mark the job as failed when the podcast has
     * already been processed.
     */
    public function handle(): void
    {
        if (! $this->podcast->isProcessed()) {
            $this->podcast->process();

            return;
        }

        // Already processed: fail the job manually instead of processing twice.
        $this->fail(new \Exception('Podcast already processed'));
    }

    /**
     * Record the failure reason on the podcast.
     *
     * @param \Throwable $exception The error that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        $attributes = [
            'status' => 'failed',
            'error' => $exception->getMessage(),
        ];

        $this->podcast->update($attributes);
    }
}
|
任务批处理
创建批处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <?php
use App\Jobs\ImportCsv; use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus;
// The three CSV row ranges that make up the batch.
$jobs = [
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
];

// Batch lifecycle callbacks. They are intentionally empty placeholders here.
$onSuccess = function (Batch $batch) {
};
$onFailure = function (Batch $batch, \Throwable $e) {
};
$onFinished = function (Batch $batch) {
};

$batch = Bus::batch($jobs)
    ->then($onSuccess)
    ->catch($onFailure)
    ->finally($onFinished)
    ->dispatch();

// Hand the batch id back so the caller can look the batch up later.
return $batch->id;
|
批处理任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <?php
namespace App\Jobs;
use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Batchable job that imports one contiguous range of CSV rows.
 */
class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * @param int $startRow First row id of the range (inclusive).
     * @param int $endRow   Last row id of the range (inclusive).
     */
    public function __construct(
        protected int $startRow,
        protected int $endRow,
    ) {}

    /**
     * Process every row in the range, then queue the follow-up job on the
     * same batch. Does nothing when the batch has been cancelled.
     */
    public function handle(): void
    {
        if ($this->batch()?->cancelled()) {
            return;
        }

        $rows = $this->getRows();

        foreach ($rows as $row) {
            $this->processRow($row);
        }

        // batch() is null when the job runs outside of a batch, so the
        // follow-up job is only added when a batch actually exists. This
        // mirrors the null-safe cancellation check above.
        $this->batch()?->add(new ProcessRemaining($rows));
    }

    /**
     * Load the rows whose id lies within [startRow, endRow].
     *
     * @return array<int, \App\Models\CsvRow>
     */
    protected function getRows(): array
    {
        // get() returns a collection; all() unwraps it into the plain array
        // that the declared return type requires.
        return \App\Models\CsvRow::whereBetween('id', [$this->startRow, $this->endRow])
            ->get()
            ->all();
    }

    /**
     * Handle a single row. Left empty in this example.
     *
     * @param mixed $row
     */
    protected function processRow($row): void
    {
    }
}
|
批处理进度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <?php
namespace App\Http\Controllers;
use Illuminate\Http\Request; use Illuminate\Support\Facades\Bus;
/**
 * JSON endpoints for inspecting and cancelling job batches.
 */
class BatchController extends Controller
{
    /**
     * Report the current state of a batch.
     *
     * @param string $batchId Id returned when the batch was dispatched.
     * @return \Illuminate\Http\JsonResponse 404 when the batch is unknown.
     */
    public function show(string $batchId)
    {
        $batch = Bus::findBatch($batchId);

        if (! $batch) {
            return response()->json(['error' => 'Batch not found'], 404);
        }

        return response()->json([
            'id' => $batch->id,
            'name' => $batch->name,
            'total_jobs' => $batch->totalJobs,
            'pending_jobs' => $batch->pendingJobs,
            'processed_jobs' => $batch->processedJobs,
            'failed_jobs' => $batch->failedJobs,
            'progress' => $batch->progress(),
            'cancelled' => $batch->cancelled(),
            'finished_at' => $batch->finishedAt,
        ]);
    }

    /**
     * Cancel a batch.
     *
     * @param string $batchId Id returned when the batch was dispatched.
     * @return \Illuminate\Http\JsonResponse 404 when the batch is unknown.
     */
    public function cancel(string $batchId)
    {
        $batch = Bus::findBatch($batchId);

        // Do not claim success for a batch that does not exist; respond the
        // same way show() does for an unknown id.
        if (! $batch) {
            return response()->json(['error' => 'Batch not found'], 404);
        }

        $batch->cancel();

        return response()->json(['cancelled' => true]);
    }
}
|
任务中间件
定义中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| <?php
namespace App\Jobs\Middleware;
/**
 * Job middleware that allows at most $limit executions per time window for a
 * given key and releases any further jobs back onto the queue.
 */
class RateLimited
{
    /**
     * @param string $key          Name of the rate-limit bucket.
     * @param int    $limit        Executions allowed per window.
     * @param int    $releaseAfter Seconds to wait before a throttled job is retried.
     * @param int    $decaySeconds Length of the counting window in seconds.
     */
    public function __construct(
        protected string $key,
        protected int $limit = 5,
        protected int $releaseAfter = 5,
        protected int $decaySeconds = 60,
    ) {}

    /**
     * Run the job if the current window still has capacity, otherwise release it.
     *
     * @param mixed    $job  The queued job instance.
     * @param callable $next The next middleware / the job handler.
     */
    public function handle($job, $next): void
    {
        $key = "rate_limit:{$this->key}";

        // add() only writes when the key is absent, so the TTL is set once per
        // window instead of being pushed forward on every execution.
        \Illuminate\Support\Facades\Cache::add($key, 0, $this->decaySeconds);

        // Count this attempt with a single increment rather than a separate
        // read followed by a write, so concurrent workers cannot both read
        // the same stale counter value.
        $executed = (int) \Illuminate\Support\Facades\Cache::increment($key);

        if ($executed > $this->limit) {
            $job->release($this->releaseAfter);

            return;
        }

        $next($job);
    }
}
|
使用中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?php
namespace App\Jobs;
use App\Jobs\Middleware\RateLimited; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Job that combines the custom RateLimited middleware with two of the
 * framework's built-in job middleware.
 *
 * NOTE(review): $this->resourceId is read below but is not declared in this
 * snippet; it is presumably set by a constructor that is not shown here.
 */
class ProcessApiRequest implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Middleware the job passes through before handle() runs.
     *
     * The framework middleware are referenced by fully-qualified name: this
     * file is in the App\Jobs namespace and does not import them, so the bare
     * class names would resolve to non-existent App\Jobs\* classes.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [
            new RateLimited('api-calls', 10, 10),
            new \Illuminate\Queue\Middleware\SkipIfBatchCancelled(),
            new \Illuminate\Queue\Middleware\WithoutOverlapping($this->resourceId),
        ];
    }

    /**
     * Perform the API request. Left empty in this example.
     */
    public function handle(): void
    {
    }
}
|
内置中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\Middleware\WithoutOverlapping; use Illuminate\Queue\Middleware\ThrottlesExceptions; use Illuminate\Queue\SerializesModels;
/**
 * Payment job guarded by the framework's overlap-prevention and
 * exception-throttling middleware.
 *
 * NOTE(review): $this->orderId is read below but is not declared in this
 * snippet; it is presumably set by a constructor that is not shown here.
 */
class ProcessPayment implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Middleware the job passes through before handle() runs.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        // Overlap guard keyed by the order id.
        $overlapGuard = (new WithoutOverlapping($this->orderId))
            ->releaseAfter(60)
            ->expireAfter(180);

        // Exception throttle sharing the 'payment-errors' bucket.
        $exceptionThrottle = (new ThrottlesExceptions(5, 5))
            ->by('payment-errors')
            ->backoff(5);

        return [$overlapGuard, $exceptionThrottle];
    }

    /**
     * Process the payment. Left empty in this example.
     */
    public function handle(): void
    {
    }
}
|
延迟调度
延迟执行
1 2 3 4 5 6 7 8 9
| <?php
use App\Jobs\SendReminder;
// Send the reminder 30 minutes from now.
$reminderAt = now()->addMinutes(30);
SendReminder::dispatch($user)->delay($reminderAt);

// Process the podcast 2 hours from now.
$processAt = now()->addHours(2);
ProcessPodcast::dispatch($podcast)->delay($processAt);
|
特定时间调度
1 2 3 4 5 6 7 8
| <?php
use App\Jobs\SendNewsletter;
// Absolute point in time at which the newsletter job should become available.
// NOTE(review): this is a fixed example timestamp; replace it with a future
// date when reusing the snippet.
$sendAt = \Carbon\Carbon::parse('2024-12-25 09:00:00');

SendNewsletter::dispatch($newsletter)->delay($sendAt);
|
重试机制
配置重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Payment job that retries unsuccessful results with a stepped delay.
 *
 * NOTE(review): processPayment() is called below but is not defined in this
 * snippet; it is presumably implemented elsewhere in the class.
 */
class ProcessPayment implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = 3;

    // Delay in seconds before the 2nd, 3rd and 4th attempt respectively.
    public $backoff = [10, 30, 60];

    public $maxExceptions = 2;

    /**
     * Attempt the payment; on an unsuccessful result either release the job
     * for another attempt or fail it once the attempts are used up.
     */
    public function handle(): void
    {
        $result = $this->processPayment();

        if ($result->success) {
            return;
        }

        $attempt = $this->attempts();

        if ($attempt >= $this->tries) {
            // Out of attempts: fail with the error reported by the payment result.
            $this->fail(new \Exception($result->error));

            return;
        }

        // Pick the delay matching the current attempt, falling back to 60 seconds.
        $this->release($this->backoff[$attempt - 1] ?? 60);
    }

    /**
     * Point in time after which the job should no longer be retried.
     */
    public function retryUntil(): \DateTime
    {
        $deadline = now()->addHours(2);

        return $deadline;
    }
}
|
条件重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Job that decides how long to wait before retrying based on the API response.
 *
 * NOTE(review): makeApiCall() is called below but is not defined in this
 * snippet; it is presumably implemented elsewhere in the class.
 */
class ProcessApiCall implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Call the API and release the job for a retry when the response status
     * is 429 or 5xx, or when the call throws on one of the first two attempts.
     */
    public function handle(): void
    {
        try {
            $status = $this->makeApiCall()->status();

            // Rate limited: wait a fixed 60 seconds.
            if ($status === 429) {
                $this->release(60);
            } elseif ($status >= 500) {
                // Server error: the wait grows with the number of attempts.
                $this->release($this->attempts() * 10);
            }
        } catch (\Exception $exception) {
            // From the third attempt on, stop retrying and let the error surface.
            if ($this->attempts() >= 3) {
                throw $exception;
            }

            $this->release(30);
        }
    }
}
|
唯一任务
防止重复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Report job that is unique per report id.
 */
class ProcessReport implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Lifetime of the uniqueness lock, in seconds (see the Laravel unique-jobs docs).
    public $uniqueFor = 3600;

    /**
     * @param int $reportId Id of the report to process.
     */
    public function __construct(
        protected int $reportId,
    ) {}

    /**
     * Identifier used to decide whether two jobs count as duplicates.
     */
    public function uniqueId(): string
    {
        $id = (string) $this->reportId;

        return $id;
    }

    /**
     * Process the report. Left empty in this example.
     */
    public function handle(): void
    {
    }
}
|
唯一锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Invoice job that implements ShouldBeUniqueUntilProcessing, keyed by order.
 *
 * NOTE(review): $this->orderId is read below but is not declared in this
 * snippet; it is presumably set by a constructor that is not shown here.
 */
class GenerateInvoice implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Identifier used to decide whether two jobs count as duplicates.
     */
    public function uniqueId(): string
    {
        $prefix = 'invoice:';

        return $prefix . $this->orderId;
    }
}
|
任务标签
定义标签
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Order job that exposes tags for the order and the user it belongs to.
 *
 * NOTE(review): $this->orderId and $this->userId are read below but are not
 * declared in this snippet; presumably a constructor not shown here sets them.
 */
class ProcessOrder implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Tags attached to this job.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        $orderTag = 'order:' . $this->orderId;
        $userTag = 'user:' . $this->userId;

        return [$orderTag, $userTag];
    }
}
|
队列优先级
配置优先级
1 2 3 4 5 6 7
| <?php
use App\Jobs\ProcessPayment; use App\Jobs\SendNotification;
// Payments go onto the 'high' queue.
$paymentDispatch = ProcessPayment::dispatch($payment);
$paymentDispatch->onQueue('high');

// Notifications go onto the 'low' queue.
$notificationDispatch = SendNotification::dispatch($notification);
$notificationDispatch->onQueue('low');

// Release both pending dispatches in the original order so the payment is
// queued before the notification.
unset($paymentDispatch);
unset($notificationDispatch);
|
启动 Worker
1
| php artisan queue:work --queue=high,default,low
|
测试队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| <?php
namespace Tests\Feature\Jobs;
use Tests\TestCase; use App\Jobs\ProcessPodcast; use App\Models\Podcast; use Illuminate\Support\Facades\Queue; use Illuminate\Foundation\Testing\RefreshDatabase;
/**
 * Feature tests for dispatching ProcessPodcast, using the fake queue so that
 * no job is actually executed.
 */
class ProcessPodcastTest extends TestCase
{
    use RefreshDatabase;

    /**
     * The job is pushed with the podcast it was dispatched for.
     */
    public function test_job_is_dispatched(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        ProcessPodcast::dispatch($podcast);

        // NOTE(review): the callback reads $job->podcast from outside the job,
        // which requires that property to be publicly accessible.
        Queue::assertPushed(ProcessPodcast::class, function ($job) use ($podcast) {
            return $job->podcast->id === $podcast->id;
        });
    }

    /**
     * The job lands on the queue selected with onQueue().
     */
    public function test_job_is_dispatched_on_correct_queue(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        Queue::assertPushedOn('processing', ProcessPodcast::class);
    }

    /**
     * The job is pushed together with the expected chain of follow-up jobs.
     */
    public function test_job_chain(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        // dispatch() forwards its arguments to the constructor of the first
        // job in the chain, and ProcessPodcast needs a podcast to be built.
        ProcessPodcast::withChain([
            new \App\Jobs\OptimizePodcast($podcast),
            new \App\Jobs\ReleasePodcast($podcast),
        ])->dispatch($podcast);

        Queue::assertPushedWithChain(ProcessPodcast::class, [
            \App\Jobs\OptimizePodcast::class,
            \App\Jobs\ReleasePodcast::class,
        ]);
    }
}
|
最佳实践
1. 任务保持轻量
1 2 3 4 5 6 7 8 9 10 11 12 13
| <?php
/**
 * Example of a lightweight job: it only fans work out, dispatching one
 * ProcessItem job per item instead of doing the heavy work itself.
 *
 * NOTE(review): getItems() is called below but is not defined in this
 * snippet; it is presumably implemented elsewhere in the class.
 */
class GoodJob implements ShouldQueue
{
    /**
     * Queue a separate job for every item.
     */
    public function handle(): void
    {
        foreach ($this->getItems() as $entry) {
            $itemJob = new ProcessItem($entry);

            dispatch($itemJob);
        }
    }
}
|
2. 使用异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <?php
/**
 * Example of a job that handles an error locally and then rethrows it.
 *
 * NOTE(review): process() and handleError() are called below but are not
 * defined in this snippet; presumably they are implemented elsewhere.
 */
class SafeJob implements ShouldQueue
{
    /**
     * Run the work; on failure invoke the error hook and propagate the error.
     */
    public function handle(): void
    {
        try {
            $this->process();
        } catch (\Exception $exception) {
            $this->handleError($exception);

            // Rethrow so the exception is not swallowed by this handler.
            throw $exception;
        }
    }
}
|
3. 监控队列健康
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <?php
namespace App\Console\Commands;
use Illuminate\Console\Command; use Illuminate\Support\Facades\Queue;
/**
 * Console command that warns when a queue holds more jobs than a threshold.
 */
class MonitorQueueHealth extends Command
{
    /**
     * Registered as `queue:health` rather than `queue:monitor`, because the
     * framework already ships a `queue:monitor` command and reusing that name
     * would clash with it.
     *
     * Both options are optional; without them the command checks the default
     * queue against a threshold of 1000 jobs.
     */
    protected $signature = 'queue:health
        {--queue= : Name of the queue to inspect (defaults to the default queue)}
        {--max=1000 : Alert when the queue holds more than this many jobs}';

    protected $description = 'Alert when the number of queued jobs exceeds a threshold';

    /**
     * Read the queue size and print an alert when it is above the threshold.
     *
     * @return int Exit code; always success, the alert is informational only.
     */
    public function handle(): int
    {
        // Options arrive as strings from the command line.
        $threshold = (int) $this->option('max');

        $size = Queue::size($this->option('queue'));

        if ($size > $threshold) {
            $this->alert("Queue size is high: {$size}");
        }

        return self::SUCCESS;
    }
}
|
总结
Laravel 13 的队列系统提供了强大的异步任务处理能力。通过合理使用任务链、批处理、中间件等高级特性,可以构建高效、可靠的后台任务处理系统。