Laravel 13 事件系统提供了强大的观察者模式实现,本文将深入探讨其高级用法。

事件系统架构

事件基类

<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

/**
 * Common base class for application events.
 *
 * On construction it seeds a metadata bag with the creation timestamp, a
 * correlation id and the id of the authenticated user, and it exposes helpers
 * to read and extend that bag. By default an event broadcasts on no channels.
 */
abstract class BaseEvent
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /** @var array<string, mixed> Metadata attached to this event instance. */
    protected array $metadata = [];

    /**
     * Seed the metadata bag with the context available at creation time.
     */
    public function __construct()
    {
        $this->metadata = [
            'timestamp' => now()->toIso8601String(),
            // Only resolve the correlation id when something is bound under that
            // key; resolving an unbound key would make every event constructor
            // throw. Falls back to null when no binding exists.
            'correlation_id' => app()->bound('correlation.id') ? app('correlation.id') : null,
            'user_id' => auth()->id(),
        ];
    }

    /**
     * Return all metadata collected so far.
     *
     * @return array<string, mixed>
     */
    public function getMetadata(): array
    {
        return $this->metadata;
    }

    /**
     * Add or overwrite a single metadata entry.
     *
     * @param string $key   Metadata key; an existing entry with the same key is replaced.
     * @param mixed  $value Value to store.
     *
     * @return self The same event instance, to allow chaining.
     */
    public function addMetadata(string $key, mixed $value): self
    {
        $this->metadata[$key] = $value;

        return $this;
    }

    /**
     * Channels the event broadcasts on; empty by default, subclasses override.
     *
     * @return array
     */
    public function broadcastOn(): array
    {
        return [];
    }
}

自定义事件

业务事件

<?php

namespace App\Events;

use App\Models\Order;
use App\Models\User;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;

/**
 * Raised when an order has been created; broadcast to the owning user's
 * private channel and to the shared "orders" channel.
 *
 * The broadcast contract is referenced by its fully-qualified name because
 * this file has no import for it; an unqualified name would resolve inside
 * the App\Events namespace.
 */
class OrderCreated extends BaseEvent implements \Illuminate\Contracts\Broadcasting\ShouldBroadcast
{
    public Order $order;
    public User $user;

    /**
     * @param Order $order The order that was created.
     * @param User  $user  The user the order belongs to.
     */
    public function __construct(Order $order, User $user)
    {
        parent::__construct();

        $this->order = $order;
        $this->user = $user;

        $this->addMetadata('order_id', $order->id);
        $this->addMetadata('order_number', $order->order_number);
    }

    /**
     * Channels the event is broadcast on.
     *
     * NOTE(review): Channel('orders') is a public channel, so the payload from
     * broadcastWith() (id, order number, total) is visible to every subscriber
     * of that channel — confirm this is intended.
     *
     * @return array<int, Channel>
     */
    public function broadcastOn(): array
    {
        return [
            new PrivateChannel('orders.' . $this->user->id),
            new Channel('orders'),
        ];
    }

    /**
     * Name under which the event is broadcast.
     */
    public function broadcastAs(): string
    {
        return 'order.created';
    }

    /**
     * Payload sent with the broadcast.
     *
     * @return array{id: mixed, order_number: mixed, total: mixed, created_at: ?string}
     */
    public function broadcastWith(): array
    {
        return [
            'id' => $this->order->id,
            'order_number' => $this->order->order_number,
            'total' => $this->order->total,
            // created_at may be null (e.g. an unsaved model); avoid a fatal
            // "call to a member function on null" in that case.
            'created_at' => $this->order->created_at?->toIso8601String(),
        ];
    }

    /**
     * Tags identifying the order and user this event relates to.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        return [
            'order:' . $this->order->id,
            'user:' . $this->user->id,
        ];
    }
}

/**
 * Event carrying the outcome of a payment attempt for an order.
 *
 * All four constructor arguments are exposed as public properties.
 */
class PaymentProcessed extends BaseEvent
{
    /**
     * @param Order  $order         Order the payment belongs to.
     * @param string $transactionId Identifier of the payment transaction.
     * @param float  $amount        Amount that was processed.
     * @param string $status        Resulting payment status.
     */
    public function __construct(
        public Order $order,
        public string $transactionId,
        public float $amount,
        public string $status,
    ) {
        // Let the base class populate the shared metadata.
        parent::__construct();
    }

