Laravel 13 队列高级应用完全指南

Laravel 队列系统是处理后台任务的强大工具。本文将深入探讨 Laravel 13 队列的高级用法和最佳实践。

队列连接配置

多连接配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// config/queue.php
return [
'default' => env('QUEUE_CONNECTION', 'redis'),

'connections' => [
'sync' => [
'driver' => 'sync',
],

'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],

'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => 5,
],

'sqs' => [
'driver' => 'sqs',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
'queue' => env('SQS_QUEUE', 'default'),
'suffix' => env('SQS_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
],
],
];

高级任务分发

条件链式分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use App\Jobs\ProcessVideo;
use App\Jobs\OptimizeVideo;
use App\Jobs\PublishVideo;
use Illuminate\Support\Facades\Bus;

Bus::chain([
new ProcessVideo($video),
new OptimizeVideo($video),
function () use ($video) {
if ($video->needsReview()) {
return new ReviewVideo($video);
}
return new PublishVideo($video);
},
])->dispatch();

批量任务高级用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use App\Jobs\ProcessCsvRow;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Collection;

$batch = Bus::batch(
Collection::make($csvData)
->chunk(100)
->map(fn($chunk) => new ProcessCsvRow($chunk))
->toArray()
)->then(function (Batch $batch) {
Mail::to($batch->user)->send(new ProcessingComplete());
})->catch(function (Batch $batch, Throwable $e) {
Notification::route('mail', 'admin@example.com')
->notify(new BatchFailed($batch, $e));
})->finally(function (Batch $batch) {
Log::info("Batch {$batch->id} completed", [
'processed' => $batch->processedJobs(),
'failed' => $batch->failedJobs,
]);
})->onConnection('redis')
->onQueue('processing')
->allowFailures()
->name('Process CSV Import')
->dispatch();

批量任务进度追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Bus;

class ProcessCsvRow implements ShouldQueue
{
use Batchable, Queueable;

public function __construct(
protected array $rows,
protected string $batchId
) {}

public function handle(): void
{
if ($this->batch()?->cancelled()) {
return;
}

foreach ($this->rows as $row) {
$this->processRow($row);
}

$this->updateProgress();
}

protected function updateProgress(): void
{
$batch = $this->batch();

Cache::put("batch:{$batch->id}:progress", [
'total' => $batch->totalJobs,
'processed' => $batch->processedJobs(),
'failed' => $batch->failedJobs,
'percentage' => round(($batch->processedJobs() / $batch->totalJobs) * 100, 2),
]);
}

protected function processRow(array $row): void
{
// 处理单行数据
}
}

任务中间件高级用法

限流中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
public function __construct(
protected string $key,
protected int $limit = 10,
protected int $seconds = 1,
protected int $releaseAfter = 10
) {}

public function handle($job, $next): void
{
Redis::throttle($this->key)
->block(0)
->allow($this->limit)
->every($this->seconds)
->then(function () use ($job, $next) {
$next($job);
}, function () use ($job) {
$job->release($this->releaseAfter);
});
}
}

重试中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php

namespace App\Jobs\Middleware;

use Throwable;

class RetryWithBackoff
{
protected array $backoff = [10, 30, 60, 120, 300];

public function handle($job, $next): void
{
try {
$next($job);
} catch (Throwable $e) {
$attempts = $job->attempts();

if ($attempts >= count($this->backoff)) {
throw $e;
}

$delay = $this->backoff[$attempts - 1] ?? end($this->backoff);
$job->release($delay);
}
}
}

分布式锁中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Cache;

class DistributedLock
{
public function __construct(
protected string $key,
protected int $ttl = 300
) {}

public function handle($job, $next): void
{
$lockKey = "lock:{$this->key}";
$lockValue = uniqid();

if (!Cache::add($lockKey, $lockValue, $this->ttl)) {
$job->release(10);
return;
}

try {
$next($job);
} finally {
if (Cache::get($lockKey) === $lockValue) {
Cache::forget($lockKey);
}
}
}
}

队列优先级

动态优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php

namespace App\Services;

use Illuminate\Support\Facades\Redis;

