Laravel 13 事件系统提供了强大的观察者模式实现，本文深入探讨高级用法。
事件系统架构
事件基类
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
/**
 * Common base class for application events.
 *
 * Captures a small metadata envelope (timestamp, correlation id, acting user)
 * at construction time and lets callers attach further key/value pairs.
 * Subclasses that declare their own constructor must call parent::__construct()
 * for the envelope to be populated.
 */
abstract class BaseEvent
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /** @var array<string, mixed> Metadata attached to this event instance. */
    protected array $metadata = [];

    /**
     * Populate the default metadata envelope.
     */
    public function __construct()
    {
        $this->metadata = [
            'timestamp' => now()->toIso8601String(),
            // Only resolve the correlation id when something is bound under that
            // key; resolving an unbound key would throw and make every event
            // impossible to construct (console commands, tests, queue workers).
            'correlation_id' => app()->bound('correlation.id') ? app('correlation.id') : null,
            'user_id' => auth()->id(),
        ];
    }

    /**
     * Return all metadata collected so far.
     *
     * @return array<string, mixed>
     */
    public function getMetadata(): array
    {
        return $this->metadata;
    }

    /**
     * Set (or overwrite) a single metadata entry.
     *
     * @return self The same instance, to allow chaining.
     */
    public function addMetadata(string $key, mixed $value): self
    {
        $this->metadata[$key] = $value;

        return $this;
    }

    /**
     * Channels to broadcast on. Empty by default; subclasses override this.
     *
     * @return array<int, mixed>
     */
    public function broadcastOn(): array
    {
        return [];
    }
}
|
自定义事件
业务事件
<?php
namespace App\Events;
use App\Models\Order; use App\Models\User; use Illuminate\Broadcasting\Channel; use Illuminate\Broadcasting\PrivateChannel; use Illuminate\Broadcasting\PresenceChannel;
/**
 * Fired when an order has been created; broadcast to the owner's private
 * channel and to the shared "orders" channel.
 *
 * The broadcast contract is referenced by its fully-qualified name because this
 * file's import list does not include it; an unqualified `ShouldBroadcast`
 * would resolve to App\Events\ShouldBroadcast.
 */
class OrderCreated extends BaseEvent implements \Illuminate\Contracts\Broadcasting\ShouldBroadcast
{
    public Order $order;

    public User $user;

    /**
     * @param Order $order The order that was created.
     * @param User  $user  The user the order belongs to.
     */
    public function __construct(Order $order, User $user)
    {
        parent::__construct();

        $this->order = $order;
        $this->user = $user;

        $this->addMetadata('order_id', $order->id);
        $this->addMetadata('order_number', $order->order_number);
    }

    /**
     * Channels this event is broadcast on.
     *
     * NOTE(review): the public "orders" channel delivers the broadcastWith()
     * payload (id, order number, total) to every subscriber — confirm that this
     * is intended.
     *
     * @return array<int, Channel>
     */
    public function broadcastOn(): array
    {
        return [
            new PrivateChannel('orders.' . $this->user->id),
            new Channel('orders'),
        ];
    }

    /**
     * Event name used on the broadcast channel.
     */
    public function broadcastAs(): string
    {
        return 'order.created';
    }

    /**
     * Payload sent to broadcast subscribers.
     *
     * @return array<string, mixed>
     */
    public function broadcastWith(): array
    {
        return [
            'id' => $this->order->id,
            'order_number' => $this->order->order_number,
            'total' => $this->order->total,
            // created_at can be null (e.g. an order instance that was never
            // persisted), so do not dereference it unconditionally.
            'created_at' => $this->order->created_at?->toIso8601String(),
        ];
    }

    /**
     * Tags identifying the order and user this event relates to.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        return [
            'order:' . $this->order->id,
            'user:' . $this->user->id,
        ];
    }
}
/**
 * Carries the outcome of a payment attempt for an order.
 */
class PaymentProcessed extends BaseEvent
{
    /** The order the payment belongs to. */
    public Order $order;

    /** Identifier of the payment transaction. */
    public string $transactionId;

