Laravel 13 队列高级特性深度解析

队列是 Laravel 处理耗时任务的强大工具。本文将深入探讨 Laravel 13 中队列的高级特性。

队列基础回顾

定义任务

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Queued job that processes a single podcast.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The model is referenced by its fully-qualified name because this file
     * lives in App\Jobs and does not import App\Models\Podcast. The property
     * is public so callers (for example queue assertions in tests) can
     * inspect which podcast the job was created for.
     *
     * @param \App\Models\Podcast $podcast The podcast to process.
     */
    public function __construct(
        public \App\Models\Podcast $podcast,
    ) {}

    /**
     * Run the job by delegating to the podcast's own process() method.
     */
    public function handle(): void
    {
        $this->podcast->process();
    }
}

任务链

链式任务

示例代码：
<?php

use App\Jobs\OptimizePodcast;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastNotification;

// Dispatch ProcessPodcast followed by the chained jobs, in the order listed.
// The arguments given to dispatch() are forwarded to ProcessPodcast's
// constructor, so the podcast must be passed here as well.
\App\Jobs\ProcessPodcast::withChain([
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
    new SendPodcastNotification($podcast),
])->dispatch($podcast);

链式任务特性

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Podcast processing job with explicit retry, timeout and failure settings.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Number of attempts configured for this job. */
    public $tries = 3;

    /** Number of unhandled exceptions configured for this job. */
    public $maxExceptions = 2;

    /** Timeout configured for this job, in seconds. */
    public $timeout = 120;

    /** Whether a timeout should mark the job as failed. */
    public $failOnTimeout = true;

    /**
     * handle() and failed() both read $this->podcast, so the job has to
     * receive and store the model it works on.
     *
     * @param \App\Models\Podcast $podcast The podcast to process.
     */
    public function __construct(
        protected \App\Models\Podcast $podcast,
    ) {}

    /**
     * Process the podcast, or fail the job explicitly when it has already
     * been processed.
     */
    public function handle(): void
    {
        if ($this->podcast->isProcessed()) {
            $this->fail(new \Exception('Podcast already processed'));

            // Stop here so an already-processed podcast is not processed again.
            return;
        }

        $this->podcast->process();
    }

    /**
     * Record the failure on the podcast.
     *
     * @param \Throwable $exception The error that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        $this->podcast->update([
            'status' => 'failed',
            'error' => $exception->getMessage(),
        ]);
    }
}

任务批处理

创建批处理

示例代码：
<?php

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

// Three import jobs, each covering a 100-row slice of the CSV.
$importJobs = [
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
];

$pendingBatch = Bus::batch($importJobs);

$pendingBatch->then(function (Batch $batch) {
    // All jobs completed successfully.
});

$pendingBatch->catch(function (Batch $batch, \Throwable $e) {
    // First job failure detected.
});

$pendingBatch->finally(function (Batch $batch) {
    // The batch has finished executing.
});

$batch = $pendingBatch->dispatch();

// Hand the batch id back to the caller.
return $batch->id;

批处理任务

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Batchable job that imports one contiguous range of CSV rows.
 */
class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * @param int $startRow First row id of the range (inclusive).
     * @param int $endRow   Last row id of the range (inclusive).
     */
    public function __construct(
        protected int $startRow,
        protected int $endRow,
    ) {}

    /**
     * Process every row in the range, then append a follow-up job to the
     * batch this job belongs to.
     */
    public function handle(): void
    {
        $batch = $this->batch();

        // Skip the work entirely when the surrounding batch was cancelled.
        if ($batch?->cancelled()) {
            return;
        }

        $rows = $this->getRows();

        foreach ($rows as $row) {
            $this->processRow($row);
        }

        // Null-safe like the cancellation check above: the job may have been
        // dispatched outside of a batch, in which case there is nothing to
        // append to.
        $batch?->add(new ProcessRemaining($rows));
    }

    /**
     * Load the rows whose id lies between $startRow and $endRow.
     *
     * get() returns a collection, so it is unwrapped with all() to satisfy
     * the declared array return type.
     *
     * @return array<int, \App\Models\CsvRow>
     */
    protected function getRows(): array
    {
        return \App\Models\CsvRow::whereBetween('id', [$this->startRow, $this->endRow])
            ->get()
            ->all();
    }

    /**
     * Handle a single row. Intentionally left empty in this example.
     *
     * @param mixed $row One element returned by getRows().
     */
    protected function processRow($row): void
    {
    }
}

批处理进度

示例代码：
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Bus;

/**
 * JSON endpoints for inspecting and cancelling a job batch.
 */
