Laravel 13 管道模式深度解析

管道模式是 Laravel 框架中最优雅的设计模式之一,广泛应用于中间件、请求处理等核心功能。本文将深入探讨 Laravel 13 中管道模式的高级用法。

管道模式基础

什么是管道模式

管道模式允许数据通过一系列处理阶段流动,每个阶段可以对数据进行处理、修改或过滤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?php

namespace App\Pipelines;

use Closure;

class Pipeline
{
protected array $pipes = [];
protected mixed $passable;

public function send(mixed $passable): self
{
$this->passable = $passable;
return $this;
}

public function through(array $pipes): self
{
$this->pipes = $pipes;
return $this;
}

public function then(Closure $destination): mixed
{
$pipeline = array_reduce(
array_reverse($this->pipes),
$this->carry(),
$this->prepareDestination($destination)
);

return $pipeline($this->passable);
}

protected function carry(): Closure
{
return function ($stack, $pipe) {
return function ($passable) use ($stack, $pipe) {
if (is_callable($pipe)) {
return $pipe($passable, $stack);
}

$pipe = app($pipe);

return $pipe->handle($passable, $stack);
};
};
}

protected function prepareDestination(Closure $destination): Closure
{
return function ($passable) use ($destination) {
return $destination($passable);
};
}
}

Laravel 内置管道

使用 Illuminate\Pipeline\Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php

namespace App\Services;

use Illuminate\Pipeline\Pipeline;
use App\Pipelines\User\ValidateUserData;
use App\Pipelines\User\NormalizeEmail;
use App\Pipelines\User\HashPassword;
use App\Pipelines\User\AssignDefaultRole;

class UserRegistrationService
{
public function register(array $userData): User
{
return app(Pipeline::class)
->send($userData)
->through([
ValidateUserData::class,
NormalizeEmail::class,
HashPassword::class,
AssignDefaultRole::class,
])
->then(function ($data) {
return User::create($data);
});
}
}

管道处理类示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Pipelines\User;

use Closure;