    /** Amount that was processed. */
    public float $amount;

    /** Resulting payment status. */
    public string $status;

    public function __construct(Order $order, string $transactionId, float $amount, string $status)
    {
        $this->order = $order;
        $this->transactionId = $transactionId;
        $this->amount = $amount;
        $this->status = $status;

        parent::__construct();
    }

    /**
     * Tags identifying this event as a payment for a specific order.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        $orderTag = 'order:' . $this->order->id;

        return ['payment', $orderTag];
    }
}
/**
 * Fired after a user account has been registered.
 */
class UserRegistered extends BaseEvent
{
    /** The newly registered user. */
    public User $user;

    /** @var array Extra data captured during registration. */
    public array $registrationData;

    public function __construct(User $user, array $registrationData = [])
    {
        $this->user = $user;
        $this->registrationData = $registrationData;

        parent::__construct();
    }

    /**
     * Tags identifying the user and the registration category.
     *
     * @return array<int, string>
     */
    public function tags(): array
    {
        $userTag = 'user:' . $this->user->id;

        return [$userTag, 'registration'];
    }
}
|
事件分发
事件分发器
<?php
namespace App\Services\Events;
use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Log;
/**
 * Thin wrapper around the Event facade that logs each dispatch and keeps an
 * in-memory record of what was dispatched through this instance.
 */
class EventDispatcher
{
    /** @var array<int, array{name: string, time: string, responses: array}> */
    protected array $events = [];

    /** Whether each dispatch is written to the "events" log channel. */
    protected bool $logging = true;

    /**
     * Dispatch an event and record it.
     *
     * @param object|string $event   Event instance or event name.
     * @param array         $payload Payload forwarded to the listeners.
     *
     * @return array Listener responses (an empty array when there are none).
     */
    public function dispatch($event, array $payload = []): array
    {
        $eventName = is_object($event) ? get_class($event) : $event;

        if ($this->logging) {
            Log::channel('events')->info("Dispatching event: {$eventName}", [
                'event' => $eventName,
                'payload' => $payload,
            ]);
        }

        // The underlying dispatcher may hand back null (for example when the
        // facade is swapped for a fake in tests); normalise to an array so the
        // declared return type always holds.
        $responses = Event::dispatch($event, $payload) ?? [];

        $this->events[] = [
            'name' => $eventName,
            'time' => now()->toIso8601String(),
            'responses' => $responses,
        ];

        return $responses;
    }

    /**
     * Dispatch only when $condition is true.
     *
     * @return array|null Listener responses, or null when nothing was dispatched.
     */
    public function dispatchIf(bool $condition, $event, array $payload = []): ?array
    {
        return $condition ? $this->dispatch($event, $payload) : null;
    }

    /**
     * Dispatch only when $condition is false.
     *
     * @return array|null Listener responses, or null when nothing was dispatched.
     */
    public function dispatchUnless(bool $condition, $event, array $payload = []): ?array
    {
        return $this->dispatchIf(!$condition, $event, $payload);
    }

    /**
     * Remove all listeners registered for the given event.
     */
    public function forget(string $event): void
    {
        Event::forget($event);
    }

    /**
     * Whether any listener is registered for the given event.
     */
    public function hasListeners(string $event): bool
    {
        return Event::hasListeners($event);
    }

    /**
     * Records of every event dispatched through this instance since the last flush.
     *
     * @return array<int, array{name: string, time: string, responses: array}>
     */
    public function getDispatchedEvents(): array
    {
        return $this->events;
    }

    /**
     * Drop the in-memory dispatch records. The record list grows with every
     * dispatch, so long-lived instances should call this periodically.
     */
    public function flushDispatchedEvents(): void
    {
        $this->events = [];
    }
}
|
条件事件
条件分发
<?php
namespace App\Services\Events;
use App\Events\OrderCreated; use App\Events\PaymentProcessed; use App\Models\Order;
/**
 * Dispatches events only when a condition holds.
 */
class ConditionalEventDispatcher
{
    public function __construct(protected EventDispatcher $dispatcher)
    {
    }

