Laravel 13 队列监控是保障后台任务稳定运行的关键,本文介绍高级队列监控技术。
队列监控架构
监控配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?php
// Queue monitoring settings. The services in this article read them through
// config('queue.monitoring.*').
// NOTE(review): the settings are nested under an outer 'queue' key; confirm this
// array is loaded/merged so that the 'queue.monitoring' path actually resolves.
return [
    'queue' => [
        'monitoring' => [
            'enabled' => env('QUEUE_MONITORING_ENABLED', true),

            // Toggles for the individual metric families.
            'metrics' => [
                'jobs_processed' => true,
                'jobs_failed' => true,
                'queue_size' => true,
                'wait_time' => true,
                'throughput' => true,
            ],

            // Alert thresholds. env() yields strings when the variable is set,
            // so the values are cast to keep numeric comparisons unambiguous.
            'alerts' => [
                'queue_size_threshold' => (int) env('QUEUE_SIZE_THRESHOLD', 1000),
                'failed_jobs_threshold' => (int) env('FAILED_JOBS_THRESHOLD', 10),
                'wait_time_threshold' => (int) env('QUEUE_WAIT_THRESHOLD', 300),
            ],

            // Mail recipients for queue alerts, given as a comma-separated list.
            // An empty or missing variable produces an empty list.
            'alert_recipients' => array_values(array_filter(
                array_map('trim', explode(',', (string) env('QUEUE_ALERT_RECIPIENTS', ''))),
                static fn (string $address): bool => $address !== '',
            )),

            // Retention periods, in days.
            'retention' => [
                'metrics_days' => 30,
                'logs_days' => 7,
            ],
        ],
    ],
];
|
队列指标收集
指标服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| <?php
namespace App\Services\Queue;
use Illuminate\Support\Facades\Redis;
/**
 * Records and reads per-queue job metrics stored in Redis.
 *
 * Keys written (all below $prefix):
 *  - processed:{queue}:{Ymd}, processed:{queue}:{Ymd}:{H}, processed:{queue}:total
 *  - failed:{queue}:{Ymd},    failed:{queue}:{Ymd}:{H},    failed:{queue}:total
 *  - duration:{queue}:{Ymd}   list of job durations, newest first
 *  - errors:{queue}:{Ymd}     list of JSON-encoded failure records, newest first
 *
 * Dated keys receive a TTL taken from queue.monitoring.retention so they do not
 * accumulate forever; the ":total" counters never expire.
 */
class QueueMetrics
{
    /** Maximum number of entries kept in each per-day list. */
    protected const MAX_LIST_ENTRIES = 1000;

    /** Number of most recent durations used for the average. */
    protected const AVERAGE_SAMPLE_SIZE = 100;

    protected string $prefix = 'queue:metrics:';

    /**
     * Count one successfully processed job and remember how long it took.
     *
     * @param string $queue    Queue name.
     * @param float  $duration Job duration as measured by the caller.
     */
    public function recordJobProcessed(string $queue, float $duration): void
    {
        $now = now();
        $date = $now->format('Ymd');
        $hour = $now->format('H');
        $ttl = $this->retentionSeconds('metrics_days', 30);

        $this->incrementWithTtl("processed:{$queue}:{$date}", $ttl);
        // Hourly bucket, using the "{date}:{hour}" key layout.
        $this->incrementWithTtl("processed:{$queue}:{$date}:{$hour}", $ttl);
        Redis::incr($this->prefix . "processed:{$queue}:total");

        $durationKey = $this->prefix . "duration:{$queue}:{$date}";
        Redis::lpush($durationKey, $duration);
        Redis::ltrim($durationKey, 0, self::MAX_LIST_ENTRIES - 1);
        Redis::expire($durationKey, $ttl);
    }

    /**
     * Count one failed job and store a short failure record.
     *
     * @param string $queue     Queue name.
     * @param string $exception Text describing the failure, as supplied by the caller.
     */
    public function recordJobFailed(string $queue, string $exception): void
    {
        $now = now();
        $date = $now->format('Ymd');
        $hour = $now->format('H');
        $ttl = $this->retentionSeconds('metrics_days', 30);

        $this->incrementWithTtl("failed:{$queue}:{$date}", $ttl);
        $this->incrementWithTtl("failed:{$queue}:{$date}:{$hour}", $ttl);
        Redis::incr($this->prefix . "failed:{$queue}:total");

        $errorsKey = $this->prefix . "errors:{$queue}:{$date}";
        // Substitute invalid UTF-8 so json_encode() cannot return false for odd
        // exception text.
        $record = json_encode([
            'exception' => $exception,
            'time' => $now->toIso8601String(),
        ], JSON_INVALID_UTF8_SUBSTITUTE);

        Redis::lpush($errorsKey, $record);
        // Cap the list like the duration list so a failure storm cannot grow it
        // without bound.
        Redis::ltrim($errorsKey, 0, self::MAX_LIST_ENTRIES - 1);
        Redis::expire($errorsKey, $this->retentionSeconds('logs_days', 7));
    }

