Laravel 13 异步任务监控完全指南

监控异步任务对于保证系统稳定性至关重要。本文将深入探讨 Laravel 13 中异步任务监控的各种方法和最佳实践。

Horizon 监控

安装配置

使用 Composer 安装 Horizon，然后执行其安装命令：
# Add the laravel/horizon package to the project's Composer dependencies.
composer require laravel/horizon
# Run Horizon's install command (per the Horizon docs this publishes its config and assets).
php artisan horizon:install

Horizon 配置

以下是 config/horizon.php 中的主要配置项：
// config/horizon.php
// Top-level Horizon settings: dashboard location, Redis usage, retention
// windows, metric snapshots and master-process limits.
// NOTE(review): option meanings and units below follow the Horizon
// documentation, not anything visible in this file — confirm against the
// installed Horizon version.
return [
// Optional dedicated domain for the dashboard, read from HORIZON_DOMAIN (null when unset).
'domain' => env('HORIZON_DOMAIN'),
// URI path of the dashboard.
'path' => 'horizon',
// Name of the Redis connection Horizon stores its data on.
'use' => 'default',
// Key prefix for Horizon's Redis data; overridable through HORIZON_PREFIX.
'prefix' => env('HORIZON_PREFIX', 'horizon:'),
// Middleware stack applied to the dashboard routes.
'middleware' => ['web'],
// Long-wait thresholds per "connection:queue" (seconds, per the Horizon docs).
'waits' => [
'redis:default' => 60,
],
// Retention per job category (minutes, per the Horizon docs; 10080 minutes = 7 days).
'trim' => [
'recent' => 60,
'pending' => 60,
'completed' => 60,
'recent_failed' => 60,
'failed' => 10080,
'monitored' => 10080,
],
// Job classes to hide from the completed-jobs list; empty here.
'silenced' => [],
// Which metric snapshot series are enabled.
'metrics' => [
'snapshots' => [
'jobs_per_minute' => true,
'wait_times' => true,
],
],
// Fast termination is switched off.
'fast_termination' => false,
// Memory ceiling for the Horizon master process (megabytes, per the Horizon docs).
'memory_limit' => 64,
];

Supervisor 配置

在 config/horizon.php 的 environments 配置项中，为各运行环境定义 supervisor：
// config/horizon.php
// Fragment of the Horizon config array: supervisor definitions per environment.
// Only "production" is defined here.
// NOTE(review): option meanings and units below follow the Horizon
// documentation — confirm against the installed Horizon version.
'environments' => [
'production' => [
// Auto-balanced supervisor for the "default" queue.
'supervisor-default' => [
'connection' => 'redis',
'queue' => ['default'],
// 'auto' balancing with the 'time' scaling strategy, capped at 10 processes.
'balance' => 'auto',
'autoScalingStrategy' => 'time',
'maxProcesses' => 10,
// 0 for maxTime / maxJobs: presumably "no limit" before a worker restarts — confirm.
'maxTime' => 0,
'maxJobs' => 0,
// Per-worker memory limit (megabytes, per the Horizon docs).
'memory' => 128,
// Attempts per job and per-job timeout (seconds, per the Horizon docs).
'tries' => 3,
'timeout' => 60,
'nice' => 0,
],
// Fixed-size supervisor (3 processes, 'simple' balancing) for the "high" queue.
'supervisor-high' => [
'connection' => 'redis',
'queue' => ['high'],
'balance' => 'simple',
'processes' => 3,
'tries' => 3,
'timeout' => 60,
],
],
],

自定义监控

任务监控服务

下面的服务类用于汇总各队列、Worker、失败任务以及吞吐量的统计数据：
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Cache;

/**
 * Collects queue statistics from the `jobs` / `failed_jobs` tables and from
 * values kept in the cache under the `queue:stats` prefix.
 *
 * The record*() methods are the write side: they are meant to be called from
 * queue event listeners as jobs start, finish or fail.
 */
class QueueMonitorService
{
    /** Prefix shared by every cache key this service reads or writes. */
    protected string $statsKey = 'queue:stats';