    /**
     * Tags for this event: a fixed "payment" tag plus the order tag.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        $orderTag = "order:{$this->order->id}";

        return ['payment', $orderTag];
    }
}

/**
 * Event raised for a newly registered user, optionally carrying the raw
 * registration data.
 */
class UserRegistered extends BaseEvent
{
    /**
     * @param User  $user             The user that registered.
     * @param array $registrationData Extra data captured during registration; empty by default.
     */
    public function __construct(
        public User $user,
        public array $registrationData = [],
    ) {
        // Let the base class populate the shared metadata.
        parent::__construct();
    }

    /**
     * Tags for this event: the user tag followed by a fixed "registration" tag.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        $userTag = "user:{$this->user->id}";

        return [$userTag, 'registration'];
    }
}

事件分发

事件分发器

<?php

namespace App\Services\Events;

use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;

/**
 * Thin wrapper around the Event facade that logs every dispatch to the
 * "events" log channel and keeps an in-memory record of what was dispatched
 * through this instance.
 */
class EventDispatcher
{
    /**
     * Record of dispatched events (name, ISO-8601 time, listener responses).
     *
     * NOTE(review): this list only shrinks when flushDispatchedEvents() is
     * called, so a long-lived instance keeps accumulating entries.
     *
     * @var array<int, array{name: string, time: string, responses: array}>
     */
    protected array $events = [];

    /** Whether each dispatch is written to the log. */
    protected bool $logging = true;

    /**
     * Dispatch an event, log it, and remember it.
     *
     * @param string|object $event   Event name or event object.
     * @param array         $payload Payload forwarded to the listeners.
     *
     * @return array Listener responses; an empty array when the underlying
     *               dispatcher returns null.
     */
    public function dispatch($event, array $payload = []): array
    {
        $eventName = is_object($event) ? get_class($event) : $event;

        if ($this->logging) {
            Log::channel('events')->info("Dispatching event: {$eventName}", [
                'event' => $eventName,
                'payload' => $payload,
            ]);
        }

        // The underlying dispatcher is allowed to return null (for example a
        // faked dispatcher); normalise to an array so the declared return
        // type always holds instead of raising a TypeError.
        $responses = Event::dispatch($event, $payload) ?? [];

        $this->events[] = [
            'name' => $eventName,
            'time' => now()->toIso8601String(),
            'responses' => $responses,
        ];

        return $responses;
    }

    /**
     * Dispatch the event only when the condition is true.
     *
     * @param bool          $condition Dispatch when true.
     * @param string|object $event     Event name or event object.
     * @param array         $payload   Payload forwarded to the listeners.
     *
     * @return array|null Listener responses, or null when nothing was dispatched.
     */
    public function dispatchIf(bool $condition, $event, array $payload = []): ?array
    {
        if ($condition) {
            return $this->dispatch($event, $payload);
        }

        return null;
    }

    /**
     * Dispatch the event only when the condition is false.
     *
     * @param bool          $condition Dispatch when false.
     * @param string|object $event     Event name or event object.
     * @param array         $payload   Payload forwarded to the listeners.
     *
     * @return array|null Listener responses, or null when nothing was dispatched.
     */
    public function dispatchUnless(bool $condition, $event, array $payload = []): ?array
    {
        if (!$condition) {
            return $this->dispatch($event, $payload);
        }

        return null;
    }

    /**
     * Forward to Event::forget() for the given event name.
     */
    public function forget(string $event): void
    {
        Event::forget($event);
    }

    /**
     * Whether the given event name has listeners, as reported by the Event facade.
     */
    public function hasListeners(string $event): bool
    {
        return Event::hasListeners($event);
    }

    /**
     * Return the record of events dispatched through this instance.
     *
     * @return array<int, array{name: string, time: string, responses: array}>
     */
    public function getDispatchedEvents(): array
    {
        return $this->events;
    }

    /**
     * Clear the record of dispatched events.
     */
    public function flushDispatchedEvents(): void
    {
        $this->events = [];
    }
}

条件事件

条件分发

<?php

namespace App\Services\Events;

use App\Events\OrderCreated;
use App\Events\PaymentProcessed;
use App\Models\Order;

/**
 * Helpers for dispatching events only when a condition holds.
 */
class ConditionalEventDispatcher
{
    /**
     * @param EventDispatcher $dispatcher Dispatcher used by dispatchWithCondition().
     */
    public function __construct(protected EventDispatcher $dispatcher)
    {
    }