class BatchController extends Controller
{
    /**
     * Return the current state of a batch, or a 404 when the id is unknown.
     *
     * @param string $batchId Identifier of the batch to look up.
     */
    public function show(string $batchId): \Illuminate\Http\JsonResponse
    {
        $batch = Bus::findBatch($batchId);

        if ($batch === null) {
            return $this->batchNotFound();
        }

        return response()->json([
            'id' => $batch->id,
            'name' => $batch->name,
            'total_jobs' => $batch->totalJobs,
            'pending_jobs' => $batch->pendingJobs,
            'processed_jobs' => $batch->processedJobs,
            'failed_jobs' => $batch->failedJobs,
            'progress' => $batch->progress(),
            'cancelled' => $batch->cancelled(),
            'finished_at' => $batch->finishedAt,
        ]);
    }

    /**
     * Cancel a batch.
     *
     * An unknown id yields the same 404 as show() instead of claiming that
     * something was cancelled.
     *
     * @param string $batchId Identifier of the batch to cancel.
     */
    public function cancel(string $batchId): \Illuminate\Http\JsonResponse
    {
        $batch = Bus::findBatch($batchId);

        if ($batch === null) {
            return $this->batchNotFound();
        }

        $batch->cancel();

        return response()->json(['cancelled' => true]);
    }

    /**
     * Shared 404 response for an unknown batch id.
     */
    private function batchNotFound(): \Illuminate\Http\JsonResponse
    {
        return response()->json(['error' => 'Batch not found'], 404);
    }
}

任务中间件

定义中间件

示例代码：
<?php

namespace App\Jobs\Middleware;

/**
 * Job middleware that allows at most $limit executions per 60-second window
 * for a given key and releases any further jobs back onto the queue.
 */
class RateLimited
{
    /** Length of the counting window, in seconds. */
    private const WINDOW_SECONDS = 60;

    /**
     * @param string $key          Name of the counter shared by the jobs to throttle.
     * @param int    $limit        Maximum number of executions per window.
     * @param int    $releaseAfter Delay, in seconds, before a throttled job is retried.
     */
    public function __construct(
        protected string $key,
        protected int $limit = 5,
        protected int $releaseAfter = 5,
    ) {}

    /**
     * Run the job if the window still has capacity, otherwise release it.
     *
     * @param object   $job  The queued job passing through the middleware.
     * @param callable $next The next step of the middleware pipeline.
     */
    public function handle($job, $next): void
    {
        $cacheKey = "rate_limit:{$this->key}";
        $cache = \Illuminate\Support\Facades\Cache::getFacadeRoot();

        // add() only writes when the key is absent, so the expiry is set once
        // per window rather than being pushed back on every hit.
        $windowStarted = $cache->add($cacheKey, 0, self::WINDOW_SECONDS);

        // Count with a single increment instead of a separate read and write,
        // so concurrent workers cannot both slip under the limit.
        $hits = (int) $cache->increment($cacheKey);

        // The key may expire between add() and increment(); in that case the
        // counter was recreated by increment(), so give it an expiry again.
        if (! $windowStarted && $hits === 1) {
            $cache->put($cacheKey, 1, self::WINDOW_SECONDS);
        }

        if ($hits > $this->limit) {
            $job->release($this->releaseAfter);

            return;
        }

        $next($job);
    }
}

使用中间件

示例代码：
<?php

namespace App\Jobs;

use App\Jobs\Middleware\RateLimited;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Job that sends an API request and is guarded by three middleware.
 */
class ProcessApiRequest implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * middleware() keys its overlap lock on $resourceId, so the job has to
     * be given that identifier when it is created.
     *
     * @param int|string $resourceId Identifier of the resource this request targets.
     */
    public function __construct(
        protected int|string $resourceId,
    ) {}

    /**
     * Middleware the job passes through before handle() runs.
     *
     * The two framework middleware are referenced by fully-qualified name
     * because this file only imports the custom RateLimited class.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [
            new RateLimited('api-calls', 10, 10),
            new \Illuminate\Queue\Middleware\SkipIfBatchCancelled(),
            new \Illuminate\Queue\Middleware\WithoutOverlapping($this->resourceId),
        ];
    }

    /**
     * Perform the request. Intentionally left empty in this example.
     */
    public function handle(): void
    {
    }
}

内置中间件

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\SerializesModels;

/**
 * Payment job demonstrating the framework's built-in job middleware.
 */
class ProcessPayment implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * middleware() keys its overlap lock on $orderId, so the job has to be
     * given that identifier when it is created.
     *
     * @param int|string $orderId Identifier of the order being paid.
     */
    public function __construct(
        protected int|string $orderId,
    ) {}

    /**
     * Middleware the job passes through before handle() runs.
     *
     * NOTE(review): confirm the units of the ThrottlesExceptions arguments
     * against the framework version in use before relying on the values below.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [
            // Overlap lock keyed on the order id.
            (new WithoutOverlapping($this->orderId))
                ->releaseAfter(60)
                ->expireAfter(180),

            // Exception throttle stored under the 'payment-errors' key.
            (new ThrottlesExceptions(5, 5))
                ->by('payment-errors')
                ->backoff(5),
        ];
    }

    /**
     * Perform the payment. Intentionally left empty in this example.
     */
    public function handle(): void
    {
    }
}

