Laravel 13 后台任务处理完全指南

后台任务处理是现代 Web 应用的核心功能,Laravel 提供了强大的队列系统来处理异步任务。本文将深入探讨 Laravel 13 中后台任务处理的最佳实践。

队列基础

配置队列连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// config/queue.php
//
// Queue connection definitions. `retry_after` is the number of seconds after
// which a reserved job is handed out again. It must be LONGER than the longest
// job timeout in the application, otherwise a job that is still running gets
// picked up by a second worker and is processed twice. The jobs in this guide
// use timeouts of up to 600 seconds, hence the 660 second default below.
return [
    // Connection used when a job does not name one explicitly.
    'default' => env('QUEUE_CONNECTION', 'database'),

    'connections' => [
        'sync' => [
            'driver' => 'sync',
        ],

        'database' => [
            'driver' => 'database',
            'connection' => env('DB_QUEUE_CONNECTION'),
            'table' => 'jobs',
            'queue' => 'default',
            // Keep above the longest job $timeout (see note at the top of the file).
            'retry_after' => (int) env('DB_QUEUE_RETRY_AFTER', 660),
            'after_commit' => true,
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
            'queue' => env('REDIS_QUEUE', 'default'),
            // Keep above the longest job $timeout (see note at the top of the file).
            'retry_after' => (int) env('REDIS_QUEUE_RETRY_AFTER', 660),
            'block_for' => 5,
            'after_commit' => true,
        ],
    ],
];

创建队列表

1
2
3
# Generate the migrations for the jobs, failed_jobs and job_batches tables,
# then run them. These are the Laravel 11+ command names; job_batches is
# required by the Bus::batch() examples later in this guide.
php artisan make:queue-table
php artisan make:queue-failed-table
php artisan make:queue-batches-table
php artisan migrate

创建任务

生成任务类

1
2
# Scaffold job classes. make:job has no --queued option (passing it aborts the
# command); generated jobs are queued by default, and --sync is the flag for a
# synchronous job.
php artisan make:job ProcessPodcast
php artisan make:job SendNewsletter

任务类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