    /**
     * Dispatch the order-related events that apply to the given order.
     *
     * OrderCreated is dispatched when the order was recently created;
     * PaymentProcessed is dispatched unless the payment status is 'pending'.
     *
     * @param Order $order Order whose state decides which events are dispatched.
     */
    public function dispatchOrderEvents(Order $order): void
    {
        $isNewOrder = $order->wasRecentlyCreated;
        OrderCreated::dispatchIf($isNewOrder, $order, $order->user);

        // Read the payment fields only after the first dispatch, as the original does.
        $paymentPending = $order->payment_status === 'pending';
        PaymentProcessed::dispatchUnless(
            $paymentPending,
            $order,
            $order->transaction_id,
            $order->total,
            $order->payment_status,
        );
    }

    /**
     * Dispatch an event through the wrapped dispatcher when the callback
     * returns a truthy value for the payload.
     *
     * @param string   $event     Event name.
     * @param array    $payload   Payload passed to both the condition and the listeners.
     * @param callable $condition Receives the payload; a truthy result triggers the dispatch.
     *
     * @return array|null Listener responses, or null when the condition was not met.
     */
    public function dispatchWithCondition(string $event, array $payload, callable $condition): ?array
    {
        if (!$condition($payload)) {
            return null;
        }

        return $this->dispatcher->dispatch($event, $payload);
    }
}

事件队列

异步事件处理

<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Contracts\Queue\ShouldQueue;

/**
 * Generic event marked with the ShouldQueue contract.
 *
 * Declares queue settings as public properties (queue name, delay, tries,
 * backoff) — presumably read by the queue layer; confirm against the
 * framework version in use.
 */
class AsyncEvent extends BaseEvent implements ShouldQueue
{
    public string $queue = 'events';
    public int $delay = 0;
    public int $tries = 3;
    public int $backoff = 60;

    /**
     * @param string $type Free-form event type label.
     * @param array  $data Arbitrary data carried by the event.
     */
    public function __construct(
        public string $type,
        public array $data,
    ) {
        parent::__construct();
    }

    /**
     * Name of the queue to use; returns the $queue property.
     */
    public function viaQueue(): string
    {
        $queueName = $this->queue;

        return $queueName;
    }

    /**
     * Point in time 24 hours from now.
     */
    public function retryUntil(): \DateTime
    {
        $deadline = now()->addHours(24);

        return $deadline;
    }

    /**
     * Log the failure together with the event's type and data.
     *
     * @param \Throwable $exception The error that caused the failure.
     */
    public function failed(\Throwable $exception): void
    {
        $context = [
            'type' => $this->type,
            'data' => $this->data,
            'error' => $exception->getMessage(),
        ];

        \Log::error('Async event failed', $context);
    }
}

/**
 * Event marked with the ShouldQueue contract that carries a list of items and
 * the name of the processor meant to handle them.
 *
 * Declares the 'batch' queue and a timeout of 3600 as public properties.
 */
class BatchProcessEvent extends BaseEvent implements ShouldQueue
{
    public string $queue = 'batch';
    public int $timeout = 3600;

    /**
     * @param array  $items     Items to be processed.
     * @param string $processor Identifier of the processor for the items.
     */
    public function __construct(
        public array $items,
        public string $processor,
    ) {
        // Let the base class populate the shared metadata.
        parent::__construct();
    }
}

事件聚合

批量事件

<?php

namespace App\Services\Events;

use Illuminate\Support\Collection;

/**
 * Collects events and dispatches them later in one go.
 */
class EventAggregator
{
    /** Queued entries, each an array with 'event' and 'payload' keys. */
    protected Collection $events;

    /** Chunk size used by flushBatch(). */
    protected int $batchSize = 100;

    public function __construct()
    {
        $this->events = collect();
    }

    /**
     * Queue an event for a later flush.
     *
     * @param string|object $event   Event name or event object.
     * @param array         $payload Payload forwarded to the listeners on dispatch.
     *
     * @return self The same aggregator, to allow chaining.
     */
    public function add($event, array $payload = []): self
    {
        $this->events->push([
            'event' => $event,
            'payload' => $payload,
        ]);

        return $this;
    }