    /**
     * Fire the order-related events that apply to the given order:
     * OrderCreated when the model was just created, and PaymentProcessed
     * unless the payment is still pending.
     */
    public function dispatchOrderEvents(Order $order): void
    {
        $isNew = $order->wasRecentlyCreated;
        OrderCreated::dispatchIf($isNew, $order, $order->user);

        $paymentPending = $order->payment_status === 'pending';
        PaymentProcessed::dispatchUnless(
            $paymentPending,
            $order,
            $order->transaction_id,
            $order->total,
            $order->payment_status
        );
    }

    /**
     * Dispatch $event through the wrapped dispatcher when $condition($payload)
     * is truthy.
     *
     * @return array|null Listener responses, or null when the condition failed.
     */
    public function dispatchWithCondition(string $event, array $payload, callable $condition): ?array
    {
        if (!$condition($payload)) {
            return null;
        }

        return $this->dispatcher->dispatch($event, $payload);
    }
}
|
事件队列
异步事件处理
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; use Illuminate\Contracts\Queue\ShouldQueue;
/**
 * Generic event carrying a type label and an arbitrary data array, configured
 * with queue-related settings (queue name, delay, tries, backoff).
 *
 * NOTE(review): in Laravel, ShouldQueue and the viaQueue()/retryUntil()/failed()
 * hooks are normally honoured on listeners rather than on event classes —
 * confirm that the dispatcher actually reads them from the event.
 */
class AsyncEvent extends BaseEvent implements ShouldQueue
{
    public string $queue = 'events';

    public int $delay = 0;

    public int $tries = 3;

    public int $backoff = 60;

    /**
     * @param string $type Label describing what kind of async event this is.
     * @param array  $data Arbitrary payload.
     */
    public function __construct(
        public string $type,
        public array $data
    ) {
        parent::__construct();
    }

    /**
     * Name of the queue to use.
     */
    public function viaQueue(): string
    {
        return $this->queue;
    }

    /**
     * Point in time after which no further retries should be attempted
     * (24 hours from the moment this method is called).
     *
     * Declared as DateTimeInterface rather than DateTime so the method keeps
     * working when now() is configured to return an immutable date instance.
     */
    public function retryUntil(): \DateTimeInterface
    {
        return now()->addHours(24);
    }

    /**
     * Log the failure together with the event's type and data.
     */
    public function failed(\Throwable $exception): void
    {
        // Use the facade class directly instead of the global "Log" alias,
        // which is only available when class aliases are registered.
        \Illuminate\Support\Facades\Log::error('Async event failed', [
            'type' => $this->type,
            'data' => $this->data,
            'error' => $exception->getMessage(),
        ]);
    }
}
/**
 * Event describing a batch of items and the processor that should handle them.
 * Configured for the "batch" queue with a 3600-second timeout.
 */
class BatchProcessEvent extends BaseEvent implements ShouldQueue
{
    public string $queue = 'batch';

    public int $timeout = 3600;

    /** @var array Items that make up the batch. */
    public array $items;

    /** Name of the processor for this batch. */
    public string $processor;

    public function __construct(array $items, string $processor)
    {
        $this->items = $items;
        $this->processor = $processor;

        parent::__construct();
    }
}
|
事件聚合
批量事件
<?php
namespace App\Services\Events;
use Illuminate\Support\Collection;
/**
 * Buffers events and dispatches them later in one go.
 */
class EventAggregator
{
    /** Buffered entries, each an array with "event" and "payload" keys. */
    protected Collection $events;

    /** Number of events handled per chunk by flushBatch(). */
    protected int $batchSize = 100;

    public function __construct()
    {
        $this->events = collect();
    }

    /**
     * Buffer an event for a later flush.
     *
     * @param object|string $event   Event instance or event name.
     * @param array         $payload Payload forwarded to the listeners.
     */
    public function add($event, array $payload = []): self
    {
        $this->events->push([
            'event' => $event,
            'payload' => $payload,
        ]);

        return $this;
    }