延迟调度

延迟执行

示例代码：
<?php

use App\Jobs\SendReminder;

// Dispatch the reminder with a 30-minute delay.
SendReminder::dispatch($user)
    ->delay(now()->addMinutes(30));

// Dispatch the podcast job with a two-hour delay. The class is referenced by
// its fully-qualified name because this snippet only imports SendReminder.
\App\Jobs\ProcessPodcast::dispatch($podcast)
    ->delay(now()->addHours(2));

特定时间调度

示例代码：
<?php

use App\Jobs\SendNewsletter;

// Dispatch the newsletter job with an absolute timestamp as its delay.
// NOTE(review): the timestamp is a fixed example date; replace it with a
// future moment when adapting this snippet.
$pendingNewsletter = SendNewsletter::dispatch($newsletter);

$pendingNewsletter->delay(\Carbon\Carbon::parse('2024-12-25 09:00:00'));

重试机制

配置重试

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Payment job that retries manually with a per-attempt backoff schedule.
 */
class ProcessPayment implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Attempt ceiling checked by handle() before it gives up. */
    public $tries = 3;

    /** Delay in seconds before each retry, indexed by attempt number minus one. */
    public $backoff = [10, 30, 60];

    /** Number of unhandled exceptions configured for this job. */
    public $maxExceptions = 2;

    /**
     * Attempt the payment; on an unsuccessful result either release the job
     * for another try or fail it once the attempt ceiling is reached.
     */
    public function handle(): void
    {
        $result = $this->processPayment();

        if ($result->success) {
            return;
        }

        if ($this->attempts() >= $this->tries) {
            $this->fail(new \Exception($result->error));

            return;
        }

        // Pick the delay for the current attempt, falling back to 60 seconds
        // when the schedule has no entry for it.
        $this->release($this->backoff[$this->attempts() - 1] ?? 60);
    }

    /**
     * Return a point in time two hours from now.
     *
     * NOTE(review): the Laravel docs state that retryUntil() takes precedence
     * over $tries when both are defined — confirm this is intended.
     */
    public function retryUntil(): \DateTime
    {
        $deadline = now()->addHours(2);

        return $deadline;
    }
}

条件重试

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Job that calls an API and decides per response whether to retry.
 */
class ProcessApiCall implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Make the call and release the job for a retry when the response status
     * is 429 or 5xx, or when the call throws during the first two attempts.
     */
    public function handle(): void
    {
        try {
            $response = $this->makeApiCall();

            // Choose a retry delay from the response status; null means the
            // response needs no retry.
            $retryDelay = match (true) {
                $response->status() === 429 => 60,
                $response->status() >= 500 => $this->attempts() * 10,
                default => null,
            };

            if ($retryDelay !== null) {
                $this->release($retryDelay);
            }
        } catch (\Exception $exception) {
            // From the third attempt on, let the exception propagate.
            if ($this->attempts() >= 3) {
                throw $exception;
            }

            $this->release(30);
        }
    }
}

唯一任务

防止重复

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Unique job for processing a report, identified by the report id.
 */
class ProcessReport implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Duration of the uniqueness window, in seconds. */
    public $uniqueFor = 3600;

    /**
     * @param int $reportId Identifier of the report to process.
     */
    public function __construct(
        protected int $reportId
    ) {}

    /**
     * Identifier used for the uniqueness check: the report id as a string.
     */
    public function uniqueId(): string
    {
        return "{$this->reportId}";
    }

    /**
     * Process the report. Intentionally left empty in this example.
     */
    public function handle(): void
    {
    }
}

唯一锁

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Invoice job implementing ShouldBeUniqueUntilProcessing, identified by the
 * order id.
 */
class GenerateInvoice implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * uniqueId() reads $orderId, so the job has to be given that identifier
     * when it is created.
     *
     * @param int|string $orderId Identifier of the order to invoice.
     */
    public function __construct(
        protected int|string $orderId,
    ) {}

    /**
     * Identifier used for the uniqueness check.
     */
    public function uniqueId(): string
    {
        return 'invoice:' . $this->orderId;
    }

    /**
     * Generate the invoice. Intentionally left empty in this example; the
     * method exists so the job can be executed by a worker.
     */
    public function handle(): void
    {
    }
}

任务标签

定义标签

示例代码：
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Order job that tags itself with the order and user it belongs to.
 */