    /**
     * Dispatch every queued event in insertion order.
     *
     * @return array One entry per dispatched event, holding the result of event().
     */
    public function flush(): array
    {
        return $this->dispatchPending(null);
    }

    /**
     * Dispatch every queued event in insertion order, walking the queue in
     * chunks of $batchSize.
     *
     * @return array One entry per dispatched event, holding the result of event().
     */
    public function flushBatch(): array
    {
        return $this->dispatchPending($this->batchSize);
    }

    /**
     * Number of events currently queued.
     */
    public function count(): int
    {
        return $this->events->count();
    }

    /**
     * Whether the queue is currently empty.
     */
    public function isEmpty(): bool
    {
        return $this->events->isEmpty();
    }

    /**
     * Detach the current queue and dispatch its entries.
     *
     * The queue is swapped for a fresh one before anything is dispatched, so
     * events that listeners add during the flush are kept for the next flush
     * instead of being wiped by a reset afterwards. If a listener throws, the
     * entries that were not attempted yet are put back at the front of the
     * queue; entries already attempted are not re-queued, so a retry does not
     * dispatch them twice.
     *
     * @param int|null $chunkSize Chunk size to iterate with, or null for a single pass.
     *
     * @return array One entry per dispatched event.
     */
    private function dispatchPending(?int $chunkSize): array
    {
        $pending = $this->events->values();
        $this->events = collect();

        $batches = $chunkSize === null ? [$pending] : $pending->chunk($chunkSize);

        $results = [];
        $attempted = 0;

        try {
            foreach ($batches as $batch) {
                foreach ($batch as $eventData) {
                    $attempted++;
                    $results[] = event($eventData['event'], $eventData['payload']);
                }
            }
        } finally {
            // Also covers a non-positive chunk size, for which chunk() yields
            // nothing: the untouched entries stay queued rather than vanish.
            if ($attempted < $pending->count()) {
                $this->events = $pending->slice($attempted)->values()->concat($this->events);
            }
        }

        return $results;
    }
}

事件溯源

事件存储

<?php

namespace App\Services\Events;

use App\Models\StoredEvent;
use Illuminate\Support\Str;

/**
 * Persists events as StoredEvent rows and replays them later.
 */
class EventStore
{
    /** @var array<int, StoredEvent> Rows created through this instance. */
    protected array $events = [];

    /**
     * Persist an event.
     *
     * @param object      $event         Event object; stored as its class name plus its JSON encoding.
     * @param string|null $aggregateType Optional aggregate type; when given it is written to
     *                                   the aggregate_type column that getEvents() filters on.
     * @param string|null $aggregateId   Optional aggregate id; when given it is written to
     *                                   the aggregate_id column that getEvents() filters on.
     *
     * @return StoredEvent The created row.
     *
     * @throws \JsonException When the event or its metadata cannot be JSON-encoded.
     */
    public function store($event, ?string $aggregateType = null, ?string $aggregateId = null): StoredEvent
    {
        $attributes = [
            // Store the UUID as a plain string rather than a UUID object.
            'id' => (string) Str::uuid(),
            'type' => get_class($event),
            'payload' => json_encode($event, JSON_THROW_ON_ERROR),
            'metadata' => json_encode([
                'timestamp' => now()->toIso8601String(),
                'user_id' => auth()->id(),
                'ip' => request()->ip(),
            ], JSON_THROW_ON_ERROR),
            'created_at' => now(),
        ];

        // Only add the aggregate columns when supplied, so existing callers
        // produce exactly the same insert as before.
        if ($aggregateType !== null) {
            $attributes['aggregate_type'] = $aggregateType;
        }
        if ($aggregateId !== null) {
            $attributes['aggregate_id'] = $aggregateId;
        }

        $storedEvent = StoredEvent::create($attributes);

        $this->events[] = $storedEvent;

        return $storedEvent;
    }

    /**
     * Fetch the stored events of one aggregate, oldest first.
     *
     * @return array<int, array<string, mixed>> Rows converted to arrays.
     */
    public function getEvents(string $aggregateType, string $aggregateId): array
    {
        return StoredEvent::where('aggregate_type', $aggregateType)
            ->where('aggregate_id', $aggregateId)
            ->orderBy('created_at')
            ->get()
            ->toArray();
    }