    /**
     * Dispatch every buffered event in insertion order and empty the buffer.
     *
     * @return array One entry per dispatched event holding the listeners' responses.
     */
    public function flush(): array
    {
        return $this->drain(null);
    }

    /**
     * Same as flush(), but walks the buffer in chunks of $batchSize.
     *
     * @return array One entry per dispatched event holding the listeners' responses.
     */
    public function flushBatch(): array
    {
        return $this->drain($this->batchSize);
    }

    /**
     * Number of events currently buffered.
     */
    public function count(): int
    {
        return $this->events->count();
    }

    /**
     * Whether the buffer is empty.
     */
    public function isEmpty(): bool
    {
        return $this->events->isEmpty();
    }

    /**
     * Take ownership of the buffered events and dispatch them.
     *
     * The buffer is swapped for a fresh one before dispatching, so events that
     * listeners add() during the flush are kept for the next flush instead of
     * being wiped. If a listener throws, the failing event and everything after
     * it are put back at the front of the buffer; events that were already
     * delivered are not dispatched a second time.
     *
     * @param int|null $chunkSize Chunk length, or null to process in one pass.
     */
    private function drain(?int $chunkSize): array
    {
        $pending = array_values($this->events->all());
        $this->events = collect();

        if ($pending === []) {
            return [];
        }

        // A chunk size below 1 would be invalid; fall back to 1.
        $chunks = $chunkSize === null ? [$pending] : array_chunk($pending, max(1, $chunkSize));

        $results = [];
        $delivered = 0;

        try {
            foreach ($chunks as $chunk) {
                foreach ($chunk as $eventData) {
                    $results[] = event($eventData['event'], $eventData['payload']);
                    $delivered++;
                }
            }
        } finally {
            if ($delivered < count($pending)) {
                $this->events = collect(array_merge(
                    array_slice($pending, $delivered),
                    array_values($this->events->all())
                ));
            }
        }

        return $results;
    }
}
|
事件溯源
事件存储
<?php
namespace App\Services\Events;
use App\Models\StoredEvent; use Illuminate\Support\Str;
/**
 * Persists events as StoredEvent rows and can rebuild and re-dispatch them.
 */
class EventStore
{
    /** @var array<int, StoredEvent> Rows written through this instance. */
    protected array $events = [];

    /**
     * Persist an event.
     *
     * @param object      $event         The event to store; its JSON encoding becomes the payload.
     * @param string|null $aggregateType Optional aggregate type, written only when given.
     * @param string|null $aggregateId   Optional aggregate id, written only when given.
     *
     * @throws \JsonException When the event or its metadata cannot be JSON-encoded.
     */
    public function store($event, ?string $aggregateType = null, ?string $aggregateId = null): StoredEvent
    {
        $attributes = [
            'id' => (string) Str::uuid(),
            'type' => get_class($event),
            'payload' => json_encode($event, JSON_THROW_ON_ERROR),
            'metadata' => json_encode([
                'timestamp' => now()->toIso8601String(),
                'user_id' => auth()->id(),
                'ip' => request()->ip(),
            ], JSON_THROW_ON_ERROR),
            'created_at' => now(),
        ];

        // getEvents() filters on these columns, so rows need them to be found.
        // They are added only when supplied so existing callers write exactly
        // the same attribute set as before.
        if ($aggregateType !== null) {
            $attributes['aggregate_type'] = $aggregateType;
        }
        if ($aggregateId !== null) {
            $attributes['aggregate_id'] = $aggregateId;
        }

        $storedEvent = StoredEvent::create($attributes);

        $this->events[] = $storedEvent;

        return $storedEvent;
    }

    /**
     * Fetch the stored events of one aggregate, oldest first.
     *
     * @return array<int, array<string, mixed>> Rows converted to plain arrays.
     */
    public function getEvents(string $aggregateType, string $aggregateId): array
    {
        return StoredEvent::where('aggregate_type', $aggregateType)
            ->where('aggregate_id', $aggregateId)
            ->orderBy('created_at')
            ->get()
            ->toArray();
    }