    /** Lifetime, in seconds, of every cache entry written by this service. */
    private const STATS_TTL = 3600;

    /**
     * Build a full statistics snapshot.
     *
     * @return array{queues: array, workers: array, failed: array, throughput: array}
     */
    public function getStats(): array
    {
        return [
            'queues' => $this->getQueueStats(),
            'workers' => $this->getWorkerStats(),
            'failed' => $this->getFailedStats(),
            'throughput' => $this->getThroughput(),
        ];
    }

    /**
     * Row counts of the `jobs` table for the `high`, `default` and `low` queues.
     *
     * @return array<string, array{pending: int, delayed: int}>
     */
    protected function getQueueStats(): array
    {
        $queues = ['high', 'default', 'low'];
        $stats = [];

        foreach ($queues as $queue) {
            $stats[$queue] = [
                // Every row of the queue; rows counted under "delayed" are included here too.
                'pending' => DB::table('jobs')->where('queue', $queue)->count(),
                // Rows whose available_at timestamp is still in the future.
                'delayed' => DB::table('jobs')
                    ->where('queue', $queue)
                    ->where('available_at', '>', now()->timestamp)
                    ->count(),
            ];
        }

        return $stats;
    }

    /**
     * Worker counters read from the cache; all zero when nothing is cached.
     * Nothing in this class writes that entry.
     *
     * @return array{active: int, idle: int, total: int}
     */
    protected function getWorkerStats(): array
    {
        return Cache::get("{$this->statsKey}:workers", [
            'active' => 0,
            'idle' => 0,
            'total' => 0,
        ]);
    }

    /**
     * Failed-job counts: all time and within the last hour.
     *
     * @return array{total: int, last_hour: int}
     */
    protected function getFailedStats(): array
    {
        $hourAgo = now()->subHour();

        return [
            'total' => DB::table('failed_jobs')->count(),
            'last_hour' => DB::table('failed_jobs')
                ->where('failed_at', '>=', $hourAgo)
                ->count(),
        ];
    }

    /**
     * Throughput figures read from the cache; all zero when nothing is cached.
     *
     * @return array<string, int|float>
     */
    protected function getThroughput(): array
    {
        return Cache::get("{$this->statsKey}:throughput", [
            'jobs_per_minute' => 0,
            'jobs_per_hour' => 0,
            'avg_wait_time' => 0,
        ]);
    }

    /**
     * Fold one finished job into the cached throughput entry.
     *
     * NOTE(review): `avg_wait_time` is derived from $duration, so it is really
     * the mean duration passed in; `jobs_per_minute` / `jobs_per_hour` are
     * stored but never recalculated here. The read-modify-write is not atomic.
     *
     * @param string $queue    Queue the job ran on (currently not used in the calculation).
     * @param float  $duration Duration to add to the running total.
     */
    public function recordJobProcessed(string $queue, float $duration): void
    {
        $key = "{$this->statsKey}:throughput";

        $stats = Cache::get($key, [
            'jobs_per_minute' => 0,
            'jobs_per_hour' => 0,
            'avg_wait_time' => 0,
            'total_duration' => 0,
            'job_count' => 0,
        ]);

        $stats['job_count']++;
        $stats['total_duration'] += $duration;
        // job_count is at least 1 here, so the division is safe.
        $stats['avg_wait_time'] = $stats['total_duration'] / $stats['job_count'];

        Cache::put($key, $stats, self::STATS_TTL);
    }

    /**
     * Count a job that has just been picked up by a worker.
     *
     * The counter is kept per queue under `queue:stats:started:<queue>`.
     *
     * @param array{job?: string, queue?: string, attempts?: int} $context Details of the job.
     */
    public function recordJobStarted(array $context): void
    {
        $this->incrementCounter('started', (string) ($context['queue'] ?? 'default'));
    }