/**
 * Queued job that runs a podcast through the audio processor and records the
 * outcome (completed / failed) on the Podcast model.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Maximum number of attempts. */
    public int $tries = 3;

    /** Maximum number of unhandled exceptions tolerated. */
    public int $maxExceptions = 2;

    /** Timeout for a single attempt, in seconds. */
    public int $timeout = 120;

    /**
     * Delay in seconds before each successive retry.
     *
     * Declared as `array`: a list is not a valid default for an `int`
     * property and makes the class fail to compile.
     *
     * @var list<int>
     */
    public array $backoff = [10, 30, 60];

    /**
     * @param Podcast $podcast Podcast to process.
     * @param string  $format  Target audio format passed to the processor.
     */
    public function __construct(
        public Podcast $podcast,
        public string $format = 'mp3'
    ) {
        // Every instance of this job goes to the dedicated "processing" queue.
        $this->onQueue('processing');
    }

    /**
     * Process the podcast, then stamp it as completed.
     *
     * @param AudioProcessor $processor Resolved from the container by the queue worker.
     */
    public function handle(AudioProcessor $processor): void
    {
        $processor->process($this->podcast, $this->format);

        $this->podcast->update([
            'processed_at' => now(),
            'status' => 'completed',
        ]);
    }

    /**
     * Record the failure reason on the podcast.
     *
     * @param \Throwable $exception The exception that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        $this->podcast->update([
            'status' => 'failed',
            'error_message' => $exception->getMessage(),
        ]);
    }
}

分发任务

基础分发

1
2
3
4
5
6
7
8
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;

// Dispatch with only the podcast; the job's $format falls back to its default.
ProcessPodcast::dispatch($podcast);

// Dispatch with an explicit second constructor argument.
ProcessPodcast::dispatch($podcast, 'wav');

// Same job, dispatched through the global dispatch() helper.
dispatch(new ProcessPodcast($podcast));

延迟分发

1
2
3
4
5
// Delay the job by five minutes from now.
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(5));

// Delay the job by one hour from now.
ProcessPodcast::dispatch($podcast)
->delay(now()->addHours(1));

指定队列

1
2
3
4
5
6
// Send the job to the "processing" queue.
ProcessPodcast::dispatch($podcast)
->onQueue('processing');

// Send the job to the "high" queue on the "redis" connection.
ProcessPodcast::dispatch($podcast)
->onConnection('redis')
->onQueue('high');

条件分发

1
2
3
// Dispatch only when the first argument (the condition) is true.
ProcessPodcast::dispatchIf($podcast->needsProcessing(), $podcast);

// Dispatch only when the first argument (the condition) is false.
ProcessPodcast::dispatchUnless($podcast->isProcessed(), $podcast);

链式任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use App\Jobs\NotifySubscribers;
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

// Run the four jobs one after another, in the order listed.
Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
    new NotifySubscribers($podcast),
])->dispatch();

// Same idea, but the whole chain is sent to the "processing" queue on the
// "redis" connection.
Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
])->onConnection('redis')->onQueue('processing')->dispatch();

批量任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use App\Jobs\ProcessInvoice;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

// Dispatch four ProcessInvoice jobs as one named batch with lifecycle callbacks.
// NOTE(review): ProcessInvoice is not shown in this guide; confirm it uses the
// Batchable trait that Laravel requires for batched jobs.
$batch = Bus::batch([
new ProcessInvoice(1),
new ProcessInvoice(2),
new ProcessInvoice(3),
new ProcessInvoice(4),
])->then(function (Batch $batch) {
// All jobs completed successfully
})->catch(function (Batch $batch, \Throwable $e) {
// First job failure detected
})->finally(function (Batch $batch) {
// Runs whether the batch succeeded or failed
})->name('Process Invoices')->dispatch();

任务重试

配置重试次数

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Shows the three ways to configure retry backoff for a job.
 *
 * PHP does not allow the same property to be declared twice, so only ONE of
 * the styles may be active in a real class. The method form is left active
 * here; the two property forms are shown commented out as alternatives.
 */
class ProcessPodcast implements ShouldQueue
{
    /** Maximum number of attempts. */
    public int $tries = 3;

    // Alternative 1: fixed delay of 60 seconds before every retry.
    // public int $backoff = 60;

    // Alternative 2: a different delay for each successive retry, as a property.
    // public array $backoff = [10, 30, 60];

    /**
     * Alternative 3: a different delay for each successive retry, computed
     * in a method.
     *
     * @return list<int> Delays in seconds.
     */
    public function backoff(): array
    {
        return [10, 30, 60];
    }
}

基于时间的重试

1
2
3
4
5
6
7
/**
 * Shows time-based retries: the job declares a deadline instead of an
 * attempt count.
 */
class ProcessPodcast implements ShouldQueue
{
    /**
     * Deadline for retries: two hours from the moment this is evaluated.
     *
     * Typed as \DateTimeInterface rather than \DateTime so the method keeps
     * working when now() is configured to return an immutable Carbon
     * instance, which is not a \DateTime.
     */
    public function retryUntil(): \DateTimeInterface
    {
        return now()->addHours(2);
    }
}

手动释放

1
2
3
4
5
6
7
8
9
/**
 * Process the podcast, or put the job back on the queue when the podcast
 * reports that it is locked.
 */
public function handle(): void
{
if ($this->podcast->isLocked()) {
$this->release(30); // release the job; retry in 30 seconds
return;
}

// Processing logic goes here
}

唯一任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Cache;

/**
 * Unique job: only one instance per podcast may be queued at a time.
 *
 * The uniqueness key comes from uniqueId(). No $uniqueId property is
 * declared, because an uninitialised `int` property of the same name would
 * never be set and would contradict the string key returned by the method.
 */
