Laravel 13 的邮件队列系统可以高效处理大量邮件发送,本文介绍邮件队列的高级用法。

邮件队列配置

队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?php

return [
'default' => env('QUEUE_CONNECTION', 'redis'),

'connections' => [
'sync' => [
'driver' => 'sync',
],

'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
'after_commit' => true,
],

'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
'after_commit' => true,
],
],

'batching' => [
'database' => env('DB_CONNECTION', 'mysql'),
'table' => 'job_batches',
],

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
'database' => env('DB_CONNECTION', 'mysql'),
'table' => 'failed_jobs',
],
];

队列邮件

可队列邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?php

namespace App\Mail;

use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Mail\Mailable;
use Illuminate\Mail\Mailables\Content;
use Illuminate\Mail\Mailables\Envelope;
use Illuminate\Queue\SerializesModels;

class NewsletterEmail extends Mailable
{
use Queueable, SerializesModels;

public int $tries = 3;
public int $backoff = 60;
public int $maxExceptions = 3;
public int $timeout = 120;

public function __construct(
protected User $user,
protected array $content
) {}

public function envelope(): Envelope
{
return new Envelope(
subject: $this->content['subject'],
tags: ['newsletter'],
metadata: [
'user_id' => $this->user->id,
],
);
}

public function content(): Content
{
return new Content(
view: 'emails.newsletter',
with: [
'user' => $this->user,
'content' => $this->content,
],
);
}

public function shouldQueue(): bool
{
return true;
}

public function retryUntil(): \DateTime
{
return now()->addHours(24);
}

public function failed(\Throwable $exception): void
{
\Log::error('Newsletter email failed', [
'user_id' => $this->user->id,
'error' => $exception->getMessage(),
]);
}
}

邮件队列服务

批量邮件发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?php

namespace App\Services\Mail;

use App\Mail\NewsletterEmail;
use App\Models\User;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Mail;

class MailQueueService
{
public function sendNewsletter(array $userIds, array $content): Batch
{
$jobs = collect($userIds)->map(function ($userId) use ($content) {
$user = User::find($userId);

return new SendNewsletterJob($user, $content);
});

return Bus::batch($jobs)
->name('Newsletter Batch')
->onQueue('mail')
->allowFailures()
->finally(function (Batch $batch) {
\Log::info("Newsletter batch completed", [
'total_jobs' => $batch->totalJobs,
'failed_jobs' => $batch->failedJobs,
]);
})
->dispatch();
}

public function sendTransactional(User $user, string $template, array $data): void
{
Mail::to($user)
->queue(new TransactionalEmail($user, $template, $data));
}

public function sendWithDelay(User $user, Mailable $mail, int $delaySeconds): void
{
Mail::to($user)
->later(now()->addSeconds($delaySeconds), $mail);
}
}

class SendNewsletterJob implements \Illuminate\Contracts\Queue\ShouldQueue
{
use \Illuminate\Bus\Queueable;

public int $tries = 3;
public int $backoff = [60, 120, 300];

public function __construct(
protected User $user,
protected array $content
) {}

public function handle(): void
{
Mail::to($this->user)->send(new NewsletterEmail($this->user, $this->content));
}

public function failed(\Throwable $exception): void
{
\Log::error('Newsletter job failed', [
'user_id' => $this->user->id,
'error' => $exception->getMessage(),
]);
}
}

邮件队列监控

队列监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
<?php

namespace App\Services\Mail;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

class MailQueueMonitor
{
public function getQueueStats(): array
{
return [
'pending' => $this->getPendingCount(),
'failed' => $this->getFailedCount(),
'processed_today' => $this->getProcessedToday(),
];
}

protected function getPendingCount(): int
{
return DB::table('jobs')
->where('queue', 'mail')
->count();
}

protected function getFailedCount(): int
{
return DB::table('failed_jobs')
->where('queue', 'mail')
->count();
}

protected function getProcessedToday(): int
{
return DB::table('job_batches')
->whereDate('created_at', today())
->where('name', 'like', '%mail%')
->sum('processed_jobs');
}

public function getBatchStatus(string $batchId): array
{
$batch = Bus::findBatch($batchId);

if (!$batch) {
return ['error' => 'Batch not found'];
}

return [
'id' => $batch->id,
'name' => $batch->name,
'total_jobs' => $batch->totalJobs,
'pending_jobs' => $batch->pendingJobs,
'processed_jobs' => $batch->processedJobs(),
'failed_jobs' => $batch->failedJobs,
'progress' => $batch->progress(),
'finished' => $batch->finished(),
'cancelled' => $batch->cancelled(),
];
}

public function getFailedJobs(int $limit = 50): array
{
return DB::table('failed_jobs')
->where('queue', 'mail')
->orderByDesc('failed_at')
->limit($limit)
->get()
->map(function ($job) {
return [
'id' => $job->id,
'queue' => $job->queue,
'error' => $job->exception,
'failed_at' => $job->failed_at,
];
})
->toArray();
}
}