class QueuePriorityService
{
public function getPriorityQueue(): string
{
$highQueueLength = Redis::llen('queues:high');
$defaultQueueLength = Redis::llen('queues:default');
$lowQueueLength = Redis::llen('queues:low');

if ($highQueueLength > 0 && $highQueueLength < 100) {
return 'high';
}

if ($defaultQueueLength > 0) {
return 'default';
}

return 'low';
}

public function dispatchWithPriority($job, string $priority = 'auto'): void
{
if ($priority === 'auto') {
$priority = $this->calculatePriority($job);
}

dispatch($job)->onQueue($priority);
}

protected function calculatePriority($job): string
{
if (method_exists($job, 'getPriority')) {
return $job->getPriority();
}

return 'default';
}
}

失败处理

自定义失败处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php

namespace App\Jobs;

use App\Models\FailedJob;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Throwable;

abstract class BaseJob implements ShouldQueue
{
use InteractsWithQueue, Queueable;

public function failed(?Throwable $exception): void
{
FailedJob::create([
'queue' => $this->queue ?? 'default',
'payload' => json_encode([
'job' => static::class,
'data' => $this->getJobData(),
]),
'exception' => $exception?->getMessage(),
'failed_at' => now(),
]);

$this->onFailure($exception);
}

protected function onFailure(?Throwable $exception): void
{
// 子类可重写此方法
}

abstract protected function getJobData(): array;
}

自动重试策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?php

namespace App\Jobs;

use App\Jobs\Middleware\RetryWithBackoff;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class SmartRetryJob implements ShouldQueue
{
use InteractsWithQueue, Queueable;

protected array $retryRules = [
'connection' => [
'max_attempts' => 5,
'backoff' => [5, 15, 30, 60, 120],
],
'rate_limit' => [
'max_attempts' => 3,
'backoff' => [60, 120, 300],
],
'default' => [
'max_attempts' => 3,
'backoff' => [10, 30, 60],
],
];

public function middleware(): array
{
return [
new RetryWithBackoff(),
];
}

public function handle(): void
{
try {
$this->execute();
} catch (ConnectionException $e) {
$this->handleRetry('connection', $e);
} catch (RateLimitException $e) {
$this->handleRetry('rate_limit', $e);
} catch (Exception $e) {
$this->handleRetry('default', $e);
}
}

protected function handleRetry(string $type, Exception $e): void
{
$rules = $this->retryRules[$type];
$attempts = $this->attempts();

if ($attempts >= $rules['max_attempts']) {
$this->fail($e);
return;
}

$delay = $rules['backoff'][$attempts - 1] ?? end($rules['backoff']);
$this->release($delay);
}

abstract protected function execute(): void;
}

队列监控

队列健康检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?php

namespace App\Services;

use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Queue;

class QueueHealthService
{
public function getHealthStatus(): array
{
$queues = ['high', 'default', 'low'];
$status = [];

foreach ($queues as $queue) {
$status[$queue] = $this->checkQueue($queue);
}

return [
'status' => $this->determineOverallStatus($status),
'queues' => $status,
'workers' => $this->getWorkerCount(),
];
}

protected function checkQueue(string $queue): array
{
$size = Queue::size($queue);
$failed = $this->getFailedCount($queue);

return [
'size' => $size,
'failed' => $failed,
'status' => $this->determineQueueStatus($size, $failed),
];
}

protected function determineQueueStatus(int $size, int $failed): string
{
if ($failed > 100) {
return 'critical';
}

if ($size > 1000 || $failed > 10) {
return 'warning';
}

return 'healthy';
}

protected function getFailedCount(string $queue): int
{
return DB::table('failed_jobs')
->where('queue', $queue)
->where('failed_at', '>=', now()->subHour())
->count();
}

protected function getWorkerCount(): int
{
return Redis::connection()->command('CLIENT', ['LIST'])
? count(explode("\n", trim($result)))
: 0;
}
}

队列指标收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?php

namespace App\Services;

use Illuminate\Support\Facades\Cache;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobFailed;