    /**
     * Length of the Redis list "queues:{queue}".
     */
    public function getQueueSize(string $queue): int
    {
        return (int) Redis::llen('queues:' . $queue);
    }

    /**
     * Sum of the daily "processed" counters for today and the previous $days - 1 days.
     */
    public function getProcessedCount(string $queue, int $days = 1): int
    {
        return $this->sumDailyCounters('processed', $queue, $days);
    }

    /**
     * Sum of the daily "failed" counters for today and the previous $days - 1 days.
     */
    public function getFailedCount(string $queue, int $days = 1): int
    {
        return $this->sumDailyCounters('failed', $queue, $days);
    }

    /**
     * Mean of today's most recent recorded durations (up to AVERAGE_SAMPLE_SIZE).
     *
     * @return float 0.0 when nothing has been recorded today.
     */
    public function getAverageDuration(string $queue): float
    {
        $date = now()->format('Ymd');
        $durations = Redis::lrange(
            $this->prefix . "duration:{$queue}:{$date}",
            0,
            self::AVERAGE_SAMPLE_SIZE - 1
        );

        if (empty($durations)) {
            return 0.0;
        }

        // Redis returns strings; convert explicitly before averaging.
        return array_sum(array_map('floatval', $durations)) / count($durations);
    }

    /**
     * Today's processed count divided by the window length expressed in hours.
     *
     * @param int $minutes Window length in minutes; values <= 0 yield 0.0
     *                     instead of a division by zero.
     */
    public function getThroughput(string $queue, int $minutes = 60): float
    {
        if ($minutes <= 0) {
            return 0.0;
        }

        $processed = $this->getProcessedCount($queue, 1);

        return $processed / ($minutes / 60);
    }

    /**
     * Snapshot of all metrics for one queue.
     *
     * @return array{queue:string,size:int,processed_today:int,failed_today:int,avg_duration:float,throughput:float}
     */
    public function getQueueStats(string $queue): array
    {
        return [
            'queue' => $queue,
            'size' => $this->getQueueSize($queue),
            'processed_today' => $this->getProcessedCount($queue, 1),
            'failed_today' => $this->getFailedCount($queue, 1),
            'avg_duration' => $this->getAverageDuration($queue),
            'throughput' => $this->getThroughput($queue),
        ];
    }

    /**
     * Stats for every queue listed under queue.connections.redis.queues
     * (falls back to ['default'] when the key is absent).
     */
    public function getAllQueuesStats(): array
    {
        // Cast so a single queue name configured as a string is also accepted.
        $queues = (array) config('queue.connections.redis.queues', ['default']);

        return array_map(fn ($q) => $this->getQueueStats($q), $queues);
    }

    /** Increment the counter at $prefix . $suffix and (re)apply its TTL. */
    private function incrementWithTtl(string $suffix, int $ttlSeconds): void
    {
        $key = $this->prefix . $suffix;
        Redis::incr($key);
        Redis::expire($key, $ttlSeconds);
    }

    /** Read a retention period (in days) from config and convert it to seconds. */
    private function retentionSeconds(string $setting, int $defaultDays): int
    {
        $days = (int) config("queue.monitoring.retention.{$setting}", $defaultDays);

        return max(1, $days) * 86400;
    }

    /** Add up the daily "{type}:{queue}:{Ymd}" counters over the last $days days. */
    private function sumDailyCounters(string $type, string $queue, int $days): int
    {
        $total = 0;

        for ($i = 0; $i < $days; $i++) {
            $date = now()->subDays($i)->format('Ymd');
            $total += (int) (Redis::get($this->prefix . "{$type}:{$queue}:{$date}") ?? 0);
        }

        return $total;
    }
}
|
队列监控器
实时监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| <?php
namespace App\Services\Queue;
use Illuminate\Support\Facades\Log;
/**
 * Evaluates queue statistics against the configured alert thresholds.
 */
class QueueMonitor
{
    protected QueueMetrics $metrics;

    protected array $thresholds;

    /**
     * @param QueueMetrics $metrics Source of the per-queue statistics.
     */
    public function __construct(QueueMetrics $metrics)
    {
        $this->metrics = $metrics;
        $this->thresholds = config('queue.monitoring.alerts', []);
    }