    /**
     * Rebuild and re-dispatch the given stored events in order.
     *
     * @param array<int, StoredEvent|array<string, mixed>> $events StoredEvent models or the
     *        array rows returned by getEvents().
     */
    public function replay(array $events): void
    {
        foreach ($events as $storedEvent) {
            event($this->reconstructEvent($storedEvent));
        }
    }

    /**
     * Instantiate the event class recorded in a stored row from its payload.
     *
     * NOTE(review): a constructor parameter typed as a model (e.g. Order) will
     * receive the decoded attribute array, not a model instance, so events
     * with such parameters likely need custom hydration — confirm per event.
     *
     * @param StoredEvent|array<string, mixed> $storedEvent Model or array row with 'type' and 'payload'.
     *
     * @return object The rebuilt event.
     *
     * @throws \InvalidArgumentException When the recorded class does not exist.
     * @throws \JsonException            When the stored payload is not valid JSON.
     */
    protected function reconstructEvent(StoredEvent|array $storedEvent): object
    {
        if (is_array($storedEvent)) {
            $class = $storedEvent['type'] ?? null;
            $rawPayload = $storedEvent['payload'] ?? null;
        } else {
            $class = $storedEvent->type;
            $rawPayload = $storedEvent->payload;
        }

        // The class name comes from storage; refuse anything that is not a
        // loadable class instead of passing it straight to `new`.
        if (!is_string($class) || !class_exists($class)) {
            throw new \InvalidArgumentException('Cannot replay stored event: unknown event class.');
        }

        $payload = is_string($rawPayload)
            ? json_decode($rawPayload, true, 512, JSON_THROW_ON_ERROR)
            : $rawPayload;

        if (!is_array($payload)) {
            $payload = [];
        }

        // A list-shaped payload is passed positionally, as before.
        if (array_is_list($payload)) {
            return new $class(...$payload);
        }

        $constructor = (new \ReflectionClass($class))->getConstructor();

        if ($constructor === null) {
            return new $class();
        }

        // The payload holds every public property of the event, including
        // ones that are not constructor parameters. Spreading those as named
        // arguments raises "Unknown named parameter", so keep only the keys
        // the constructor accepts.
        $accepted = array_map(
            static fn (\ReflectionParameter $parameter): string => $parameter->getName(),
            $constructor->getParameters(),
        );

        $arguments = array_intersect_key($payload, array_flip($accepted));

        return new $class(...$arguments);
    }
}

事件装饰器

事件增强

<?php

namespace App\Services\Events;

/**
 * Static helpers that attach extra metadata to an event.
 *
 * Every helper returns the event it was given; events without an
 * addMetadata() method are returned untouched.
 */
class EventDecorator
{
    /**
     * Copy each key/value pair into the event through addMetadata().
     *
     * @param object $event    Event to decorate.
     * @param array  $metadata Key/value pairs to add.
     *
     * @return object The same event.
     */
    public static function withMetadata($event, array $metadata): object
    {
        // Nothing to do for events that cannot carry metadata.
        if (!method_exists($event, 'addMetadata')) {
            return $event;
        }

        foreach ($metadata as $name => $entry) {
            $event->addMetadata($name, $entry);
        }

        return $event;
    }

    /**
     * Add the current time as 'dispatched_at'.
     */
    public static function withTiming($event): object
    {
        $timing = ['dispatched_at' => now()->toIso8601String()];

        return self::withMetadata($event, $timing);
    }

    /**
     * Add the authenticated user's id and email (email is null without a user).
     */
    public static function withUser($event): object
    {
        $auth = auth();

        $userInfo = [
            'user_id' => $auth->id(),
            'user_email' => $auth->user()?->email,
        ];

        return self::withMetadata($event, $userInfo);
    }

    /**
     * Add the request id header, client IP, full URL and HTTP method of the current request.
     */
    public static function withRequest($event): object
    {
        $request = request();

        $requestInfo = [
            'request_id' => $request->header('X-Request-ID'),
            'ip' => $request->ip(),
            'url' => $request->fullUrl(),
            'method' => $request->method(),
        ];

        return self::withMetadata($event, $requestInfo);
    }