class ProcessOrder implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * tags() reads both identifiers, so the job has to be given them when it
     * is created.
     *
     * @param int|string $orderId Identifier of the order being processed.
     * @param int|string $userId  Identifier of the user who owns the order.
     */
    public function __construct(
        protected int|string $orderId,
        protected int|string $userId,
    ) {}

    /**
     * Tags attached to the job.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        return ['order:' . $this->orderId, 'user:' . $this->userId];
    }

    /**
     * Process the order. Intentionally left empty in this example; the
     * method exists so the job can be executed by a worker.
     */
    public function handle(): void
    {
    }
}

队列优先级

配置优先级

示例代码：
<?php

use App\Jobs\ProcessPayment;
use App\Jobs\SendNotification;

// Send the payment job to the "high" queue.
$pendingPayment = ProcessPayment::dispatch($payment);
$pendingPayment->onQueue('high');
unset($pendingPayment);

// Send the notification job to the "low" queue.
$pendingNotification = SendNotification::dispatch($notification);
$pendingNotification->onQueue('low');
unset($pendingNotification);

启动 Worker

示例命令：
# Start a worker for the high, default and low queues.
# Per the Laravel docs, queues are served in the order listed — TODO confirm for your version.
php artisan queue:work --queue=high,default,low

测试队列

示例代码：
<?php

namespace Tests\Feature\Jobs;

use Tests\TestCase;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Support\Facades\Queue;
use Illuminate\Foundation\Testing\RefreshDatabase;

/**
 * Feature tests covering how ProcessPodcast is dispatched.
 */
class ProcessPodcastTest extends TestCase
{
    use RefreshDatabase;

    /**
     * The job is pushed to the queue carrying the podcast it was given.
     */
    public function test_job_is_dispatched(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        ProcessPodcast::dispatch($podcast);

        Queue::assertPushed(ProcessPodcast::class, function ($job) use ($podcast) {
            // Read the property through a bound closure so the assertion
            // works whatever visibility the job gives to $podcast.
            $queuedPodcast = (fn () => $this->podcast)->call($job);

            return $queuedPodcast->id === $podcast->id;
        });
    }

    /**
     * onQueue() routes the job to the named queue.
     */
    public function test_job_is_dispatched_on_correct_queue(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        Queue::assertPushedOn('processing', ProcessPodcast::class);
    }

    /**
     * The job is pushed together with its chained follow-up jobs.
     */
    public function test_job_chain(): void
    {
        Queue::fake();

        $podcast = Podcast::factory()->create();

        // dispatch() forwards its arguments to ProcessPodcast's constructor,
        // which requires the podcast.
        ProcessPodcast::withChain([
            new \App\Jobs\OptimizePodcast($podcast),
            new \App\Jobs\ReleasePodcast($podcast),
        ])->dispatch($podcast);

        Queue::assertPushedWithChain(ProcessPodcast::class, [
            \App\Jobs\OptimizePodcast::class,
            \App\Jobs\ReleasePodcast::class,
        ]);
    }
}

最佳实践

1. 任务保持轻量

示例代码：
<?php

/**
 * Job that stays small by dispatching one ProcessItem job per item rather
 * than processing the items itself.
 */
class GoodJob implements ShouldQueue
{
    /**
     * Dispatch a separate job for each item returned by getItems().
     */
    public function handle(): void
    {
        foreach ($this->getItems() as $entry) {
            $itemJob = new ProcessItem($entry);

            dispatch($itemJob);
        }
    }
}

2. 使用异常处理

示例代码：
<?php

/**
 * Job that runs its own error handling before letting an exception propagate.
 */
class SafeJob implements ShouldQueue
{
    /**
     * Run process(); on an exception, call handleError() and then rethrow
     * the same exception.
     */
    public function handle(): void
    {
        try {
            $this->process();
        } catch (\Exception $exception) {
            $this->handleError($exception);

            // Rethrow after the custom handling so the exception is not swallowed.
            throw $exception;
        }
    }
}

3. 监控队列健康

示例代码：
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Queue;

/**
 * Console command that prints an alert when the default queue holds more
 * jobs than a configurable threshold.
 */
class MonitorQueueHealth extends Command
{
    /**
     * The command is named queue:health rather than queue:monitor because
     * the framework already ships a built-in queue:monitor command. The
     * threshold is an option that defaults to the previous fixed value.
     */
    protected $signature = 'queue:health {--max=1000 : Queue size above which an alert is printed}';

    protected $description = 'Print an alert when the queue size exceeds a threshold';

    /**
     * Compare the current queue size with the threshold.
     *
     * @return int Always the success exit code; the alert is informational.
     */
    public function handle(): int
    {
        $threshold = (int) $this->option('max');
        $size = Queue::size();

        if ($size > $threshold) {
            $this->alert("Queue size is high: {$size}");
        }

        return self::SUCCESS;
    }
}

总结

Laravel 13 的队列系统提供了强大的异步任务处理能力。通过合理使用任务链、批处理、中间件等高级特性,可以构建高效、可靠的后台任务处理系统。