    /**
     * Count a failed job and remember the most recent failure.
     *
     * The counter is kept per queue under `queue:stats:failed:<queue>`; the
     * latest failure context is stored under `queue:stats:last_failure`.
     *
     * @param array{job?: string, queue?: string, error?: string} $context Details of the failure.
     */
    public function recordJobFailed(array $context): void
    {
        $this->incrementCounter('failed', (string) ($context['queue'] ?? 'default'));

        Cache::put(
            "{$this->statsKey}:last_failure",
            $context + ['recorded_at' => now()->getTimestamp()],
            self::STATS_TTL,
        );
    }

    /** Atomically bump the per-queue counter `<prefix>:<name>:<queue>`. */
    private function incrementCounter(string $name, string $queue): void
    {
        $key = "{$this->statsKey}:{$name}:{$queue}";

        // add() only writes when the key is absent, so it seeds the counter
        // (with a TTL) without resetting one that already exists.
        Cache::add($key, 0, self::STATS_TTL);
        Cache::increment($key);
    }
}

任务追踪

JobTracker 通过 JobTracking 模型记录单个任务从创建到完成（或失败）的状态变化：
<?php

namespace App\Services;

use App\Models\JobTracking;
use Illuminate\Support\Str;

/**
 * Records the lifecycle of one job in the JobTracking model:
 * start() creates the row, the other methods update it.
 */
class JobTracker
{
    /** Id of the row created by start(); null until start() has been called. */
    protected ?string $jobId = null;

    /**
     * Create the tracking row with status "pending".
     *
     * @param string $jobClass Class name of the job being tracked.
     * @param array  $payload  Arbitrary job data stored with the row.
     * @return string The generated UUID identifying the tracking row.
     */
    public function start(string $jobClass, array $payload = []): string
    {
        $this->jobId = Str::uuid()->toString();

        JobTracking::create([
            'id' => $this->jobId,
            'job_class' => $jobClass,
            'payload' => $payload,
            'status' => 'pending',
            'started_at' => now(),
        ]);

        return $this->jobId;
    }

    /**
     * Mark the tracked job as being processed.
     *
     * @throws \LogicException When start() has not been called.
     */
    public function processing(): void
    {
        $this->updateTracked([
            'status' => 'processing',
            'processing_at' => now(),
        ]);
    }

    /**
     * Mark the tracked job as completed and store its result.
     *
     * @param array $result Result data to store with the row.
     * @throws \LogicException When start() has not been called.
     */
    public function complete(array $result = []): void
    {
        $this->updateTracked([
            'status' => 'completed',
            'completed_at' => now(),
            'result' => $result,
        ]);
    }

    /**
     * Mark the tracked job as failed and store the error message.
     *
     * @throws \LogicException When start() has not been called.
     */
    public function fail(string $error): void
    {
        $this->updateTracked([
            'status' => 'failed',
            'failed_at' => now(),
            'error' => $error,
        ]);
    }

    /**
     * Current tracking row as an array, or null when there is none
     * (including when start() has not been called yet).
     */
    public function getStatus(): ?array
    {
        if ($this->jobId === null) {
            return null;
        }

        return JobTracking::where('id', $this->jobId)->first()?->toArray();
    }

    /**
     * Apply attribute changes to the tracked row.
     *
     * The change goes through the model instance rather than a query-builder
     * update so that the model's attribute casts (if it defines any, e.g. for
     * the array-valued `result`) are applied; a builder update would bind the
     * raw array. forceFill() keeps the builder update's indifference to the
     * model's fillable list.
     *
     * @param array<string, mixed> $attributes
     * @throws \LogicException When start() has not been called.
     */
    private function updateTracked(array $attributes): void
    {
        if ($this->jobId === null) {
            throw new \LogicException('JobTracker::start() must be called before updating the job status.');
        }

        $record = JobTracking::where('id', $this->jobId)->first();

        // No matching row: nothing to update, same as an update that matches zero rows.
        if ($record === null) {
            return;
        }

        $record->forceFill($attributes)->save();
    }
}

任务事件监听

事件订阅器

事件订阅器监听任务开始、完成与失败三类事件，并将其写入监控服务和日志：
<?php

namespace App\Listeners;

use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\Facades\Log;
use App\Services\QueueMonitorService;

