Laravel 13 队列监控是保障后台任务稳定运行的关键，本文介绍高级队列监控技术。

队列监控架构

监控配置

<?php

// Queue monitoring settings.
// QueueMonitor reads `queue.monitoring.alerts` and QueueAlerter reads
// `queue.monitoring.alert_recipients`.
// NOTE(review): confirm this array is loaded so that those dotted paths resolve
// (the file name that hosts it is not visible here).
return [
    'queue' => [
        'monitoring' => [
            'enabled' => env('QUEUE_MONITORING_ENABLED', true),

            // Per-metric toggles. No code shown alongside this config reads them yet.
            'metrics' => [
                'jobs_processed' => true,
                'jobs_failed' => true,
                'queue_size' => true,
                'wait_time' => true,
                'throughput' => true,
            ],

            // Environment values arrive as strings, so cast them once here and let
            // every consumer compare plain integers.
            'alerts' => [
                'queue_size_threshold' => (int) env('QUEUE_SIZE_THRESHOLD', 1000),
                'failed_jobs_threshold' => (int) env('FAILED_JOBS_THRESHOLD', 10),
                // Presumably seconds -- TODO confirm the unit with whoever measures wait time.
                'wait_time_threshold' => (int) env('QUEUE_WAIT_THRESHOLD', 300),
            ],

            // Mail addresses that receive queue alerts, given as a comma-separated
            // list in QUEUE_ALERT_RECIPIENTS. Without this key the alerter has
            // nobody to notify.
            'alert_recipients' => array_values(array_filter(array_map(
                static fn (string $address): string => trim($address),
                explode(',', (string) env('QUEUE_ALERT_RECIPIENTS', ''))
            ))),

            'retention' => [
                'metrics_days' => 30,
                'logs_days' => 7,
            ],
        ],
    ],
];

队列指标收集

指标服务

<?php

namespace App\Services\Queue;

use Illuminate\Support\Facades\Redis;

/**
 * Records and reads queue metrics stored in Redis under the `queue:metrics:` prefix.
 *
 * Counters are kept per queue as a running total, a per-day bucket (`Ymd`) and a
 * per-hour bucket (`Ymd:H`). The per-hour buckets are the keys that
 * QueueDashboard::getTrend() reads.
 */
class QueueMetrics
{
    /** Upper bound on the number of entries kept in each per-day Redis list. */
    private const MAX_LIST_ENTRIES = 1000;

    private const SECONDS_PER_DAY = 86400;

    protected string $prefix = 'queue:metrics:';

    /**
     * Count one successfully processed job and remember how long it took.
     *
     * @param string $queue    Queue name the job ran on.
     * @param float  $duration Job duration, in whatever unit the caller measures.
     */
    public function recordJobProcessed(string $queue, float $duration): void
    {
        $now = now();
        $date = $now->format('Ymd');
        $hour = $now->format('H');

        $dailyKey = $this->prefix . "processed:{$queue}:{$date}";
        $hourlyKey = "{$dailyKey}:{$hour}";
        $durationKey = $this->prefix . "duration:{$queue}:{$date}";

        Redis::incr($dailyKey);
        Redis::incr($hourlyKey);
        Redis::incr($this->prefix . "processed:{$queue}:total");

        // Newest sample first; cap the list so one busy day cannot grow it forever.
        Redis::lpush($durationKey, $duration);
        Redis::ltrim($durationKey, 0, self::MAX_LIST_ENTRIES - 1);

        // Dated keys would otherwise accumulate indefinitely; the running total is kept.
        $this->expireKeys(
            [$dailyKey, $hourlyKey, $durationKey],
            $this->retentionSeconds('metrics_days', 30)
        );
    }

    /**
     * Count one failed job and log the failure details.
     *
     * @param string $queue     Queue name the job ran on.
     * @param string $exception Description of the failure, stored verbatim.
     */
    public function recordJobFailed(string $queue, string $exception): void
    {
        $now = now();
        $date = $now->format('Ymd');
        $hour = $now->format('H');

        $dailyKey = $this->prefix . "failed:{$queue}:{$date}";
        $hourlyKey = "{$dailyKey}:{$hour}";
        $errorsKey = $this->prefix . "errors:{$queue}:{$date}";

        Redis::incr($dailyKey);
        Redis::incr($hourlyKey);
        Redis::incr($this->prefix . "failed:{$queue}:total");

        Redis::lpush($errorsKey, json_encode([
            'exception' => $exception,
            'time' => $now->toIso8601String(),
        ]));
        // A failure storm must not be able to fill Redis with error entries.
        Redis::ltrim($errorsKey, 0, self::MAX_LIST_ENTRIES - 1);

        $this->expireKeys([$dailyKey, $hourlyKey], $this->retentionSeconds('metrics_days', 30));
        $this->expireKeys([$errorsKey], $this->retentionSeconds('logs_days', 7));
    }