class ProcessPodcast implements ShouldQueue, ShouldBeUnique
{
    /**
     * Key that identifies duplicates of this job: one per podcast id.
     */
    public function uniqueId(): string
    {
        return 'podcast-' . $this->podcast->id;
    }

    /**
     * Number of seconds the uniqueness lock is kept (one hour).
     */
    public function uniqueFor(): int
    {
        return 3600;
    }
}

任务中间件

创建中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

/**
 * Job middleware that gates execution behind a Redis throttle.
 *
 * When the throttle grants a slot the job continues down the middleware
 * pipeline; otherwise the job is released back to the queue for 10 seconds.
 */
class RateLimited
{
    /**
     * @param string $key     Name of the Redis throttle.
     * @param int    $limit   Value passed to the throttle's allow().
     * @param int    $seconds Value passed to the throttle's every().
     */
    public function __construct(
        protected string $key,
        protected int $limit = 5,
        protected int $seconds = 1
    ) {}

    /**
     * @param mixed    $job  The job being processed.
     * @param callable $next Continues the middleware pipeline.
     */
    public function handle($job, $next): void
    {
        $throttle = Redis::throttle($this->key)
            ->allow($this->limit)
            ->every($this->seconds);

        // Slot acquired: let the job run.
        $proceed = function () use ($job, $next) {
            $next($job);
        };

        // No slot available: try again in 10 seconds.
        $postpone = function () use ($job) {
            $job->release(10);
        };

        $throttle->then($proceed, $postpone);
    }
}

应用中间件

1
2
3
4
5
6
7
8
9
10
/**
 * Shows how a job attaches middleware.
 */
class ProcessPodcast implements ShouldQueue
{
/**
 * Middleware instances the job passes through before handle() runs.
 *
 * NOTE(review): SkipIfRecentlyProcessed is not defined anywhere in this
 * guide; it is assumed to be another custom middleware class.
 */
public function middleware(): array
{
return [
new RateLimited('podcast-processing', 10, 1),
new SkipIfRecentlyProcessed(),
];
}
}

内置中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;

/**
 * Built-in middleware: prevent overlapping runs for the same podcast and
 * throttle the job after repeated exceptions.
 */
public function middleware(): array
{
    return [
        // One job per podcast id at a time: release overlapping jobs for
        // 60 seconds and expire the lock after 180 seconds.
        (new WithoutOverlapping($this->podcast->id))
            ->releaseAfter(60)
            ->expireAfter(180),

        // After 10 exceptions, back off for 5 minutes. Since Laravel 11 the
        // second argument is in SECONDS (it used to be minutes), so a bare 5
        // would mean a 5-second pause.
        (new ThrottlesExceptions(10, 5 * 60))
            ->by('podcast-exceptions'),
    ];
}

队列工作进程

启动工作进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Basic start
php artisan queue:work

# Choose the connection and the queues to consume
php artisan queue:work redis --queue=high,default,low

# Daemon mode
# NOTE(review): --daemon is believed to be deprecated, with queue:work already
# running as a long-lived process; confirm against the current Laravel docs.
php artisan queue:work --daemon

# Limit the number of jobs processed
php artisan queue:work --max-jobs=1000

# Limit the running time (seconds)
php artisan queue:work --max-time=3600

# Set the memory limit (megabytes)
php artisan queue:work --memory=128

监听队列

1
2
# Listen on the default connection
php artisan queue:listen
# Listen on the redis connection for the "high" and "default" queues
php artisan queue:listen redis --queue=high,default

处理失败任务

1
2
3
4
5
6
7
8
9
10
11
12
# List failed jobs
php artisan queue:failed

# Retry failed jobs: all of them, or a single one by ID
php artisan queue:retry all
php artisan queue:retry 5

# Delete a single failed job by ID
php artisan queue:forget 5

# Delete all failed jobs
php artisan queue:flush

Supervisor 配置

安装 Supervisor