/**
 * Forwards queue lifecycle events (processing, processed, failed) to the
 * QueueMonitorService and to the application log.
 */
class QueueEventSubscriber
{
    /**
     * Start time (microtime float) of each in-flight job, keyed by job id.
     *
     * Static because the listeners are registered as [class, method] pairs, so
     * the same subscriber instance is not guaranteed to handle both the
     * "processing" and the "processed" event of a job.
     *
     * @var array<string, float>
     */
    protected static array $startedAt = [];

    public function __construct(
        protected QueueMonitorService $monitor
    ) {}

    /** Remember when the job started, then record and log the start. */
    public function handleJobProcessing(JobProcessing $event): void
    {
        self::$startedAt[$this->timerKey($event->job)] = microtime(true);

        // resolveName() gives the queued job's own class name; getName() would
        // return the generic handler entry for class-based jobs.
        $name = $event->job->resolveName();
        $queue = $event->job->getQueue();

        $this->monitor->recordJobStarted([
            'job' => $name,
            'queue' => $queue,
            'attempts' => $event->job->attempts(),
        ]);

        Log::info('Job started', [
            'job' => $name,
            'queue' => $queue,
        ]);
    }

    /** Record and log a finished job together with its measured run time. */
    public function handleJobProcessed(JobProcessed $event): void
    {
        // Measured from the matching "processing" event. LARAVEL_START marks
        // process boot, which in a long-running worker is unrelated to the job.
        $duration = $this->pullElapsed($event->job);

        $this->monitor->recordJobProcessed(
            $event->job->getQueue(),
            $duration
        );

        Log::info('Job completed', [
            'job' => $event->job->resolveName(),
            'duration' => round($duration * 1000, 2) . 'ms',
        ]);
    }

    /** Record and log a failed job, including the exception message and trace. */
    public function handleJobFailed(JobFailed $event): void
    {
        // Drop the timer so entries for failed jobs do not accumulate in the worker.
        $this->pullElapsed($event->job);

        $name = $event->job->resolveName();
        $message = $event->exception->getMessage();

        $this->monitor->recordJobFailed([
            'job' => $name,
            'queue' => $event->job->getQueue(),
            'error' => $message,
        ]);

        Log::error('Job failed', [
            'job' => $name,
            'error' => $message,
            'trace' => $event->exception->getTraceAsString(),
        ]);
    }

    /**
     * Register the three handlers with the event dispatcher.
     *
     * @param \Illuminate\Events\Dispatcher $events
     */
    public function subscribe($events): void
    {
        $events->listen(
            JobProcessing::class,
            [self::class, 'handleJobProcessing']
        );

        $events->listen(
            JobProcessed::class,
            [self::class, 'handleJobProcessed']
        );

        $events->listen(
            JobFailed::class,
            [self::class, 'handleJobFailed']
        );
    }

    /** Key under which a job's start time is stored. */
    private function timerKey(\Illuminate\Contracts\Queue\Job $job): string
    {
        return (string) $job->getJobId();
    }

    /**
     * Seconds elapsed since the job's "processing" event, removing the stored
     * start time. Returns 0.0 when no start was recorded for the job.
     */
    private function pullElapsed(\Illuminate\Contracts\Queue\Job $job): float
    {
        $key = $this->timerKey($job);
        $startedAt = self::$startedAt[$key] ?? null;
        unset(self::$startedAt[$key]);

        return $startedAt === null ? 0.0 : microtime(true) - $startedAt;
    }
}

告警系统

告警服务

告警服务按阈值检查最近一小时的失败任务数、队列长度、等待时间和 Worker 心跳，并在触发任一条件时发送通知：
<?php

namespace App\Services;

use App\Notifications\QueueAlert;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Notification;

/**
 * Evaluates a fixed set of queue health thresholds and sends one
 * notification listing every alert that fired.
 */
class QueueAlertService
{
    /**
     * Alert thresholds.
     *
     * - failed_jobs_hour: failed jobs within the last hour
     * - queue_size:       rows in the `jobs` table
     * - wait_time:        age of the oldest queued job, in seconds
     * - worker_down:      not read anywhere in this class
     */
    protected array $thresholds = [
        'failed_jobs_hour' => 10,
        'queue_size' => 1000,
        'wait_time' => 300,
        'worker_down' => true,
    ];