class QueueMetricsService
{
protected string $metricsKey = 'queue:metrics';

public function recordProcessed(JobProcessed $event): void
{
$this->recordMetric('processed', [
'queue' => $event->job->getQueue(),
'job' => $event->job->resolveName(),
'duration' => $this->calculateDuration($event),
]);
}

public function recordFailed(JobFailed $event): void
{
$this->recordMetric('failed', [
'queue' => $event->job->getQueue(),
'job' => $event->job->resolveName(),
'exception' => $event->exception->getMessage(),
]);
}

public function getMetrics(string $period = 'hour'): array
{
return Cache::get("{$this->metricsKey}:{$period}", [
'processed' => 0,
'failed' => 0,
'avg_duration' => 0,
]);
}

protected function recordMetric(string $type, array $data): void
{
$key = "{$this->metricsKey}:hour";
$metrics = Cache::get($key, $this->getDefaultMetrics());

$metrics[$type]++;
$metrics['jobs'][] = array_merge($data, [
'timestamp' => now()->toIso8601String(),
]);

Cache::put($key, $metrics, 3600);
}

protected function calculateDuration(JobProcessed $event): float
{
return microtime(true) - $event->job->payload()['pushedAt'];
}

protected function getDefaultMetrics(): array
{
return [
'processed' => 0,
'failed' => 0,
'jobs' => [],
];
}
}

实战案例

邮件批量发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php

namespace App\Jobs;

use App\Models\Campaign;
use App\Models\User;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Mail;
use Illuminate\Support\Facades\Bus;

class SendCampaignEmail implements ShouldQueue
{
use Batchable, Queueable;

public function __construct(
protected Campaign $campaign,
protected array $userIds
) {}

public function handle(): void
{
if ($this->batch()?->cancelled()) {
return;
}

$users = User::whereIn('id', $this->userIds)->get();

foreach ($users as $user) {
Mail::to($user->email)
->queue(new CampaignMail($this->campaign, $user));
}

$this->campaign->increment('sent_count', count($users));
}

public static function dispatchCampaign(Campaign $campaign): void
{
$userIds = $campaign->targetUsers()->pluck('id')->chunk(100);

Bus::batch(
$userIds->map(fn($chunk) => new static($campaign, $chunk->toArray()))->toArray()
)->then(function () use ($campaign) {
$campaign->update(['status' => 'completed']);
})->catch(function () use ($campaign) {
$campaign->update(['status' => 'failed']);
})->name("Campaign: {$campaign->name}")
->onQueue('emails')
->dispatch();
}
}

数据同步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<?php

namespace App\Jobs;

use App\Services\SyncService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class SyncDataJob implements ShouldQueue
{
use InteractsWithQueue, Queueable;

public int $timeout = 3600;
public int $tries = 1;

protected int $chunkSize = 1000;
protected int $lastId = 0;

public function __construct(
protected string $source,
protected string $entity,
protected array $options = []
) {
$this->lastId = $options['last_id'] ?? 0;
}

public function handle(SyncService $syncService): void
{
$hasMore = true;

while ($hasMore) {
if ($this->shouldStop()) {
break;
}

$data = $syncService->fetch(
$this->source,
$this->entity,
$this->lastId,
$this->chunkSize
);

if (empty($data)) {
$hasMore = false;
break;
}

$syncService->sync($this->entity, $data);

$this->lastId = end($data)['id'];
$this->updateProgress();

if (count($data) < $this->chunkSize) {
$hasMore = false;
}
}
}

protected function shouldStop(): bool
{
return Cache::get("sync:{$this->source}:{$this->entity}:stop", false);
}

protected function updateProgress(): void
{
Cache::put("sync:{$this->source}:{$this->entity}:progress", [
'last_id' => $this->lastId,
'updated_at' => now()->toIso8601String(),
]);
}
}

总结

Laravel 13 的队列系统提供了:

  • 灵活的批量任务处理
  • 强大的任务中间件
  • 智能的重试策略
  • 完善的失败处理
  • 实时监控能力
  • 分布式锁支持

掌握队列高级用法是构建高性能、可扩展应用的关键。