    /**
     * Rebuild and re-dispatch the given stored events in order.
     *
     * @param array<int, StoredEvent|array<string, mixed>> $events StoredEvent models, or
     *        the plain-array rows returned by getEvents().
     */
    public function replay(array $events): void
    {
        foreach ($events as $storedEvent) {
            // getEvents() yields arrays, while reconstructEvent() requires a
            // model; wrap array rows so both shapes can be replayed.
            if (is_array($storedEvent)) {
                $storedEvent = (new StoredEvent())->forceFill($storedEvent);
            }

            event($this->reconstructEvent($storedEvent));
        }
    }

    /**
     * Instantiate the event class recorded on a stored row from its payload.
     *
     * Payload keys are matched to constructor parameters by name; keys that are
     * not constructor parameters (for instance extra public properties that
     * were serialised along with the event) are ignored.
     *
     * NOTE(review): values are passed as decoded from JSON. A constructor
     * parameter typed as a model class will receive an array and fail its type
     * check — such events need a dedicated hydration step.
     *
     * @throws \InvalidArgumentException When the recorded type is not a loadable class.
     * @throws \JsonException            When the payload is not valid JSON.
     */
    protected function reconstructEvent(StoredEvent $storedEvent): object
    {
        $class = $storedEvent->type;

        // The class name comes from storage; refuse anything that is not a
        // known class instead of instantiating it blindly.
        if (!is_string($class) || !class_exists($class)) {
            throw new \InvalidArgumentException('Cannot reconstruct event: unknown class.');
        }

        $payload = $storedEvent->payload;
        if (is_string($payload)) {
            $payload = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
        }
        if (!is_array($payload)) {
            $payload = [];
        }

        $constructor = (new \ReflectionClass($class))->getConstructor();
        if ($constructor === null) {
            return new $class();
        }

        if (!array_is_list($payload)) {
            $accepted = [];
            foreach ($constructor->getParameters() as $parameter) {
                $accepted[$parameter->getName()] = true;
            }
            // Spreading a string-keyed array uses named arguments, and an
            // unknown name is a fatal error — keep only recognised names.
            $payload = array_intersect_key($payload, $accepted);
        }

        return new $class(...$payload);
    }
}
|
事件装饰器
事件增强
<?php
namespace App\Services\Events;
/**
 * Static helpers that attach extra metadata to events exposing addMetadata().
 */
class EventDecorator
{
    /**
     * Copy every entry of $metadata onto the event via addMetadata().
     * Events without an addMetadata() method are returned untouched.
     */
    public static function withMetadata($event, array $metadata): object
    {
        if (!method_exists($event, 'addMetadata')) {
            return $event;
        }

        foreach ($metadata as $name => $entry) {
            $event->addMetadata($name, $entry);
        }

        return $event;
    }

    /**
     * Stamp the event with the current time under "dispatched_at".
     */
    public static function withTiming($event): object
    {
        $timing = ['dispatched_at' => now()->toIso8601String()];

        return self::withMetadata($event, $timing);
    }

    /**
     * Attach the authenticated user's id and email (email is null for guests).
     */
    public static function withUser($event): object
    {
        $userInfo = [
            'user_id' => auth()->id(),
            'user_email' => auth()->user()?->email,
        ];

        return self::withMetadata($event, $userInfo);
    }

    /**
     * Attach details of the current request: X-Request-ID header, IP, full URL
     * and HTTP method.
     */
    public static function withRequest($event): object
    {
        $requestInfo = [
            'request_id' => request()->header('X-Request-ID'),
            'ip' => request()->ip(),
            'url' => request()->fullUrl(),
            'method' => request()->method(),
        ];

        return self::withMetadata($event, $requestInfo);
    }

    /**
     * Apply timing, user and request metadata, in that order.
     */
    public static function enrich($event): object
    {
        return self::withRequest(self::withUser(self::withTiming($event)));
    }
}
|
事件监控
事件监控器
<?php
namespace App\Services\Events;
use Illuminate\Support\Facades\Log;
/**
 * In-memory collector of per-event dispatch statistics.
 *
 * Statistics live only in this object; nothing is persisted.
 */
class EventMonitor
{
    /** @var array<string, array{count: int, total_duration: int|float, listeners: int}> */
    protected array $stats = [];