邮件限流

限流中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimitedMail
{
protected int $maxAttempts = 100;
protected int $decaySeconds = 60;

public function __construct(int $maxAttempts = 100, int $decaySeconds = 60)
{
$this->maxAttempts = $maxAttempts;
$this->decaySeconds = $decaySeconds;
}

public function handle($job, $next): void
{
$key = 'mail_rate_limit:' . now()->format('YmdHi');

$attempts = Redis::incr($key);

if ($attempts === 1) {
Redis::expire($key, $this->decaySeconds);
}

if ($attempts > $this->maxAttempts) {
$job->release($this->decaySeconds);
return;
}

$next($job);
}
}

class ThrottledMailJob implements \Illuminate\Contracts\Queue\ShouldQueue
{
use \Illuminate\Bus\Queueable;

public function middleware(): array
{
return [new RateLimitedMail(100, 60)];
}

public function handle(): void
{
}
}

邮件重试策略

自定义重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Mail;

class RetryableMailJob implements ShouldQueue
{
use Queueable, InteractsWithQueue;

protected int $maxRetries = 5;
protected array $backoffTimes = [60, 300, 900, 1800, 3600];

public function __construct(
protected $mailable,
protected $recipient
) {}

public function handle(): void
{
try {
Mail::to($this->recipient)->send($this->mailable);
} catch (\Throwable $e) {
$this->handleFailure($e);
}
}

protected function handleFailure(\Throwable $exception): void
{
$attempts = $this->attempts();

if ($attempts >= $this->maxRetries) {
$this->fail($exception);
return;
}

$backoff = $this->backoffTimes[$attempts - 1] ?? 3600;

\Log::warning('Mail job failed, retrying', [
'attempt' => $attempts,
'backoff' => $backoff,
'error' => $exception->getMessage(),
]);

$this->release($backoff);
}

public function backoff(): array
{
return $this->backoffTimes;
}

public function retryUntil(): \DateTime
{
return now()->addHours(24);
}
}

邮件队列命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?php

namespace App\Console\Commands;

use App\Services\Mail\MailQueueMonitor;
use Illuminate\Console\Command;

class MailQueueCommand extends Command
{
protected $signature = 'mail:queue {action : Action to perform (status|retry|clear)}';
protected $description = 'Manage mail queue';

public function handle(MailQueueMonitor $monitor): int
{
$action = $this->argument('action');

return match ($action) {
'status' => $this->showStatus($monitor),
'retry' => $this->retryFailed(),
'clear' => $this->clearFailed(),
default => $this->invalidAction(),
};
}

protected function showStatus(MailQueueMonitor $monitor): int
{
$stats = $monitor->getQueueStats();

$this->info('=== Mail Queue Status ===');
$this->table(
['Metric', 'Value'],
[
['Pending Jobs', $stats['pending']],
['Failed Jobs', $stats['failed']],
['Processed Today', $stats['processed_today']],
]
);

return self::SUCCESS;
}

protected function retryFailed(): int
{
$count = \Illuminate\Support\Facades\Artisan::call('queue:retry', [
'queue' => 'mail',
]);

$this->info('Retrying failed mail jobs...');

return self::SUCCESS;
}

protected function clearFailed(): int
{
if (!$this->confirm('Are you sure you want to clear all failed mail jobs?')) {
return self::SUCCESS;
}

\Illuminate\Support\Facades\DB::table('failed_jobs')
->where('queue', 'mail')
->delete();

$this->info('Failed mail jobs cleared');

return self::SUCCESS;
}

protected function invalidAction(): int
{
$this->error('Invalid action. Use: status, retry, or clear');
return self::FAILURE;
}
}

邮件队列控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?php

namespace App\Http\Controllers\Api;

use App\Http\Controllers\Controller;
use App\Services\Mail\MailQueueService;
use Illuminate\Http\Request;

class MailQueueController extends Controller
{
public function __construct(
protected MailQueueService $mailQueue,
protected \App\Services\Mail\MailQueueMonitor $monitor
) {}

public function status()
{
return response()->json($this->monitor->getQueueStats());
}

public function batchStatus(string $batchId)
{
return response()->json($this->monitor->getBatchStatus($batchId));
}

public function sendNewsletter(Request $request)
{
$request->validate([
'user_ids' => 'required|array',
'user_ids.*' => 'exists:users,id',
'subject' => 'required|string|max:200',
'content' => 'required|string',
]);

$batch = $this->mailQueue->sendNewsletter(
$request->user_ids,
$request->only('subject', 'content')
);

return response()->json([
'message' => 'Newsletter batch started',
'batch_id' => $batch->id,
]);
}
}

总结

Laravel 13 的邮件队列系统通过批处理、限流、重试策略和监控机制,可以高效处理大量邮件发送任务。合理配置队列参数和监控告警,可以确保邮件发送的稳定性和可靠性。