1
# Install Supervisor through apt-get
sudo apt-get install supervisor

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
# /etc/supervisor/conf.d/laravel-worker.conf
# Supervisor program that keeps Laravel queue workers running.
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
# Each worker consumes the high, default and low queues on the redis connection
# and stops after --max-time=3600 seconds.
command=php /var/www/app/artisan queue:work redis --queue=high,default,low --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
# Eight worker processes are started for this program.
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/app/storage/logs/worker.log
# Same value as the worker's --max-time above.
stopwaitsecs=3600

Supervisor 命令

1
2
3
4
5
# Re-read the configuration files and apply the changes
sudo supervisorctl reread
sudo supervisorctl update
# Start, stop or restart every process in the laravel-worker group
sudo supervisorctl start laravel-worker:*
sudo supervisorctl stop laravel-worker:*
sudo supervisorctl restart laravel-worker:*

任务事件

任务事件监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;

/**
 * Registers listeners for the queue's job lifecycle events.
 */
class AppServiceProvider extends ServiceProvider
{
    /**
     * Attach the queue event callbacks when the application boots.
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // Before a job starts
        });

        Queue::after(function (JobProcessed $event) {
            // After a job completes
        });

        Queue::failing(function (JobFailed $event) {
            // When a job fails
        });

        Queue::exceptionOccurred(function (JobExceptionOccurred $event) {
            // When a job throws an exception
        });
    }
}

Horizon 监控

安装 Horizon

1
2
# Add the Horizon package, then run its installer command
composer require laravel/horizon
php artisan horizon:install

配置 Horizon

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// config/horizon.php
// Per-environment Horizon supervisor definitions.
return [
'environments' => [
// Production: one supervisor on the redis connection consuming the
// high, default and low queues, with 'auto' balancing and up to 10 processes.
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high', 'default', 'low'],
'balance' => 'auto',
'autoScalingStrategy' => 'time',
'maxProcesses' => 10,
'maxTime' => 0,
'maxJobs' => 0,
'memory' => 128,
'tries' => 3,
'timeout' => 60,
],
],

// Local: one supervisor on the default queue with 'simple' balancing
// and 3 processes.
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'simple',
'processes' => 3,
'tries' => 3,
],
],
],
];

启动 Horizon

1
2
3
4
# Start Horizon
php artisan horizon
# Pause and resume Horizon
php artisan horizon:pause
php artisan horizon:continue
# Terminate the Horizon process
php artisan horizon:terminate

实战示例

邮件发送任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php

namespace App\Jobs;

use App\Mail\Newsletter;
use App\Models\User;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Mail;

/**
 * Queued job that sends one newsletter email to a single user.
 *
 * Uses Illuminate\Foundation\Queue\Queueable, which bundles Dispatchable,
 * InteractsWithQueue, Bus\Queueable and SerializesModels. Illuminate\Bus\Queueable
 * on its own provides neither the static dispatch() method nor compact
 * serialization of the User model.
 */
class SendNewsletter implements ShouldQueue
{
    use Queueable;

    /** Maximum number of attempts. */
    public int $tries = 3;

    /** Seconds to wait before each retry. */
    public int $backoff = 60;

    /**
     * @param User  $user    Recipient of the newsletter.
     * @param array $content Data handed to the Newsletter mailable.
     */
    public function __construct(
        public User $user,
        public array $content
    ) {}

    /**
     * Send the newsletter to the user's email address.
     */
    public function handle(): void
    {
        Mail::to($this->user->email)
            ->send(new Newsletter($this->content));
    }

    /**
     * Log the failure together with the affected user id.
     *
     * @param \Throwable $exception The exception that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        Log::error('Newsletter failed', [
            'user_id' => $this->user->id,
            'error' => $exception->getMessage(),
        ]);
    }
}

数据导出任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php

namespace App\Jobs;

use App\Models\User;
// NOTE(review): ExportReady is not defined in this guide; App\Notifications
// is assumed as its namespace. Adjust if the class lives elsewhere.
use App\Notifications\ExportReady;
use App\Services\ExportService;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Storage;

/**
 * Queued job that exports users matching optional filters to a CSV file and
 * notifies the requesting user when the file is ready.
 *
 * The job is unique per requesting user (see uniqueId()).
 */
class ExportUsers implements ShouldQueue, ShouldBeUnique
{
    use Queueable;