    /**
     * Record one dispatch of $event.
     *
     * @param string $event     Event name (used as the statistics key).
     * @param float  $duration  Time the dispatch took.
     * @param array  $listeners Listeners involved; only their count is kept, and
     *                          only from the first dispatch recorded for the event.
     */
    public function recordDispatch(string $event, float $duration, array $listeners): void
    {
        $this->stats[$event] ??= [
            'count' => 0,
            'total_duration' => 0,
            'listeners' => count($listeners),
        ];

        $this->stats[$event]['count']++;
        $this->stats[$event]['total_duration'] += $duration;
    }

    /**
     * Aggregated statistics keyed by event name.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getStats(): array
    {
        return array_map(
            static fn (array $stat): array => [
                'count' => $stat['count'],
                'avg_duration' => $stat['count'] > 0 ? $stat['total_duration'] / $stat['count'] : 0,
                'listeners' => $stat['listeners'],
            ],
            $this->stats
        );
    }

    /**
     * Events whose average duration is strictly greater than $threshold.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getSlowEvents(float $threshold = 100): array
    {
        return array_filter(
            $this->getStats(),
            static fn (array $stat): bool => $stat['avg_duration'] > $threshold
        );
    }

    /**
     * The $limit most frequently dispatched events, most frequent first.
     *
     * @return array<string, array{count: int, avg_duration: int|float, listeners: int}>
     */
    public function getMostFrequent(int $limit = 10): array
    {
        $stats = $this->getStats();
        uasort($stats, static fn (array $a, array $b): int => $b['count'] <=> $a['count']);

        // preserve_keys keeps the event names as array keys.
        return array_slice($stats, 0, $limit, true);
    }

    /**
     * Discard all recorded statistics.
     */
    public function clear(): void
    {
        $this->stats = [];
    }
}
|
事件命令
<?php
namespace App\Console\Commands;
use App\Services\Events\EventMonitor; use Illuminate\Console\Command;
/**
 * Artisan command that prints the statistics held by the EventMonitor.
 *
 * NOTE(review): the monitor shown alongside this command keeps its statistics
 * in memory only, so the numbers reflect whatever was recorded in the current
 * process — confirm how the monitor instance is shared or persisted.
 */
class EventStatsCommand extends Command
{
    protected $signature = 'event:stats {--clear : Clear stats after showing}';

    protected $description = 'Show event statistics';

    /**
     * Print a table of per-event statistics, list the slow events, and honour
     * the --clear option.
     *
     * @return int Always self::SUCCESS.
     */
    public function handle(EventMonitor $monitor): int
    {
        $stats = $monitor->getStats();

        if (empty($stats)) {
            $this->info('No event statistics available');

            return self::SUCCESS;
        }

        $this->info('=== Event Statistics ===');
        $this->newLine();

        $this->table(
            ['Event', 'Count', 'Avg Duration (ms)', 'Listeners'],
            collect($stats)->map(fn ($s, $e) => [
                class_basename($e),
                $s['count'],
                round($s['avg_duration'], 2),
                $s['listeners'],
            ])
        );

        $this->newLine();

        $slow = $monitor->getSlowEvents();

        if (!empty($slow)) {
            $this->warn('Slow Events (>100ms avg):');

            foreach (array_keys($slow) as $event) {
                $this->line("  - {$event}");
            }
        }

        // The signature advertises --clear; act on it instead of ignoring it.
        // The capability check keeps the command working with a monitor that
        // does not offer a clear() method.
        if ($this->option('clear')) {
            if (method_exists($monitor, 'clear')) {
                $monitor->clear();
                $this->info('Event statistics cleared');
            } else {
                $this->warn('Clearing is not supported by the event monitor');
            }
        }

        return self::SUCCESS;
    }
}
|
总结
Laravel 13 事件系统提供了强大的观察者模式实现。通过自定义事件、事件分发、条件事件、事件队列和事件溯源等技术,可以构建松耦合、可扩展的应用架构。