Laravel 13 的邮件队列系统可以高效处理大量邮件发送,本文介绍邮件队列的高级用法。
邮件队列配置
队列配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| <?php
return [ 'default' => env('QUEUE_CONNECTION', 'redis'), 'connections' => [ 'sync' => [ 'driver' => 'sync', ], 'database' => [ 'driver' => 'database', 'table' => 'jobs', 'queue' => 'default', 'retry_after' => 90, 'after_commit' => true, ], 'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => env('REDIS_QUEUE', 'default'), 'retry_after' => 90, 'block_for' => null, 'after_commit' => true, ], ], 'batching' => [ 'database' => env('DB_CONNECTION', 'mysql'), 'table' => 'job_batches', ], 'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'), 'database' => env('DB_CONNECTION', 'mysql'), 'table' => 'failed_jobs', ], ];
|
队列邮件
可队列邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| <?php
namespace App\Mail;
use App\Models\User; use Illuminate\Bus\Queueable; use Illuminate\Mail\Mailable; use Illuminate\Mail\Mailables\Content; use Illuminate\Mail\Mailables\Envelope; use Illuminate\Queue\SerializesModels;
class NewsletterEmail extends Mailable { use Queueable, SerializesModels; public int $tries = 3; public int $backoff = 60; public int $maxExceptions = 3; public int $timeout = 120; public function __construct( protected User $user, protected array $content ) {} public function envelope(): Envelope { return new Envelope( subject: $this->content['subject'], tags: ['newsletter'], metadata: [ 'user_id' => $this->user->id, ], ); } public function content(): Content { return new Content( view: 'emails.newsletter', with: [ 'user' => $this->user, 'content' => $this->content, ], ); } public function shouldQueue(): bool { return true; } public function retryUntil(): \DateTime { return now()->addHours(24); } public function failed(\Throwable $exception): void { \Log::error('Newsletter email failed', [ 'user_id' => $this->user->id, 'error' => $exception->getMessage(), ]); } }
|
邮件队列服务
批量邮件发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| <?php
namespace App\Services\Mail;
use App\Mail\NewsletterEmail; use App\Models\User; use Illuminate\Bus\Batch; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Facades\Mail;
class MailQueueService { public function sendNewsletter(array $userIds, array $content): Batch { $jobs = collect($userIds)->map(function ($userId) use ($content) { $user = User::find($userId); return new SendNewsletterJob($user, $content); }); return Bus::batch($jobs) ->name('Newsletter Batch') ->onQueue('mail') ->allowFailures() ->finally(function (Batch $batch) { \Log::info("Newsletter batch completed", [ 'total_jobs' => $batch->totalJobs, 'failed_jobs' => $batch->failedJobs, ]); }) ->dispatch(); } public function sendTransactional(User $user, string $template, array $data): void { Mail::to($user) ->queue(new TransactionalEmail($user, $template, $data)); } public function sendWithDelay(User $user, Mailable $mail, int $delaySeconds): void { Mail::to($user) ->later(now()->addSeconds($delaySeconds), $mail); } }
class SendNewsletterJob implements \Illuminate\Contracts\Queue\ShouldQueue { use \Illuminate\Bus\Queueable; public int $tries = 3; public int $backoff = [60, 120, 300]; public function __construct( protected User $user, protected array $content ) {} public function handle(): void { Mail::to($this->user)->send(new NewsletterEmail($this->user, $this->content)); } public function failed(\Throwable $exception): void { \Log::error('Newsletter job failed', [ 'user_id' => $this->user->id, 'error' => $exception->getMessage(), ]); } }
|
邮件队列监控
队列监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| <?php
namespace App\Services\Mail;
use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Queue;
class MailQueueMonitor { public function getQueueStats(): array { return [ 'pending' => $this->getPendingCount(), 'failed' => $this->getFailedCount(), 'processed_today' => $this->getProcessedToday(), ]; } protected function getPendingCount(): int { return DB::table('jobs') ->where('queue', 'mail') ->count(); } protected function getFailedCount(): int { return DB::table('failed_jobs') ->where('queue', 'mail') ->count(); } protected function getProcessedToday(): int { return DB::table('job_batches') ->whereDate('created_at', today()) ->where('name', 'like', '%mail%') ->sum('processed_jobs'); } public function getBatchStatus(string $batchId): array { $batch = Bus::findBatch($batchId); if (!$batch) { return ['error' => 'Batch not found']; } return [ 'id' => $batch->id, 'name' => $batch->name, 'total_jobs' => $batch->totalJobs, 'pending_jobs' => $batch->pendingJobs, 'processed_jobs' => $batch->processedJobs(), 'failed_jobs' => $batch->failedJobs, 'progress' => $batch->progress(), 'finished' => $batch->finished(), 'cancelled' => $batch->cancelled(), ]; } public function getFailedJobs(int $limit = 50): array { return DB::table('failed_jobs') ->where('queue', 'mail') ->orderByDesc('failed_at') ->limit($limit) ->get() ->map(function ($job) { return [ 'id' => $job->id, 'queue' => $job->queue, 'error' => $job->exception, 'failed_at' => $job->failed_at, ]; }) ->toArray(); } }
|
邮件限流
限流中间件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| <?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimitedMail { protected int $maxAttempts = 100; protected int $decaySeconds = 60; public function __construct(int $maxAttempts = 100, int $decaySeconds = 60) { $this->maxAttempts = $maxAttempts; $this->decaySeconds = $decaySeconds; } public function handle($job, $next): void { $key = 'mail_rate_limit:' . now()->format('YmdHi'); $attempts = Redis::incr($key); if ($attempts === 1) { Redis::expire($key, $this->decaySeconds); } if ($attempts > $this->maxAttempts) { $job->release($this->decaySeconds); return; } $next($job); } }
class ThrottledMailJob implements \Illuminate\Contracts\Queue\ShouldQueue { use \Illuminate\Bus\Queueable; public function middleware(): array { return [new RateLimitedMail(100, 60)]; } public function handle(): void { } }
|
邮件重试策略
自定义重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| <?php
namespace App\Jobs;
use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Facades\Mail;
class RetryableMailJob implements ShouldQueue { use Queueable, InteractsWithQueue; protected int $maxRetries = 5; protected array $backoffTimes = [60, 300, 900, 1800, 3600]; public function __construct( protected $mailable, protected $recipient ) {} public function handle(): void { try { Mail::to($this->recipient)->send($this->mailable); } catch (\Throwable $e) { $this->handleFailure($e); } } protected function handleFailure(\Throwable $exception): void { $attempts = $this->attempts(); if ($attempts >= $this->maxRetries) { $this->fail($exception); return; } $backoff = $this->backoffTimes[$attempts - 1] ?? 3600; \Log::warning('Mail job failed, retrying', [ 'attempt' => $attempts, 'backoff' => $backoff, 'error' => $exception->getMessage(), ]); $this->release($backoff); } public function backoff(): array { return $this->backoffTimes; } public function retryUntil(): \DateTime { return now()->addHours(24); } }
|
邮件队列命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| <?php
namespace App\Console\Commands;
use App\Services\Mail\MailQueueMonitor; use Illuminate\Console\Command;
class MailQueueCommand extends Command { protected $signature = 'mail:queue {action : Action to perform (status|retry|clear)}'; protected $description = 'Manage mail queue'; public function handle(MailQueueMonitor $monitor): int { $action = $this->argument('action'); return match ($action) { 'status' => $this->showStatus($monitor), 'retry' => $this->retryFailed(), 'clear' => $this->clearFailed(), default => $this->invalidAction(), }; } protected function showStatus(MailQueueMonitor $monitor): int { $stats = $monitor->getQueueStats(); $this->info('=== Mail Queue Status ==='); $this->table( ['Metric', 'Value'], [ ['Pending Jobs', $stats['pending']], ['Failed Jobs', $stats['failed']], ['Processed Today', $stats['processed_today']], ] ); return self::SUCCESS; } protected function retryFailed(): int { $count = \Illuminate\Support\Facades\Artisan::call('queue:retry', [ 'queue' => 'mail', ]); $this->info('Retrying failed mail jobs...'); return self::SUCCESS; } protected function clearFailed(): int { if (!$this->confirm('Are you sure you want to clear all failed mail jobs?')) { return self::SUCCESS; } \Illuminate\Support\Facades\DB::table('failed_jobs') ->where('queue', 'mail') ->delete(); $this->info('Failed mail jobs cleared'); return self::SUCCESS; } protected function invalidAction(): int { $this->error('Invalid action. Use: status, retry, or clear'); return self::FAILURE; } }
|
邮件队列控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| <?php
namespace App\Http\Controllers\Api;
use App\Http\Controllers\Controller; use App\Services\Mail\MailQueueService; use Illuminate\Http\Request;
class MailQueueController extends Controller { public function __construct( protected MailQueueService $mailQueue, protected \App\Services\Mail\MailQueueMonitor $monitor ) {} public function status() { return response()->json($this->monitor->getQueueStats()); } public function batchStatus(string $batchId) { return response()->json($this->monitor->getBatchStatus($batchId)); } public function sendNewsletter(Request $request) { $request->validate([ 'user_ids' => 'required|array', 'user_ids.*' => 'exists:users,id', 'subject' => 'required|string|max:200', 'content' => 'required|string', ]); $batch = $this->mailQueue->sendNewsletter( $request->user_ids, $request->only('subject', 'content') ); return response()->json([ 'message' => 'Newsletter batch started', 'batch_id' => $batch->id, ]); } }
|
总结
Laravel 13 的邮件队列系统通过批处理、限流、重试策略和监控机制,可以高效处理大量邮件发送任务。合理配置队列参数和监控告警,可以确保邮件发送的稳定性和可靠性。