    /**
     * Length of the Redis list `queues:<queue>`.
     */
    public function getQueueSize(string $queue): int
    {
        return (int) Redis::llen('queues:' . $queue);
    }

    /**
     * Jobs processed over the last `$days` calendar days, today included.
     */
    public function getProcessedCount(string $queue, int $days = 1): int
    {
        return $this->sumDailyCounter('processed', $queue, $days);
    }

    /**
     * Jobs failed over the last `$days` calendar days, today included.
     */
    public function getFailedCount(string $queue, int $days = 1): int
    {
        return $this->sumDailyCounter('failed', $queue, $days);
    }

    /**
     * Mean of the 100 most recent durations recorded today, or 0.0 when none exist.
     */
    public function getAverageDuration(string $queue): float
    {
        $date = now()->format('Ymd');
        $durations = Redis::lrange($this->prefix . "duration:{$queue}:{$date}", 0, 99);

        if (empty($durations)) {
            return 0.0;
        }

        return array_sum($durations) / count($durations);
    }

    /**
     * Today's processed count divided by the window length in hours.
     *
     * NOTE(review): the numerator is the whole day's counter, not the jobs seen
     * inside the last `$minutes`, so the figure is only a rough rate -- confirm
     * whether a true sliding window is wanted.
     *
     * @param int $minutes Window length; a non-positive value yields 0.0
     *                     instead of a division by zero.
     */
    public function getThroughput(string $queue, int $minutes = 60): float
    {
        if ($minutes <= 0) {
            return 0.0;
        }

        $processed = $this->getProcessedCount($queue, 1);

        return $processed / ($minutes / 60);
    }

    /**
     * Snapshot of every metric for a single queue.
     *
     * @return array{queue: string, size: int, processed_today: int, failed_today: int, avg_duration: float, throughput: float}
     */
    public function getQueueStats(string $queue): array
    {
        return [
            'queue' => $queue,
            'size' => $this->getQueueSize($queue),
            'processed_today' => $this->getProcessedCount($queue, 1),
            'failed_today' => $this->getFailedCount($queue, 1),
            'avg_duration' => $this->getAverageDuration($queue),
            'throughput' => $this->getThroughput($queue),
        ];
    }

    /**
     * Snapshots for every queue listed under `queue.connections.redis.queues`,
     * falling back to the single queue `default` when that key is absent.
     *
     * @return array<int|string, array<string, mixed>>
     */
    public function getAllQueuesStats(): array
    {
        $queues = config('queue.connections.redis.queues', ['default']);

        return array_map(fn ($q) => $this->getQueueStats($q), $queues);
    }

    /**
     * Add up the per-day counters of one metric, walking back from today.
     */
    private function sumDailyCounter(string $metric, string $queue, int $days): int
    {
        $total = 0;

        for ($offset = 0; $offset < $days; $offset++) {
            $date = now()->subDays($offset)->format('Ymd');
            // A missing key reads as null, which casts to 0.
            $total += (int) Redis::get($this->prefix . "{$metric}:{$queue}:{$date}");
        }

        return $total;
    }

    /**
     * Translate a `queue.monitoring.retention.*` setting from days into seconds.
     */
    private function retentionSeconds(string $setting, int $defaultDays): int
    {
        $days = (int) config("queue.monitoring.retention.{$setting}", $defaultDays);

        return max(0, $days) * self::SECONDS_PER_DAY;
    }

    /**
     * Apply a time-to-live to each key; a non-positive TTL leaves the keys persistent.
     *
     * @param list<string> $keys
     */
    private function expireKeys(array $keys, int $ttl): void
    {
        if ($ttl <= 0) {
            return;
        }

        foreach ($keys as $key) {
            Redis::expire($key, $ttl);
        }
    }
}

队列监控器

实时监控

<?php

namespace App\Services\Queue;

use Illuminate\Support\Facades\Log;

/**
 * Compares per-queue statistics against the configured alert thresholds.
 */
class QueueMonitor
{
    protected QueueMetrics $metrics;
    protected array $thresholds;

    public function __construct(QueueMetrics $metrics)
    {
        $this->thresholds = config('queue.monitoring.alerts', []);
        $this->metrics = $metrics;
    }