    /**
     * Inspect every queue and collect the alerts raised for each one.
     *
     * @return array Keys: 'status' ('healthy' when no queue raised an alert,
     *               otherwise 'warning'), 'queues' (the raw stats), 'alerts'
     *               (alerts keyed by queue name) and 'checked_at' (ISO-8601).
     */
    public function check(): array
    {
        $stats = $this->metrics->getAllQueuesStats();
        $alertsByQueue = [];

        foreach ($stats as $entry) {
            $raised = $this->checkQueue($entry);

            if (empty($raised)) {
                continue;
            }

            $alertsByQueue[$entry['queue']] = $raised;
        }

        $status = empty($alertsByQueue) ? 'healthy' : 'warning';

        return [
            'status' => $status,
            'queues' => $stats,
            'alerts' => $alertsByQueue,
            'checked_at' => now()->toIso8601String(),
        ];
    }

    /**
     * Compare one queue's stats with the size and failed-job thresholds.
     *
     * @param array $queue Stats entry providing 'size' and 'failed_today'.
     * @return array List of alerts, each with 'type', 'message' and 'severity'.
     */
    protected function checkQueue(array $queue): array
    {
        $sizeLimit = $this->thresholds['queue_size_threshold'] ?? 1000;
        $failedLimit = $this->thresholds['failed_jobs_threshold'] ?? 10;

        $pending = $queue['size'];
        $failed = $queue['failed_today'];

        $raised = [];

        if ($pending > $sizeLimit) {
            $raised[] = [
                'type' => 'queue_size',
                'message' => "Queue size ({$pending}) exceeds threshold",
                'severity' => 'high',
            ];
        }

        if ($failed > $failedLimit) {
            $raised[] = [
                'type' => 'failed_jobs',
                'message' => "Failed jobs ({$failed}) exceeds threshold",
                'severity' => 'critical',
            ];
        }

        return $raised;
    }

    /**
     * Condensed health summary: the status plus the number of queues checked
     * and the number of queues that raised alerts.
     */
    public function getHealthStatus(): array
    {
        $report = $this->check();

        $queueCount = count($report['queues']);
        $alertCount = count($report['alerts']);

        return [
            'status' => $report['status'],
            'queues' => $queueCount,
            'alerts' => $alertCount,
        ];
    }
}
|
队列仪表板
仪表板数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| <?php
namespace App\Services\Queue;
/**
 * Aggregates queue metrics into the shapes a dashboard needs.
 *
 * The Redis facade is referenced by its fully-qualified name because this
 * snippet has no `use` statement for it; a bare `Redis` would resolve inside
 * the App\Services\Queue namespace and fail at runtime.
 */
class QueueDashboard
{
    protected QueueMetrics $metrics;

    /**
     * @param QueueMetrics $metrics Source of the per-queue statistics.
     */
    public function __construct(QueueMetrics $metrics)
    {
        $this->metrics = $metrics;
    }

    /**
     * Totals across all queues plus the per-queue stats they were built from.
     *
     * @return array Keys: total_queues, total_pending, total_processed_today,
     *               total_failed_today, queues.
     */
    public function getOverview(): array
    {
        $queues = $this->metrics->getAllQueuesStats();

        return [
            'total_queues' => count($queues),
            'total_pending' => array_sum(array_column($queues, 'size')),
            'total_processed_today' => array_sum(array_column($queues, 'processed_today')),
            'total_failed_today' => array_sum(array_column($queues, 'failed_today')),
            'queues' => $queues,
        ];
    }

    /**
     * Hourly processed/failed counts, oldest first, from $hours ago up to and
     * including the current hour ($hours + 1 points; none when $hours < 0).
     *
     * Reads the keys "queue:metrics:{processed|failed}:{queue}:{Ymd}:{H}";
     * missing keys count as 0.
     *
     * @return array<int, array{hour:string,processed:int,failed:int}>
     */
    public function getTrend(string $queue, int $hours = 24): array
    {
        $trend = [];

        for ($i = $hours; $i >= 0; $i--) {
            $time = now()->subHours($i);
            $hour = $time->format('H');
            $date = $time->format('Ymd');

            $processed = \Illuminate\Support\Facades\Redis::get("queue:metrics:processed:{$queue}:{$date}:{$hour}");
            $failed = \Illuminate\Support\Facades\Redis::get("queue:metrics:failed:{$queue}:{$date}:{$hour}");

            $trend[] = [
                'hour' => $hour,
                // Redis returns strings (or null); normalise to int for consumers.
                'processed' => (int) ($processed ?? 0),
                'failed' => (int) ($failed ?? 0),
            ];
        }

        return $trend;
    }