    /**
     * Apply timing, user and request metadata, in that order.
     */
    public static function enrich($event): object
    {
        return self::withRequest(self::withUser(self::withTiming($event)));
    }
}

事件监控

事件监控器

<?php

namespace App\Services\Events;

use Illuminate\Support\Facades\Log;

/**
 * In-memory collector of per-event dispatch statistics.
 */
class EventMonitor
{
    /**
     * Raw counters keyed by event name.
     *
     * @var array<string, array{count: int, total_duration: int|float, listeners: int}>
     */
    protected array $stats = [];

    /**
     * Record one dispatch of an event.
     *
     * @param string $event     Event name used as the statistics key.
     * @param float  $duration  Duration of the dispatch (same unit the caller measures in).
     * @param array  $listeners Listeners that handled the dispatch; only their number is kept.
     */
    public function recordDispatch(string $event, float $duration, array $listeners): void
    {
        $this->stats[$event] ??= [
            'count' => 0,
            'total_duration' => 0,
            'listeners' => 0,
        ];

        $this->stats[$event]['count']++;
        $this->stats[$event]['total_duration'] += $duration;

        // Refresh on every dispatch: previously the count was frozen at the
        // value seen on the first dispatch and went stale when listeners were
        // added or removed later.
        $this->stats[$event]['listeners'] = count($listeners);
    }

    /**
     * Statistics per event: dispatch count, average duration and listener count.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getStats(): array
    {
        return array_map(
            static fn (array $stat): array => [
                'count' => $stat['count'],
                'avg_duration' => $stat['count'] > 0
                    ? $stat['total_duration'] / $stat['count']
                    : 0,
                'listeners' => $stat['listeners'],
            ],
            $this->stats,
        );
    }

    /**
     * Events whose average duration is strictly above the threshold.
     *
     * @param float $threshold Average-duration limit; defaults to 100.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getSlowEvents(float $threshold = 100): array
    {
        return array_filter(
            $this->getStats(),
            static fn (array $stat): bool => $stat['avg_duration'] > $threshold,
        );
    }

    /**
     * The most frequently dispatched events, highest count first.
     *
     * @param int $limit Maximum number of events to return; defaults to 10.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getMostFrequent(int $limit = 10): array
    {
        $stats = $this->getStats();

        uasort($stats, static fn (array $a, array $b): int => $b['count'] <=> $a['count']);

        return array_slice($stats, 0, $limit, true);
    }
}

事件命令

<?php

namespace App\Console\Commands;

use App\Services\Events\EventMonitor;
use Illuminate\Console\Command;

/**
 * Console command that prints the statistics held by an EventMonitor.
 *
 * NOTE(review): the signature declares a --clear option, but handle() never
 * reads it, so passing it currently has no effect.
 */
class EventStatsCommand extends Command
{
    protected $signature = 'event:stats {--clear : Clear stats after showing}';
    protected $description = 'Show event statistics';

    /**
     * Print a table of all recorded events followed by the list of slow ones.
     *
     * @param EventMonitor $monitor Source of the statistics.
     *
     * @return int Always self::SUCCESS.
     */
    public function handle(EventMonitor $monitor): int
    {
        $stats = $monitor->getStats();

        // Nothing recorded: say so and stop.
        if ($stats === []) {
            $this->info('No event statistics available');

            return self::SUCCESS;
        }

        $this->info('=== Event Statistics ===');
        $this->newLine();

        $headers = ['Event', 'Count', 'Avg Duration (ms)', 'Listeners'];
        $rows = collect($stats)->map(fn ($stat, $eventName) => [
            class_basename($eventName),
            $stat['count'],
            round($stat['avg_duration'], 2),
            $stat['listeners'],
        ]);

        $this->table($headers, $rows);
        $this->newLine();

        // Uses the monitor's default threshold.
        $slowEvents = $monitor->getSlowEvents();

        if ($slowEvents !== []) {
            $this->warn('Slow Events (>100ms avg):');

            foreach ($slowEvents as $event => $unused) {
                $this->line(" - {$event}");
            }
        }

        return self::SUCCESS;
    }
}

总结

Laravel 13 事件系统提供了强大的观察者模式实现。通过自定义事件、事件分发、条件事件、事件队列和事件溯源等技术,可以构建松耦合、可扩展的应用架构。