    /**
     * Inspect every queue and gather the alerts each one raises.
     *
     * @return array{status: string, queues: array, alerts: array, checked_at: string}
     *         `alerts` is keyed by queue name and only lists queues with at least one alert.
     */
    public function check(): array
    {
        $stats = $this->metrics->getAllQueuesStats();
        $raised = [];

        foreach ($stats as $entry) {
            $found = $this->checkQueue($entry);

            if ($found === []) {
                continue;
            }

            $raised[$entry['queue']] = $found;
        }

        $status = $raised === [] ? 'healthy' : 'warning';

        return [
            'status' => $status,
            'queues' => $stats,
            'alerts' => $raised,
            'checked_at' => now()->toIso8601String(),
        ];
    }

    /**
     * Evaluate one queue's statistics against the size and failed-job limits.
     *
     * @param array $queue Statistics row holding at least `size` and `failed_today`.
     *
     * @return list<array{type: string, message: string, severity: string}>
     */
    protected function checkQueue(array $queue): array
    {
        $sizeLimit = $this->thresholds['queue_size_threshold'] ?? 1000;
        $failedLimit = $this->thresholds['failed_jobs_threshold'] ?? 10;

        // Each rule pairs its trigger with the alert it produces; order is preserved.
        $rules = [
            [
                'triggered' => $queue['size'] > $sizeLimit,
                'alert' => [
                    'type' => 'queue_size',
                    'message' => "Queue size ({$queue['size']}) exceeds threshold",
                    'severity' => 'high',
                ],
            ],
            [
                'triggered' => $queue['failed_today'] > $failedLimit,
                'alert' => [
                    'type' => 'failed_jobs',
                    'message' => "Failed jobs ({$queue['failed_today']}) exceeds threshold",
                    'severity' => 'critical',
                ],
            ],
        ];

        $fired = array_filter($rules, static fn (array $rule): bool => $rule['triggered']);

        // array_column re-indexes from zero, giving a plain list of alerts.
        return array_column($fired, 'alert');
    }

    /**
     * Condensed form of check(): the status plus queue and alert counts.
     *
     * @return array{status: string, queues: int, alerts: int}
     */
    public function getHealthStatus(): array
    {
        ['status' => $status, 'queues' => $queues, 'alerts' => $alerts] = $this->check();

        return [
            'status' => $status,
            'queues' => count($queues),
            'alerts' => count($alerts),
        ];
    }
}

队列仪表板

仪表板数据

<?php

namespace App\Services\Queue;

/**
 * Read-only views over the queue metrics, shaped for a dashboard.
 *
 * The Redis facade is referenced by its fully qualified name because this snippet
 * has no `use` statement for it; a bare `Redis` would resolve to the nonexistent
 * class `App\Services\Queue\Redis`.
 */
class QueueDashboard
{
    protected QueueMetrics $metrics;

    public function __construct(QueueMetrics $metrics)
    {
        $this->metrics = $metrics;
    }

    /**
     * Totals across all queues plus the individual per-queue statistics.
     *
     * @return array{total_queues: int, total_pending: int|float, total_processed_today: int|float, total_failed_today: int|float, queues: array}
     */
    public function getOverview(): array
    {
        $queues = $this->metrics->getAllQueuesStats();

        return [
            'total_queues' => count($queues),
            'total_pending' => array_sum(array_column($queues, 'size')),
            'total_processed_today' => array_sum(array_column($queues, 'processed_today')),
            'total_failed_today' => array_sum(array_column($queues, 'failed_today')),
            'queues' => $queues,
        ];
    }

    /**
     * Hourly processed and failed counts, oldest bucket first.
     *
     * Both endpoints are included, so the result holds `$hours + 1` buckets and
     * the first and last share the same `hour` label when `$hours` is a
     * multiple of 24.
     *
     * @return list<array{hour: string, processed: int, failed: int}>
     */
    public function getTrend(string $queue, int $hours = 24): array
    {
        $trend = [];

        for ($i = $hours; $i >= 0; $i--) {
            $time = now()->subHours($i);
            $hour = $time->format('H');
            $date = $time->format('Ymd');

            // Redis returns counters as strings and missing keys as null;
            // the cast gives callers a uniform integer either way.
            $trend[] = [
                'hour' => $hour,
                'processed' => (int) \Illuminate\Support\Facades\Redis::get(
                    "queue:metrics:processed:{$queue}:{$date}:{$hour}"
                ),
                'failed' => (int) \Illuminate\Support\Facades\Redis::get(
                    "queue:metrics:failed:{$queue}:{$date}:{$hour}"
                ),
            ];
        }

        return $trend;
    }