    /**
     * Highest-scored members of the sorted set "queue:metrics:jobs:frequency".
     *
     * @param int $limit Maximum number of members to return.
     */
    public function getTopJobs(int $limit = 10): array
    {
        return \Illuminate\Support\Facades\Redis::zrevrange(
            'queue:metrics:jobs:frequency',
            0,
            $limit - 1,
            ['WITHSCORES' => true]
        );
    }

    /**
     * Highest-scored members of the sorted set "queue:metrics:jobs:duration".
     *
     * @param int $limit Maximum number of members to return.
     */
    public function getSlowestJobs(int $limit = 10): array
    {
        return \Illuminate\Support\Facades\Redis::zrevrange(
            'queue:metrics:jobs:duration',
            0,
            $limit - 1,
            ['WITHSCORES' => true]
        );
    }
}
|
队列告警
告警服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <?php
namespace App\Services\Queue;
use Illuminate\Support\Facades\Notification;
/**
 * Runs the queue monitor and mails a notification for every alert it reports.
 */
class QueueAlerter
{
    protected QueueMonitor $monitor;

    /**
     * @param QueueMonitor $monitor Produces the health report that is checked.
     */
    public function __construct(QueueMonitor $monitor)
    {
        $this->monitor = $monitor;
    }

    /**
     * Run one check; when the status is anything other than 'healthy',
     * dispatch notifications for the reported alerts.
     */
    public function checkAndAlert(): void
    {
        $report = $this->monitor->check();

        if ($report['status'] === 'healthy') {
            return;
        }

        $this->sendAlerts($report['alerts']);
    }

    /**
     * Send every alert of every queue.
     *
     * @param array $alerts Alerts keyed by queue name, each value a list of alerts.
     */
    protected function sendAlerts(array $alerts): void
    {
        foreach ($alerts as $queueName => $raised) {
            foreach ($raised as $item) {
                $this->sendAlert($queueName, $item);
            }
        }
    }

    /**
     * Notify the addresses configured at queue.monitoring.alert_recipients
     * about a single alert.
     */
    protected function sendAlert(string $queue, array $alert): void
    {
        $recipients = config('queue.monitoring.alert_recipients', []);
        $route = Notification::route('mail', $recipients);

        $route->notify(new QueueAlertNotification($queue, $alert));
    }
}
|
队列监控命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| <?php
namespace App\Console\Commands;
use App\Services\Queue\QueueMonitor; use Illuminate\Console\Command;
/**
 * Artisan command that prints the health of every monitored queue.
 *
 * NOTE(review): Laravel ships its own `queue:monitor` artisan command; confirm
 * that reusing this signature (and thereby shadowing it) is intended.
 */
class QueueMonitorCommand extends Command
{
    protected $signature = 'queue:monitor {--alert : Send alerts if issues found}';

    protected $description = 'Monitor queue health';

    /**
     * Print one line per queue, list any alerts and, with --alert, send them.
     *
     * @param QueueMonitor $monitor Performs the health check.
     * @return int self::SUCCESS when the status is 'healthy', else self::FAILURE.
     */
    public function handle(QueueMonitor $monitor): int
    {
        $result = $monitor->check();

        $this->info('=== Queue Monitor ===');
        $this->newLine();

        foreach ($result['queues'] as $queue) {
            // Mark a queue as failing exactly when the monitor raised an alert for
            // it, so the marker agrees with the alert list and the exit code.
            $status = isset($result['alerts'][$queue['queue']])
                ? '<error>✗</error>'
                : '<info>✓</info>';

            $this->line("{$status} {$queue['queue']}: {$queue['size']} pending, {$queue['processed_today']} processed");
        }

        if (!empty($result['alerts'])) {
            $this->newLine();
            $this->warn('Alerts:');

            foreach ($result['alerts'] as $queue => $alerts) {
                foreach ($alerts as $alert) {
                    $this->line("  - [{$queue}] {$alert['message']}");
                }
            }

            // Honour the declared --alert option. QueueAlerter only exposes
            // checkAndAlert(), so the check is executed once more before sending.
            if ($this->option('alert')) {
                app(\App\Services\Queue\QueueAlerter::class)->checkAndAlert();
            }
        }

        return $result['status'] === 'healthy' ? self::SUCCESS : self::FAILURE;
    }
}
|
总结
Laravel 13 的队列监控需要结合指标收集、实时监控、仪表板展示和告警机制来构建。通过完善的监控体系,可以及时发现队列问题,保障后台任务稳定运行。