Laravel 13 后台任务处理完全指南
后台任务处理是现代 Web 应用的核心功能,Laravel 提供了强大的队列系统来处理异步任务。本文将深入探讨 Laravel 13 中后台任务处理的最佳实践。
队列基础
配置队列连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| return [ 'default' => env('QUEUE_CONNECTION', 'database'),
'connections' => [ 'sync' => [ 'driver' => 'sync', ],
'database' => [ 'driver' => 'database', 'connection' => env('DB_QUEUE_CONNECTION'), 'table' => 'jobs', 'queue' => 'default', 'retry_after' => 90, 'after_commit' => true, ],
'redis' => [ 'driver' => 'redis', 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'), 'queue' => env('REDIS_QUEUE', 'default'), 'retry_after' => 90, 'block_for' => 5, 'after_commit' => true, ], ], ];
|
创建队列表
1 2 3
| # Generate the migrations for the jobs and failed_jobs tables, then run them.
# Laravel 11+ names these generators make:queue-table / make:queue-failed-table
# (queue:table / queue:failed-table are the legacy names).
php artisan make:queue-table
php artisan make:queue-failed-table
php artisan migrate
|
创建任务
生成任务类
1 2
| # Generated job classes implement ShouldQueue by default, so no extra flag is needed.
php artisan make:job ProcessPodcast
# make:job has no --queued option; pass --sync only when a synchronous job is wanted.
php artisan make:job SendNewsletter
|
任务类结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <?php
namespace App\Jobs;
use App\Models\Podcast; use App\Services\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels;
/**
 * Queued job that runs a podcast through the audio processor and records the outcome
 * on the podcast record.
 */
class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Maximum number of attempts before the job is marked as failed. */
    public int $tries = 3;

    /** Maximum number of unhandled exceptions tolerated before the job fails. */
    public int $maxExceptions = 2;

    /** Number of seconds the job may run before it times out. */
    public int $timeout = 120;

    /**
     * Delay, in seconds, before each successive retry.
     *
     * Must be typed `array`: an `int` property cannot take a list as its default value
     * (PHP rejects the class at compile time).
     *
     * @var list<int>
     */
    public array $backoff = [10, 30, 60];

    /**
     * @param Podcast $podcast The podcast to process.
     * @param string  $format  Target audio format passed to the processor.
     */
    public function __construct(
        public Podcast $podcast,
        public string $format = 'mp3'
    ) {
        // Route every instance of this job to the dedicated "processing" queue.
        $this->onQueue('processing');
    }

    /**
     * Process the podcast, then stamp it as completed.
     */
    public function handle(AudioProcessor $processor): void
    {
        $processor->process($this->podcast, $this->format);

        $this->podcast->update([
            'processed_at' => now(),
            'status' => 'completed',
        ]);
    }

    /**
     * Record the failure reason on the podcast once the job has failed.
     */
    public function failed(\Throwable $exception): void
    {
        $this->podcast->update([
            'status' => 'failed',
            'error_message' => $exception->getMessage(),
        ]);
    }
}
|
分发任务
基础分发
1 2 3 4 5 6 7 8
| use App\Jobs\ProcessPodcast; use App\Models\Podcast;
ProcessPodcast::dispatch($podcast);
ProcessPodcast::dispatch($podcast, 'wav');
dispatch(new ProcessPodcast($podcast));
|
延迟分发
1 2 3 4 5
| ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(5));
ProcessPodcast::dispatch($podcast) ->delay(now()->addHours(1));
|
指定队列
1 2 3 4 5 6
| ProcessPodcast::dispatch($podcast) ->onQueue('processing');
ProcessPodcast::dispatch($podcast) ->onConnection('redis') ->onQueue('high');
|
条件分发
1 2 3
| ProcessPodcast::dispatchIf($podcast->needsProcessing(), $podcast);
ProcessPodcast::dispatchUnless($podcast->isProcessed(), $podcast);
|
链式任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| use App\Jobs\OptimizePodcast; use App\Jobs\ReleasePodcast; use App\Jobs\NotifySubscribers;
Bus::chain([ new ProcessPodcast($podcast), new OptimizePodcast($podcast), new ReleasePodcast($podcast), new NotifySubscribers($podcast), ])->dispatch();
Bus::chain([ new ProcessPodcast($podcast), new OptimizePodcast($podcast), ])->onConnection('redis')->onQueue('processing')->dispatch();
|
批量任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| use App\Jobs\ProcessInvoice; use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([ new ProcessInvoice(1), new ProcessInvoice(2), new ProcessInvoice(3), new ProcessInvoice(4), ])->then(function (Batch $batch) { })->catch(function (Batch $batch, \Throwable $e) { })->finally(function (Batch $batch) { })->name('Process Invoices')->dispatch();
|
任务重试
配置重试次数
1 2 3 4 5 6 7 8 9 10 11 12 13
| class ProcessPodcast implements ShouldQueue
{
    /** Maximum number of attempts before the job is marked as failed. */
    public int $tries = 3;

    // Choose exactly ONE way to configure the retry delay. Declaring $backoff twice
    // in the same class is a fatal "Cannot redeclare" error.
    //
    //   Fixed delay (seconds):        public int $backoff = 60;
    //   Per-attempt delays (seconds): public array $backoff = [10, 30, 60];
    //
    // Or compute the delays in a method, as below.

    /**
     * Delay, in seconds, before each successive retry.
     *
     * @return list<int>
     */
    public function backoff(): array
    {
        return [10, 30, 60];
    }
}
|
基于时间的重试
1 2 3 4 5 6 7
| class ProcessPodcast implements ShouldQueue
{
    /**
     * Time-based retry window: returns the moment two hours from now.
     */
    public function retryUntil(): \DateTime
    {
        $deadline = now()->addHours(2);

        return $deadline;
    }
}
|
手动释放
1 2 3 4 5 6 7 8 9
| /**
 * Hands the job back to the queue when the podcast is currently locked.
 */
public function handle(): void
{
    $locked = $this->podcast->isLocked();

    if ($locked) {
        // Release with a delay of 30 instead of processing now.
        $this->release(30);

        return;
    }
}
|
唯一任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Support\Facades\Cache;

/**
 * Unique job: only one instance per podcast may sit on the queue at a time.
 */
class ProcessPodcast implements ShouldQueue, ShouldBeUnique
{
    // No $uniqueId property is declared here: the uniqueId() method below supplies the
    // key, and an uninitialised int property of the same name would only be dead state
    // that contradicts the method's string return type.

    /**
     * Key that identifies this job for uniqueness purposes.
     */
    public function uniqueId(): string
    {
        return 'podcast-' . $this->podcast->id;
    }

    /**
     * Number of seconds after which the unique lock is released.
     */
    public function uniqueFor(): int
    {
        return 3600;
    }
}
|
任务中间件
创建中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
/**
 * Job middleware that gates execution behind a Redis throttle.
 */
class RateLimited
{
    /**
     * @param string $key     Name of the throttle.
     * @param int    $limit   Value passed to the throttle's allow().
     * @param int    $seconds Value passed to the throttle's every().
     */
    public function __construct(
        protected string $key,
        protected int $limit = 5,
        protected int $seconds = 1
    ) {}

    /**
     * Run the job through the throttle: continue the pipeline when a slot is granted,
     * otherwise release the job with a delay of 10.
     */
    public function handle($job, $next): void
    {
        $proceed = function () use ($job, $next) {
            $next($job);
        };

        $retryLater = function () use ($job) {
            $job->release(10);
        };

        $throttle = Redis::throttle($this->key)
            ->allow($this->limit)
            ->every($this->seconds);

        $throttle->then($proceed, $retryLater);
    }
}
|
应用中间件
1 2 3 4 5 6 7 8 9 10
| class ProcessPodcast implements ShouldQueue
{
    /**
     * Middleware instances the job passes through, in order.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        $rateLimit = new RateLimited('podcast-processing', 10, 1);
        $skipRecent = new SkipIfRecentlyProcessed();

        return [$rateLimit, $skipRecent];
    }
}
|
内置中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| use Illuminate\Queue\Middleware\WithoutOverlapping; use Illuminate\Queue\Middleware\ThrottlesExceptions; use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
/**
 * Built-in middleware applied to this job.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [
        // Prevent two jobs for the same podcast from running concurrently.
        (new WithoutOverlapping($this->podcast->id))
            ->releaseAfter(60)
            ->expireAfter(180),

        // Since Laravel 11 the second constructor argument is expressed in seconds,
        // not minutes, so a five-minute window must be written as 5 * 60.
        (new ThrottlesExceptions(10, 5 * 60))
            ->by('podcast-exceptions'),
    ];
}
|
队列工作进程
启动工作进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| php artisan queue:work
php artisan queue:work redis --queue=high,default,low
php artisan queue:work --daemon
php artisan queue:work --max-jobs=1000
php artisan queue:work --max-time=3600
php artisan queue:work --memory=128
|
监听队列
1 2
| php artisan queue:listen php artisan queue:listen redis --queue=high,default
|
处理失败任务
1 2 3 4 5 6 7 8 9 10 11 12
| php artisan queue:failed
php artisan queue:retry all php artisan queue:retry 5
php artisan queue:forget 5
php artisan queue:flush
|
Supervisor 配置
安装 Supervisor
1
| sudo apt-get install supervisor
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13
| [program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /var/www/app/artisan queue:work redis --queue=high,default,low --sleep=3 --tries=3 --max-time=3600 autostart=true autorestart=true stopasgroup=true killasgroup=true user=www-data numprocs=8 redirect_stderr=true stdout_logfile=/var/www/app/storage/logs/worker.log stopwaitsecs=3600
|
Supervisor 命令
1 2 3 4 5
| sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:* sudo supervisorctl stop laravel-worker:* sudo supervisorctl restart laravel-worker:*
|
任务事件
任务事件监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; use Illuminate\Queue\Events\JobFailed;
/**
 * Registers queue lifecycle callbacks in boot(). Every callback body is intentionally
 * empty in this example; fill in logging or metrics as needed.
 */
class AppServiceProvider extends ServiceProvider
{
    public function boot(): void
    {
        // Callback receiving a JobProcessing event.
        Queue::before(function (JobProcessing $event) { });

        // Callback receiving a JobProcessed event.
        Queue::after(function (JobProcessed $event) { });

        // Callback receiving a JobFailed event.
        Queue::failing(function (JobFailed $event) { });

        // Callback for exceptions raised while a job runs; the event parameter is untyped here.
        Queue::exceptionOccurred(function ($event) { });
    }
}
|
Horizon 监控
安装 Horizon
1 2
| composer require laravel/horizon php artisan horizon:install
|
配置 Horizon
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| return [ 'environments' => [ 'production' => [ 'supervisor-1' => [ 'connection' => 'redis', 'queue' => ['high', 'default', 'low'], 'balance' => 'auto', 'autoScalingStrategy' => 'time', 'maxProcesses' => 10, 'maxTime' => 0, 'maxJobs' => 0, 'memory' => 128, 'tries' => 3, 'timeout' => 60, ], ],
'local' => [ 'supervisor-1' => [ 'connection' => 'redis', 'queue' => ['default'], 'balance' => 'simple', 'processes' => 3, 'tries' => 3, ], ], ], ];
|
启动 Horizon
1 2 3 4
| php artisan horizon php artisan horizon:pause php artisan horizon:continue php artisan horizon:terminate
|
实战示例
邮件发送任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| <?php
namespace App\Jobs;
use App\Mail\Newsletter;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Mail;

/**
 * Queued job that mails one newsletter to one user.
 */
class SendNewsletter implements ShouldQueue
{
    // Dispatchable enables SendNewsletter::dispatch(); SerializesModels stores only the
    // User's identifier in the payload instead of the whole model.
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Maximum number of attempts before the job is marked as failed. */
    public int $tries = 3;

    /** Delay, in seconds, between retries. */
    public int $backoff = 60;

    /**
     * @param User  $user    Recipient of the newsletter.
     * @param array $content Data handed to the Newsletter mailable.
     */
    public function __construct(
        public User $user,
        public array $content
    ) {}

    /**
     * Send the newsletter to the user's email address.
     */
    public function handle(): void
    {
        Mail::to($this->user->email)
            ->send(new Newsletter($this->content));
    }

    /**
     * Log the failure together with the affected user id.
     */
    public function failed(\Throwable $exception): void
    {
        Log::error('Newsletter failed', [
            'user_id' => $this->user->id,
            'error' => $exception->getMessage(),
        ]);
    }
}
|
数据导出任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| <?php
namespace App\Jobs;
use App\Models\User;
// NOTE(review): assumes ExportReady lives in the default make:notification namespace — confirm.
use App\Notifications\ExportReady;
use App\Services\ExportService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Storage;

/**
 * Queued, per-user-unique job that exports the filtered user list to CSV and notifies
 * the requesting user when the file is ready.
 */
class ExportUsers implements ShouldQueue, ShouldBeUnique
{
    // Dispatchable enables ExportUsers::dispatch(); SerializesModels stores only the
    // User's identifier in the payload instead of the whole model.
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Number of seconds the job may run before it times out. */
    public int $timeout = 600;

    /**
     * @param User                 $user    The user who requested the export.
     * @param array<string, mixed> $filters Optional "role" and "status" filters.
     */
    public function __construct(
        public User $user,
        public array $filters = []
    ) {}

    /**
     * Uniqueness key: one pending export per requesting user.
     */
    public function uniqueId(): string
    {
        return 'export-users-' . $this->user->id;
    }

    /**
     * Build the filtered user list, write it to CSV and notify the requester.
     */
    public function handle(ExportService $exporter): void
    {
        // Each filter is applied only when present and truthy.
        $users = User::query()
            ->when($this->filters['role'] ?? null, fn ($q, $role) => $q->where('role', $role))
            ->when($this->filters['status'] ?? null, fn ($q, $status) => $q->where('status', $status))
            ->get();

        $path = $exporter->toCsv($users, 'users-export.csv');

        $this->user->notify(new ExportReady($path));
    }
}
|
图片处理任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| <?php
namespace App\Jobs;
use App\Models\Image; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Support\Facades\Storage; use Intervention\Image\Facades\Image as ImageProcessor;
/**
 * Queued job that renders the configured size variants of an uploaded image.
 */
class ProcessImage implements ShouldQueue
{
    use Queueable;

    /** Maximum number of attempts before the job is marked as failed. */
    public int $tries = 3;

    /** Number of seconds the job may run before it times out. */
    public int $timeout = 300;

    /**
     * Variant name => [width, height] in pixels.
     *
     * @var array<string, array{0: int, 1: int}>
     */
    protected array $sizes = [
        'thumbnail' => [150, 150],
        'medium' => [400, 400],
        'large' => [800, 800],
    ];

    public function __construct(
        public Image $image
    ) {
        // Route every instance of this job to the dedicated "images" queue.
        $this->onQueue('images');
    }

    /**
     * Render and store every variant, then stamp the image as processed.
     */
    public function handle(): void
    {
        $original = Storage::path($this->image->path);

        foreach ($this->sizes as $name => [$width, $height]) {
            // The source is re-read for each variant so every resize starts from the original.
            $processed = ImageProcessor::make($original)
                ->fit($width, $height)
                ->encode('jpg', 80);

            $path = "images/{$name}/{$this->image->id}.jpg";

            // Cast explicitly so the encoded bytes, not the image object, are written.
            Storage::put($path, (string) $processed);

            // The job may be retried (see $tries) after failing part-way through the loop;
            // keying on the variant name keeps a retry from inserting duplicate rows.
            $this->image->sizes()->updateOrCreate(
                ['name' => $name],
                [
                    'path' => $path,
                    'width' => $width,
                    'height' => $height,
                ],
            );
        }

        $this->image->update(['processed_at' => now()]);
    }
}
|
总结
Laravel 13 的后台任务处理提供了:
- 灵活的队列配置
- 强大的任务分发机制
- 完善的重试策略
- 任务中间件支持
- 批量任务处理
- Horizon 监控面板
合理使用后台任务可以显著提升应用性能和用户体验。