    /**
     * Highest-scoring members of the sorted set `queue:metrics:jobs:frequency`.
     *
     * NOTE(review): nothing visible alongside this class writes that sorted set --
     * confirm where it is populated.
     */
    public function getTopJobs(int $limit = 10): array
    {
        return \Illuminate\Support\Facades\Redis::zrevrange(
            'queue:metrics:jobs:frequency',
            0,
            $limit - 1,
            ['WITHSCORES' => true]
        );
    }

    /**
     * Highest-scoring members of the sorted set `queue:metrics:jobs:duration`.
     *
     * NOTE(review): nothing visible alongside this class writes that sorted set --
     * confirm where it is populated.
     */
    public function getSlowestJobs(int $limit = 10): array
    {
        return \Illuminate\Support\Facades\Redis::zrevrange(
            'queue:metrics:jobs:duration',
            0,
            $limit - 1,
            ['WITHSCORES' => true]
        );
    }
}

队列告警

告警服务

<?php

namespace App\Services\Queue;

use Illuminate\Support\Facades\Notification;

/**
 * Turns the monitor's findings into mail notifications.
 */
class QueueAlerter
{
    protected QueueMonitor $monitor;

    public function __construct(QueueMonitor $monitor)
    {
        $this->monitor = $monitor;
    }

    /**
     * Run a monitor check and notify about every alert unless the status is healthy.
     */
    public function checkAndAlert(): void
    {
        $report = $this->monitor->check();

        if ($report['status'] === 'healthy') {
            return;
        }

        $this->sendAlerts($report['alerts']);
    }

    /**
     * Send one notification per alert.
     *
     * @param array $alerts Alert lists keyed by queue name.
     */
    protected function sendAlerts(array $alerts): void
    {
        foreach ($alerts as $name => $entries) {
            foreach ($entries as $entry) {
                $this->sendAlert($name, $entry);
            }
        }
    }

    /**
     * Mail a single alert to the addresses under `queue.monitoring.alert_recipients`.
     */
    protected function sendAlert(string $queue, array $alert): void
    {
        Notification::route('mail', config('queue.monitoring.alert_recipients', []))
            ->notify(new QueueAlertNotification($queue, $alert));
    }
}

队列监控命令

<?php

namespace App\Console\Commands;

use App\Services\Queue\QueueMonitor;
use Illuminate\Console\Command;

/**
 * Artisan command that prints a health report for every monitored queue.
 *
 * NOTE(review): Laravel ships its own `queue:monitor` command -- confirm that
 * taking over the name is intended.
 */
class QueueMonitorCommand extends Command
{
    protected $signature = 'queue:monitor {--alert : Send alerts if issues found}';
    protected $description = 'Monitor queue health';

    /**
     * Print the per-queue report and any alerts, optionally sending notifications.
     *
     * @return int self::SUCCESS when every queue is healthy, self::FAILURE otherwise.
     */
    public function handle(QueueMonitor $monitor): int
    {
        $result = $monitor->check();

        // Use the same limit the monitor alerts on, so a row is never marked as
        // failing while the overall status stays healthy.
        $sizeLimit = (int) config('queue.monitoring.alerts.queue_size_threshold', 1000);

        $this->info('=== Queue Monitor ===');
        $this->newLine();

        foreach ($result['queues'] as $queue) {
            $status = $queue['size'] > $sizeLimit ? '<error>✗</error>' : '<info>✓</info>';
            $this->line("{$status} {$queue['queue']}: {$queue['size']} pending, {$queue['processed_today']} processed");
        }

        if (!empty($result['alerts'])) {
            $this->newLine();
            $this->warn('Alerts:');

            foreach ($result['alerts'] as $queue => $alerts) {
                foreach ($alerts as $alert) {
                    $this->line("  - [{$queue}] {$alert['message']}");
                }
            }
        }

        // Honour the declared --alert option. The alerter runs its own check, so
        // the queues are inspected a second time just before notifying.
        if ($this->option('alert') && $result['status'] !== 'healthy') {
            $this->laravel->make(\App\Services\Queue\QueueAlerter::class)->checkAndAlert();
        }

        return $result['status'] === 'healthy' ? self::SUCCESS : self::FAILURE;
    }
}

总结

Laravel 13 的队列监控需要结合指标收集、实时监控、仪表板展示和告警机制来构建。通过完善的监控体系，可以及时发现队列问题，保障后台任务稳定运行。