    /** Heartbeats older than this many seconds count as "workers down". */
    private const HEARTBEAT_MAX_AGE = 60;

    /**
     * Run every check and notify when at least one alert fired.
     *
     * @return list<string> Identifiers of the alerts that fired (possibly empty).
     */
    public function check(): array
    {
        $alerts = [];

        if ($this->checkFailedJobs()) {
            $alerts[] = 'failed_jobs_high';
        }

        if ($this->checkQueueSize()) {
            $alerts[] = 'queue_size_high';
        }

        if ($this->checkWaitTime()) {
            $alerts[] = 'wait_time_high';
        }

        if ($this->checkWorkers()) {
            $alerts[] = 'workers_down';
        }

        if ($alerts !== []) {
            $this->sendAlerts($alerts);
        }

        return $alerts;
    }

    /** True when the failed jobs of the last hour reach the threshold. */
    protected function checkFailedJobs(): bool
    {
        $count = DB::table('failed_jobs')
            ->where('failed_at', '>=', now()->subHour())
            ->count();

        return $count >= $this->thresholds['failed_jobs_hour'];
    }

    /** True when the total number of queued rows reaches the threshold. */
    protected function checkQueueSize(): bool
    {
        $count = DB::table('jobs')->count();

        return $count >= $this->thresholds['queue_size'];
    }

    /** True when the oldest queued job has waited at least the threshold. */
    protected function checkWaitTime(): bool
    {
        // Laravel's jobs table stores created_at as a Unix timestamp integer,
        // so the age is plain integer arithmetic rather than a Carbon diff.
        $oldestCreatedAt = DB::table('jobs')->min('created_at');

        if ($oldestCreatedAt === null) {
            // Empty queue: nothing is waiting.
            return false;
        }

        $waitTime = now()->getTimestamp() - (int) $oldestCreatedAt;

        return $waitTime >= $this->thresholds['wait_time'];
    }

    /** True when no worker heartbeat is cached or the last one is stale. */
    protected function checkWorkers(): bool
    {
        $lastHeartbeat = Cache::get('workers:heartbeat');

        if (!$lastHeartbeat) {
            return true;
        }

        // Ask for the absolute difference explicitly: with Carbon 3 the default
        // result is signed, and "now -> past" is negative, which would never
        // exceed the limit.
        return now()->diffInSeconds($lastHeartbeat, true) > self::HEARTBEAT_MAX_AGE;
    }

    /**
     * Send the alert notification to the configured mail address and Slack webhook.
     *
     * @param list<string> $alerts Identifiers of the alerts that fired.
     */
    protected function sendAlerts(array $alerts): void
    {
        Notification::route('mail', config('alerts.email'))
            ->route('slack', config('alerts.slack_webhook'))
            ->notify(new QueueAlert($alerts));
    }
}

健康检查

队列健康检查

健康检查类汇总数据库、缓存、Worker 以及各队列的状态，并附带若干关键指标：
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Cache;

/**
 * Produces a health report for the queue system: connectivity checks,
 * per-queue sizes and a few headline metrics.
 */
class QueueHealthCheck
{
    /** A heartbeat younger than this many seconds means workers are active. */
    private const HEARTBEAT_MAX_AGE = 60;

    /**
     * Build the health report.
     *
     * Each probe runs once; the overall status is derived from those same
     * results, so "status" and "checks" always agree.
     *
     * NOTE(review): the queue sizes and metrics query the database directly,
     * so they will still throw if the database is unreachable.
     *
     * @return array{status: string, checks: array, metrics: array}
     */
    public function check(): array
    {
        $core = [
            'database' => $this->checkDatabase(),
            'redis' => $this->checkRedis(),
            'workers' => $this->checkWorkers(),
        ];

        return [
            'status' => $this->determineStatus($core),
            'checks' => $core + ['queues' => $this->checkQueues()],
            'metrics' => [
                'pending_jobs' => $this->getPendingJobs(),
                'failed_jobs_24h' => $this->getFailedJobs24h(),
                'processed_jobs_24h' => $this->getProcessedJobs24h(),
            ],
        ];
    }