    /** Timeout for a single attempt, in seconds. */
    public int $timeout = 600;

    /**
     * @param User                 $user    User who requested the export and receives the notification.
     * @param array<string, mixed> $filters Optional 'role' and 'status' filters.
     */
    public function __construct(
        public User $user,
        public array $filters = []
    ) {}

    /**
     * Uniqueness key: one export per requesting user.
     */
    public function uniqueId(): string
    {
        return 'export-users-' . $this->user->id;
    }

    /**
     * Build the filtered user list, write it to CSV and notify the requester.
     *
     * @param ExportService $exporter Resolved from the container by the queue worker.
     */
    public function handle(ExportService $exporter): void
    {
        // Each filter is applied only when it is present and truthy.
        $users = User::query()
            ->when($this->filters['role'] ?? null, fn ($q, $role) => $q->where('role', $role))
            ->when($this->filters['status'] ?? null, fn ($q, $status) => $q->where('status', $status))
            ->get();

        // The requester's id is part of the file name so that exports started
        // by different users do not overwrite each other's file.
        $path = $exporter->toCsv($users, "users-export-{$this->user->id}.csv");

        $this->user->notify(new ExportReady($path));
    }
}

图片处理任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php

namespace App\Jobs;

use App\Models\Image;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Storage;
use Intervention\Image\Facades\Image as ImageProcessor;

/**
 * Queued job that renders the thumbnail, medium and large variants of an
 * uploaded image and records each variant on the Image model.
 *
 * Uses Illuminate\Foundation\Queue\Queueable, which bundles Dispatchable,
 * InteractsWithQueue, Bus\Queueable and SerializesModels.
 */
class ProcessImage implements ShouldQueue
{
    use Queueable;

    /** Maximum number of attempts. */
    public int $tries = 3;

    /** Timeout for a single attempt, in seconds. */
    public int $timeout = 300;

    /**
     * Variant name => [width, height] in pixels.
     *
     * @var array<string, array{int, int}>
     */
    protected array $sizes = [
        'thumbnail' => [150, 150],
        'medium' => [400, 400],
        'large' => [800, 800],
    ];

    /**
     * @param Image $image Image record whose file is processed.
     */
    public function __construct(
        public Image $image
    ) {
        // Every instance of this job goes to the dedicated "images" queue.
        $this->onQueue('images');
    }

    /**
     * Generate every configured variant, then stamp the image as processed.
     */
    public function handle(): void
    {
        $original = Storage::path($this->image->path);

        foreach ($this->sizes as $name => [$width, $height]) {
            // Cast to string so Storage::put() receives the encoded bytes
            // explicitly instead of relying on implicit object-to-string
            // conversion.
            $encoded = (string) ImageProcessor::make($original)
                ->fit($width, $height)
                ->encode('jpg', 80);

            $path = "images/{$name}/{$this->image->id}.jpg";
            Storage::put($path, $encoded);

            // updateOrCreate keeps the job idempotent: with $tries = 3 a retry
            // after a partial run would otherwise insert duplicate rows.
            // NOTE(review): assumes sizes() is an Eloquent has-many style
            // relation; the Image model is not shown in this guide.
            $this->image->sizes()->updateOrCreate(
                ['name' => $name],
                [
                    'path' => $path,
                    'width' => $width,
                    'height' => $height,
                ],
            );
        }

        $this->image->update(['processed_at' => now()]);
    }
}

总结

Laravel 13 的后台任务处理提供了:

  • 灵活的队列配置
  • 强大的任务分发机制
  • 完善的重试策略
  • 任务中间件支持
  • 批量任务处理
  • Horizon 监控面板

合理使用后台任务可以显著提升应用性能和用户体验。