Laravel 13 延迟任务完全指南

延迟任务是处理定时执行需求的重要机制。本文将深入探讨 Laravel 13 中延迟任务的各种实现方法和最佳实践。

延迟任务基础

延迟分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use App\Jobs\SendReminder;
use App\Jobs\ProcessSubscription;

// 延迟指定时间
SendReminder::dispatch($user)->delay(now()->addMinutes(5));

// 延迟到特定时间
ProcessSubscription::dispatch($subscription)->delay(
$subscription->next_billing_date
);

// 使用 Carbon
SendReminder::dispatch($user)->delay(
now()->addHours(2)->addMinutes(30)
);

任务类延迟配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;

class SendReminder implements ShouldQueue
{
use Queueable;

public function __construct(
protected $user,
protected int $delayMinutes = 5
) {
$this->delay(now()->addMinutes($this->delayMinutes));
}

public function handle(): void
{
// 发送提醒
}
}

延迟任务服务

延迟任务管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php

namespace App\Services;

use Illuminate\Support\Facades\Queue;
use App\Jobs\SendReminder;

class DelayedTaskService
{
public function schedule(string $jobClass, array $payload, \DateTimeInterface $executeAt): string
{
$job = new $jobClass(...$payload);
$job->delay($executeAt);

dispatch($job);

return $job->jobId ?? uniqid();
}

public function scheduleReminder($user, \DateTimeInterface $remindAt): string
{
return $this->schedule(SendReminder::class, [$user], $remindAt);
}

public function scheduleRecurring(string $jobClass, array $payload, string $cron): void
{
// 存储周期任务配置
ScheduledTask::create([
'job_class' => $jobClass,
'payload' => $payload,
'cron' => $cron,
'next_run' => $this->calculateNextRun($cron),
]);
}

protected function calculateNextRun(string $cron): \DateTimeInterface
{
return \Cron\CronExpression::factory($cron)->getNextRunDate();
}
}

延迟任务存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?php

namespace App\Services;

use App\Models\ScheduledTask;
use Illuminate\Support\Facades\Bus;

class TaskScheduler
{
public function schedule(array $config): ScheduledTask
{
return ScheduledTask::create([
'name' => $config['name'],
'job_class' => $config['job'],
'payload' => $config['payload'] ?? [],
'execute_at' => $config['execute_at'],
'repeat' => $config['repeat'] ?? false,
'repeat_interval' => $config['repeat_interval'] ?? null,
'status' => 'pending',
]);
}

public function processPending(): void
{
$tasks = ScheduledTask::where('status', 'pending')
->where('execute_at', '<=', now())
->get();

foreach ($tasks as $task) {
$this->executeTask($task);
}
}

protected function executeTask(ScheduledTask $task): void
{
$jobClass = $task->job_class;
$payload = $task->payload;

dispatch(new $jobClass(...$payload));

if ($task->repeat && $task->repeat_interval) {
$task->update([
'execute_at' => now()->add($task->repeat_interval),
]);
} else {
$task->update(['status' => 'completed']);
}
}

public function cancel(int $taskId): bool
{
return ScheduledTask::where('id', $taskId)
->where('status', 'pending')
->update(['status' => 'cancelled']) > 0;
}
}

延迟队列监控

延迟任务监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Cache;

class DelayedTaskMonitor
{
public function getStats(): array
{
return [
'pending' => $this->getPendingCount(),
'upcoming' => $this->getUpcomingTasks(),
'delayed_jobs' => $this->getDelayedJobs(),
];
}

protected function getPendingCount(): int
{
return DB::table('jobs')
->where('available_at', '>', now()->timestamp)
->count();
}

protected function getUpcomingTasks(int $hours = 24): array
{
return DB::table('jobs')
->where('available_at', '>', now()->timestamp)
->where('available_at', '<=', now()->addHours($hours)->timestamp)
->selectRaw('queue, COUNT(*) as count, MIN(available_at) as earliest')
->groupBy('queue')
->get()
->toArray();
}

protected function getDelayedJobs(): array
{
return DB::table('jobs')
->where('available_at', '>', now()->timestamp)
->orderBy('available_at')
->limit(10)
->get(['id', 'queue', 'available_at', 'created_at'])
->toArray();
}
}

延迟任务取消

取消服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;

class DelayedTaskCancellation
{
public function cancelByPayload(string $key, mixed $value): int
{
$jobs = DB::table('jobs')
->where('available_at', '>', now()->timestamp)
->get();

$cancelled = 0;

foreach ($jobs as $job) {
$payload = json_decode($job->payload, true);

if (data_get($payload, $key) === $value) {
DB::table('jobs')->where('id', $job->id)->delete();
$cancelled++;
}
}

return $cancelled;
}

public function cancelByTag(string $tag): int
{
return $this->cancelByPayload('tags.' . $tag, true);
}

public function cancelByUser(int $userId): int
{
return $this->cancelByPayload('data.userId', $userId);
}
}

延迟任务最佳实践

唯一延迟任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class SendReminder implements ShouldQueue, ShouldBeUnique
{
use Queueable;

public int $uniqueId;

public function __construct(
protected $user,
protected string $type = 'general'
) {
$this->uniqueId = "reminder:{$user->id}:{$type}";
}

public function uniqueFor(): int
{
return 3600; // 1小时内唯一
}
}

条件延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php

namespace App\Services;

class ConditionalDelayService
{
public function dispatchWithDelay($job, \DateTimeInterface $executeAt, array $conditions = []): void
{
if ($this->conditionsMet($conditions)) {
dispatch($job)->delay($executeAt);
}
}

protected function conditionsMet(array $conditions): bool
{
foreach ($conditions as $key => $value) {
if (!$this->checkCondition($key, $value)) {
return false;
}
}

return true;
}

protected function checkCondition(string $key, mixed $value): bool
{
return match ($key) {
'user_active' => User::find($value)?->is_active ?? false,
'subscription_active' => Subscription::find($value)?->is_active ?? false,
default => true,
};
}
}

延迟任务测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php

namespace Tests\Feature;

use Tests\TestCase;
use App\Jobs\SendReminder;
use Illuminate\Support\Facades\Queue;

class DelayedTaskTest extends TestCase
{
public function test_job_is_delayed_correctly()
{
Queue::fake();

$user = User::factory()->create();
$delayUntil = now()->addMinutes(5);

SendReminder::dispatch($user)->delay($delayUntil);

Queue::assertPushed(SendReminder::class, function ($job) use ($delayUntil) {
return $job->delay == $delayUntil;
});
}
}

总结

Laravel 13 的延迟任务提供了:

  • 灵活的延迟时间设置
  • 延迟任务管理器
  • 延迟任务存储
  • 任务取消功能
  • 唯一任务支持
  • 条件延迟执行

合理使用延迟任务可以优化任务调度,提高系统效率。