    /**
     * Try to obtain the PDO handle of the default database connection.
     *
     * @return array{status: string, message: string}
     */
    protected function checkDatabase(): array
    {
        try {
            DB::connection()->getPdo();
            return ['status' => 'healthy', 'message' => 'Connected'];
        } catch (\Exception $e) {
            return ['status' => 'unhealthy', 'message' => $e->getMessage()];
        }
    }

    /**
     * Write a short-lived key through the Cache facade.
     *
     * This probes whichever cache store is the default; it only exercises
     * Redis when that store is Redis-backed.
     *
     * @return array{status: string, message: string}
     */
    protected function checkRedis(): array
    {
        try {
            // put() reports failure through its boolean result as well as by throwing.
            if (!Cache::put('health_check', true, 10)) {
                return ['status' => 'unhealthy', 'message' => 'Cache write failed'];
            }

            return ['status' => 'healthy', 'message' => 'Connected'];
        } catch (\Exception $e) {
            return ['status' => 'unhealthy', 'message' => $e->getMessage()];
        }
    }

    /**
     * Workers are healthy when a heartbeat is cached and is recent enough.
     *
     * @return array{status: string, message: string, last_heartbeat: mixed}
     */
    protected function checkWorkers(): array
    {
        $lastHeartbeat = Cache::get('workers:heartbeat');

        // Absolute difference requested explicitly: with Carbon 3 the default
        // is signed, and "now -> past" is negative, i.e. always below the limit.
        $active = $lastHeartbeat
            && now()->diffInSeconds($lastHeartbeat, true) < self::HEARTBEAT_MAX_AGE;

        return [
            'status' => $active ? 'healthy' : 'unhealthy',
            'message' => $active ? 'Workers active' : 'Workers inactive',
            'last_heartbeat' => $lastHeartbeat,
        ];
    }

    /**
     * Size and status of the `high`, `default` and `low` queues; a queue with
     * 1000 or more rows is reported as "warning".
     *
     * @return array<string, array{size: int, status: string}>
     */
    protected function checkQueues(): array
    {
        $queues = ['high', 'default', 'low'];
        $result = [];

        foreach ($queues as $queue) {
            $size = DB::table('jobs')->where('queue', $queue)->count();
            $result[$queue] = [
                'size' => $size,
                'status' => $size < 1000 ? 'healthy' : 'warning',
            ];
        }

        return $result;
    }

    /** Total number of rows in the `jobs` table. */
    protected function getPendingJobs(): int
    {
        return DB::table('jobs')->count();
    }

    /** Number of failed jobs recorded within the last day. */
    protected function getFailedJobs24h(): int
    {
        return DB::table('failed_jobs')
            ->where('failed_at', '>=', now()->subDay())
            ->count();
    }

    /** Cached processed-job counter (`queue:processed_24h`); 0 when absent. */
    protected function getProcessedJobs24h(): int
    {
        return Cache::get('queue:processed_24h', 0);
    }

    /**
     * Overall status: "unhealthy" when any given check is unhealthy, else "healthy".
     *
     * @param array<string, array{status: string}>|null $checks Results to evaluate;
     *        when omitted, the database, cache and worker checks are run here.
     */
    protected function determineStatus(?array $checks = null): string
    {
        $checks ??= [
            $this->checkDatabase(),
            $this->checkRedis(),
            $this->checkWorkers(),
        ];

        foreach ($checks as $check) {
            if (($check['status'] ?? null) === 'unhealthy') {
                return 'unhealthy';
            }
        }

        return 'healthy';
    }
}

总结

本文介绍的 Laravel 13 异步任务监控方案包括:

  • Horizon 可视化监控面板
  • 自定义监控服务
  • 任务追踪系统
  • 事件监听机制
  • 告警通知系统
  • 健康检查端点

完善的监控系统是保证队列任务稳定运行的基础。