class ValidateUserData
{
public function handle(array $data, Closure $next): mixed
{
$validator = validator($data, [
'name' => 'required|string|max:255',
'email' => 'required|email|unique:users',
'password' => 'required|string|min:8',
]);

if ($validator->fails()) {
throw new ValidationException($validator);
}

return $next($data);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?php

namespace App\Pipelines\User;

use Closure;

class NormalizeEmail
{
public function handle(array $data, Closure $next): mixed
{
$data['email'] = strtolower(trim($data['email']));

return $next($data);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?php

namespace App\Pipelines\User;

use Closure;
use Illuminate\Support\Facades\Hash;

class HashPassword
{
public function handle(array $data, Closure $next): mixed
{
if (isset($data['password'])) {
$data['password'] = Hash::make($data['password']);
}

return $next($data);
}
}

高级管道用法

条件管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace App\Pipelines\Order;

use Closure;

class ApplyDiscount
{
public function __construct(
protected bool $hasDiscount = true
) {}

public function handle(array $order, Closure $next): mixed
{
if ($this->hasDiscount && $order['total'] > 100) {
$order['discount'] = $order['total'] * 0.1;
$order['total'] -= $order['discount'];
}

return $next($order);
}
}

$order = app(Pipeline::class)
->send($orderData)
->through([
new ApplyDiscount($user->hasDiscount()),
CalculateTax::class,
ProcessPayment::class,
])
->then(fn($order) => Order::create($order));

管道参数传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?php

namespace App\Pipelines\Content;

use Closure;

class ContentFilter
{
protected array $config;

public function __construct(array $config = [])
{
$this->config = $config;
}

public function handle(string $content, Closure $next): string
{
if ($this->config['strip_tags'] ?? false) {
$content = strip_tags($content, $this->config['allowed_tags'] ?? '');
}

if ($this->config['trim'] ?? true) {
$content = trim($content);
}

return $next($content);
}
}

$content = app(Pipeline::class)
->send($rawContent)
->through([
new ContentFilter(['strip_tags' => true, 'allowed_tags' => '<p><br>']),
new ProfanityFilter(['replacement' => '***']),
new LinkifyUrls(['target' => '_blank']),
])
->then(fn($content) => $content);

可中断管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php

namespace App\Pipelines\Auth;

use Closure;
use Exception;

class CheckAccountStatus
{
public function handle($user, Closure $next): mixed
{
if ($user->isBanned()) {
throw new AccountBannedException();
}

if ($user->isSuspended()) {
throw new AccountSuspendedException();
}

if (!$user->isVerified()) {
throw new AccountNotVerifiedException();
}

return $next($user);
}
}

数据处理管道

ETL 管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php

namespace App\Pipelines\ETL;

use Closure;
use Illuminate\Support\Collection;

class ExtractData
{
protected string $source;

public function __construct(string $source)
{
$this->source = $source;
}

public function handle(mixed $data, Closure $next): Collection
{
$extracted = match($this->source) {
'database' => $this->fromDatabase(),
'api' => $this->fromApi(),
'file' => $this->fromFile(),
default => collect(),
};

return $next($extracted);
}

protected function fromDatabase(): Collection
{
return DB::table('source_data')->get();
}

protected function fromApi(): Collection
{
return Http::get(config('services.api.source'))->collect();
}

protected function fromFile(): Collection
{
return collect(json_decode(file_get_contents(storage_path('data.json')), true));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php

namespace App\Pipelines\ETL;

use Closure;
use Illuminate\Support\Collection;

class TransformData
{
protected array $transformers;

public function __construct(array $transformers)
{
$this->transformers = $transformers;
}

public function handle(Collection $data, Closure $next): Collection
{
foreach ($this->transformers as $transformer) {
$data = $data->map(fn($item) => $transformer($item));
}

return $next($data);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php

namespace App\Pipelines\ETL;

use Closure;
use Illuminate\Support\Collection;

class LoadData
{
protected string $destination;
protected int $chunkSize;

public function __construct(string $destination, int $chunkSize = 1000)
{
$this->destination = $destination;
$this->chunkSize = $chunkSize;
}

public function handle(Collection $data, Closure $next): int
{
$inserted = 0;

$data->chunk($this->chunkSize)->each(function ($chunk) use (&$inserted) {
DB::table($this->destination)->insert($chunk->toArray());
$inserted += $chunk->count();
});

return $next($inserted);
}
}

使用 ETL 管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php

namespace App\Services;

use Illuminate\Pipeline\Pipeline;
use App\Pipelines\ETL\ExtractData;
use App\Pipelines\ETL\TransformData;
use App\Pipelines\ETL\LoadData;

class DataImportService
{
public function import(string $source, string $destination): int
{
return app(Pipeline::class)
->send(null)
->through([
new ExtractData($source),
new TransformData([
fn($item) => array_change_key_case($item, CASE_LOWER),
fn($item) => array_filter($item, fn($v) => $v !== null),
fn($item) => $item + ['imported_at' => now()],
]),
new LoadData($destination, 500),
])
->then(fn($count) => $count);
}
}

请求处理管道

API 请求管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?php

namespace App\Pipelines\Api;

use Closure;

class AddRequestId
{
public function handle($request, Closure $next): mixed
{
$request->headers->set('X-Request-ID', (string) Str::uuid());

return $next($request);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace App\Pipelines\Api;

use Closure;

class RateLimit
{
protected int $maxAttempts;
protected int $decayMinutes;

public function __construct(int $maxAttempts = 60, int $decayMinutes = 1)
{
$this->maxAttempts = $maxAttempts;
$this->decayMinutes = $decayMinutes;
}

public function handle($request, Closure $next): mixed
{
$key = 'rate_limit:' . $request->ip();

if (Cache::get($key, 0) >= $this->maxAttempts) {
throw new TooManyRequestsException();
}

Cache::increment($key);
Cache::put($key, Cache::get($key), now()->addMinutes($this->decayMinutes));

return $next($request);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

namespace App\Pipelines\Api;

use Closure;

class TransformResponse
{
public function handle($request, Closure $next): mixed
{
$response = $next($request);

$data = json_decode($response->getContent(), true);

$transformed = [
'success' => $response->isSuccessful(),
'data' => $data,
'meta' => [
'timestamp' => now()->toIso8601String(),
'request_id' => $request->header('X-Request-ID'),
],
];

$response->setContent(json_encode($transformed));

return $response;
}
}

验证管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace App\Pipelines\Validation;

use Closure;
use Illuminate\Contracts\Validation\Validator;

class ValidationPipeline
{
protected array $rules;
protected array $messages;
protected array $customAttributes;

public function __construct(array $rules, array $messages = [], array $customAttributes = [])
{
$this->rules = $rules;
$this->messages = $messages;
$this->customAttributes = $customAttributes;
}

public function handle(array $data, Closure $next): mixed
{
$validator = validator($data, $this->rules, $this->messages, $this->customAttributes);

if ($validator->fails()) {
return new ValidationResult(false, $validator->errors()->all());
}

return $next(new ValidationResult(true, [], $data));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php

namespace App\Pipelines\Validation;

class ValidationResult
{
public function __construct(
public bool $valid,
public array $errors = [],
public array $data = []
) {}

public function isValid(): bool
{
return $this->valid;
}

public function errors(): array
{
return $this->errors;
}

public function data(): array
{
return $this->data;
}
}

链式验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php

namespace App\Services;

use Illuminate\Pipeline\Pipeline;
use App\Pipelines\Validation\ValidationPipeline;

class RegistrationValidator
{
public function validate(array $data): ValidationResult
{
return app(Pipeline::class)
->send($data)
->through([
new ValidationPipeline([
'name' => 'required|string|max:255',
'email' => 'required|email',
'password' => 'required|string|min:8',
]),
new ValidationPipeline([
'email' => 'unique:users,email',
]),
new ValidationPipeline([
'password' => 'confirmed',
'terms' => 'accepted',
]),
])
->then(fn($result) => $result);
}
}

管道中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Pipeline\Pipeline;

class PipelineMiddleware
{
protected array $pipes = [];

public function handle($request, Closure $next)
{
return app(Pipeline::class)
->send($request)
->through($this->pipes)
->then(fn($req) => $next($req));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
<?php

namespace App\Http\Middleware;

class ApiPipelineMiddleware extends PipelineMiddleware
{
protected array $pipes = [
\App\Pipelines\Api\AddRequestId::class,
\App\Pipelines\Api\RateLimit::class,
\App\Pipelines\Api\TransformResponse::class,
];
}

测试管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php

namespace Tests\Unit\Pipelines;

use Tests\TestCase;
use Illuminate\Pipeline\Pipeline;
use App\Pipelines\User\ValidateUserData;
use App\Pipelines\User\NormalizeEmail;

class UserPipelineTest extends TestCase
{
public function test_user_registration_pipeline(): void
{
$userData = [
'name' => 'John Doe',
'email' => 'JOHN@EXAMPLE.COM',
'password' => 'password123',
];

$result = app(Pipeline::class)
->send($userData)
->through([
ValidateUserData::class,
NormalizeEmail::class,
])
->then(function ($data) {
$this->assertEquals('john@example.com', $data['email']);
return $data;
});

$this->assertIsArray($result);
}

public function test_pipeline_stops_on_exception(): void
{
$this->expectException(ValidationException::class);

app(Pipeline::class)
->send(['email' => 'invalid-email'])
->through([ValidateUserData::class])
->then(fn($data) => $data);
}
}

最佳实践

1. 单一职责

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?php

namespace App\Pipelines\User;

use Closure;

class ValidateEmail
{
public function handle(array $data, Closure $next): mixed
{
if (!filter_var($data['email'] ?? '', FILTER_VALIDATE_EMAIL)) {
throw new InvalidEmailException();
}

return $next($data);
}
}

2. 可测试性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?php

namespace App\Pipelines\User;

use Closure;
use App\Contracts\EmailValidator;

class ValidateEmail
{
public function __construct(
protected EmailValidator $validator
) {}

public function handle(array $data, Closure $next): mixed
{
$this->validator->validate($data['email'] ?? '');

return $next($data);
}
}

3. 文档化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php

namespace App\Pipelines\User;

use Closure;

class HashPassword
{
public function handle(array $data, Closure $next): mixed
{
if (!isset($data['password'])) {
return $next($data);
}

$data['password'] = Hash::make($data['password']);

return $next($data);
}
}

总结

Laravel 13 的管道模式提供了一种优雅的方式来处理数据流和请求处理。通过合理使用管道模式,可以创建可维护、可测试、可扩展的应用程序架构。