Laravel 13 数据库事务处理完全指南

数据库事务是保证数据一致性的核心机制。本文将深入探讨 Laravel 13 中数据库事务的各种用法和最佳实践。

事务基础

基本事务

1
2
3
4
5
6
use Illuminate\Support\Facades\DB;

// Run both inserts atomically: if the closure throws, the transaction is
// rolled back and the exception is re-thrown to the caller.
DB::transaction(function (): void {
    // Capture the generated key so the post references the user that was
    // just inserted rather than a hard-coded id.
    $userId = DB::table('users')->insertGetId(['name' => 'John']);

    DB::table('posts')->insert(['user_id' => $userId, 'title' => 'First Post']);
});

手动事务控制

1
2
3
4
5
6
7
8
9
10
11
// Manual control: the caller is responsible for committing or rolling back.
DB::beginTransaction();

try {
    // Use the generated key so the post points at the user created above.
    $userId = DB::table('users')->insertGetId(['name' => 'John']);
    DB::table('posts')->insert(['user_id' => $userId, 'title' => 'First Post']);

    DB::commit();
} catch (\Throwable $e) {
    // Catch \Throwable rather than \Exception so that \Error subclasses
    // (TypeError, etc.) also roll back instead of leaving the transaction open.
    DB::rollBack();

    throw $e;
}

指定死锁重试次数

1
2
3
// Second argument of DB::transaction(): how many times the closure may be
// attempted when a deadlock occurs.
$maxAttempts = 5;

DB::transaction(
    static function (): void {
        DB::table('users')->update(['votes' => 1]);
    },
    $maxAttempts,
);

事务隔离级别

设置隔离级别

1
2
3
4
5
6
7
8
9
10
11
use Illuminate\Support\Facades\DB;

// NOTE(review): confirm against the Illuminate\Database\Connection API of the
// targeted release that transaction() accepts a third isolation-level argument
// and that the DB::ISOLATION_* constants exist. Through Laravel 12 the
// signature is transaction(Closure $callback, $attempts = 1) and no such
// constants are defined.
DB::transaction(function () {
// Transactional work goes here.
}, 5, DB::ISOLATION_SERIALIZABLE);

// Isolation levels referenced by this example. They are listed for reference
// only: these bare expressions are not runnable statements.
DB::ISOLATION_READ_UNCOMMITTED
DB::ISOLATION_READ_COMMITTED
DB::ISOLATION_REPEATABLE_READ
DB::ISOLATION_SERIALIZABLE

自定义隔离级别

1
2
3
4
5
// Resolve the connection once so the isolation level is set on the very same
// connection that runs the transaction. Calling DB::transaction() afterwards
// would use the default connection, which is not necessarily 'mysql'.
$connection = DB::connection('mysql');

// Without SESSION/GLOBAL, MySQL applies this level only to the next
// transaction started on this session.
$connection->statement('SET TRANSACTION ISOLATION LEVEL READ COMMITTED');

$connection->transaction(function (): void {
    // Transactional work goes here.
});

Eloquent 事务

模型事务操作

1
2
3
4
5
6
7
8
9
10
use App\Models\User;
use App\Models\Post;

// Create the user and all of its posts as one atomic unit.
DB::transaction(static function () use ($data): void {
    $author = User::create($data['user']);

    // One related post per attribute array supplied by the caller.
    foreach ($data['posts'] as $attributes) {
        $author->posts()->create($attributes);
    }
});

模型观察者与事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?php

namespace App\Observers;

use App\Models\Order;
use Illuminate\Support\Facades\DB;

/**
 * Reacts to Order model events.
 */
class OrderObserver
{
    /**
     * Notify the user about a newly created order.
     *
     * When a database transaction is open, the notification is registered
     * with DB::afterCommit(); otherwise it is sent right away.
     */
    public function created(Order $order): void
    {
        // No open transaction: nothing to wait for.
        if (DB::transactionLevel() <= 0) {
            $order->notifyUser();

            return;
        }

        // Defer until the surrounding transaction has been committed.
        DB::afterCommit(static function () use ($order): void {
            $order->notifyUser();
        });
    }
}

高级事务模式

嵌套事务

1
2
3
4
5
6
7
// Outer transaction: everything below commits or rolls back together.
DB::transaction(function (): void {
    // Keep the generated key so the nested insert references this user
    // instead of a hard-coded id.
    $userId = DB::table('users')->insertGetId(['name' => 'John']);

    // Nested call: runs inside the transaction that is already open.
    DB::transaction(function () use ($userId): void {
        DB::table('posts')->insert(['user_id' => $userId, 'title' => 'Post']);
    });
});

保存点(Savepoint)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DB::beginTransaction();

try {
    // Keep the generated key so the post references the user created here.
    $userId = DB::table('users')->insertGetId(['name' => 'John']);

    // Only create the savepoint when this is the outermost transaction, and
    // remember whether it exists so we never roll back to a missing savepoint.
    $savepointCreated = DB::transactionLevel() === 1;

    if ($savepointCreated) {
        DB::statement('SAVEPOINT sp1');
    }

    DB::table('posts')->insert(['user_id' => $userId, 'title' => 'Post']);

    // Undo only the work done after the savepoint; the user insert is kept.
    if ($someCondition && $savepointCreated) {
        DB::statement('ROLLBACK TO SAVEPOINT sp1');
    }

    DB::commit();
} catch (\Throwable $e) {
    // Roll back on any failure (including \Error) and re-throw so the caller
    // is not led to believe the work succeeded.
    DB::rollBack();

    throw $e;
}

条件事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Services;

use App\Models\Order;
use Illuminate\Support\Facades\DB;

/**
 * Creates orders, optionally wrapping the writes in a database transaction.
 */
class OrderService
{
    /**
     * Persist an order together with its items.
     *
     * @param array $data           Expects an 'order' attribute array and an 'items' list of attribute arrays.
     * @param bool  $useTransaction When false the writes run without a transaction
     *                              (e.g. because the caller already opened one).
     *
     * @return Order The newly created order.
     */
    public function createOrder(array $data, bool $useTransaction = true): Order
    {
        // Order must be imported: inside this namespace an unqualified
        // `Order` would otherwise resolve to App\Services\Order.
        $persist = static function () use ($data): Order {
            $order = Order::create($data['order']);

            foreach ($data['items'] as $item) {
                $order->items()->create($item);
            }

            return $order;
        };

        return $useTransaction ? DB::transaction($persist) : $persist();
    }
}

分布式事务

跨连接事务(简化实现,并非严格的两阶段提交)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;

/**
 * Runs a unit of work with a transaction open on several connections.
 *
 * This is best-effort coordination, not an XA / two-phase commit: commits are
 * issued one connection at a time, so if a later commit fails the connections
 * committed before it stay committed and cannot be rolled back.
 */
class DistributedTransaction
{
    /** @var list<string> Names of the participating database connections. */
    protected array $connections = ['mysql', 'mysql_secondary'];

    /**
     * Execute $callback inside a transaction on every configured connection.
     *
     * @param callable $callback Work to perform; invoked with no arguments.
     *
     * @throws \Throwable Whatever beginTransaction(), the callback or commit() threw,
     *                    after the still-open transactions were rolled back.
     */
    public function execute(callable $callback): void
    {
        // Track which transactions were actually opened so a failure part-way
        // through the begin loop does not leave earlier ones dangling.
        $opened = [];

        try {
            foreach ($this->connections as $name) {
                DB::connection($name)->beginTransaction();
                $opened[] = $name;
            }

            $callback();

            foreach ($opened as $name) {
                DB::connection($name)->commit();
            }
        } catch (\Throwable $e) {
            foreach (array_reverse($opened) as $name) {
                try {
                    $connection = DB::connection($name);

                    // Connections that already committed are back at level 0;
                    // there is nothing left to roll back on them.
                    if ($connection->transactionLevel() > 0) {
                        $connection->rollBack();
                    }
                } catch (\Throwable) {
                    // Keep rolling back the remaining connections; the
                    // original failure is re-thrown below.
                }
            }

            throw $e;
        }
    }
}

Saga 模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?php

namespace App\Services;

/**
 * Minimal saga: runs steps in order and, if one fails, runs the compensations
 * of the steps that already completed in reverse order.
 */
class SagaTransaction
{
    /** @var list<callable> Forward actions, in registration order. */
    protected array $steps = [];

    /** @var list<callable> Compensations; index-aligned with $steps. */
    protected array $compensations = [];

    /**
     * Register a forward action together with the action that undoes it.
     *
     * @return self This instance, to allow chaining.
     */
    public function addStep(callable $execute, callable $compensate): self
    {
        $this->steps[] = $execute;
        $this->compensations[] = $compensate;

        return $this;
    }

    /**
     * Run every registered step in order.
     *
     * @throws \Throwable The failure raised by a step, re-thrown after the
     *                    completed steps have been compensated.
     */
    public function execute(): void
    {
        $executedSteps = [];

        try {
            foreach ($this->steps as $index => $step) {
                $step();
                // Recorded only after success: the failing step itself is not compensated.
                $executedSteps[] = $index;
            }
        } catch (\Throwable $e) {
            // \Throwable (not just \Exception) so a TypeError or other \Error
            // in a step still triggers compensation of the earlier steps.
            $this->compensate($executedSteps);

            throw $e;
        }
    }

    /**
     * Undo the given steps, most recent first.
     *
     * A failing compensation is logged and does not stop the remaining ones.
     *
     * @param list<int> $executedSteps Indexes of the steps that completed.
     */
    protected function compensate(array $executedSteps): void
    {
        foreach (array_reverse($executedSteps) as $index) {
            try {
                $this->compensations[$index]();
            } catch (\Throwable $e) {
                \Log::error('Compensation failed', [
                    'step' => $index,
                    'error' => $e->getMessage(),
                ]);
            }
        }
    }
}

// Usage: pair each forward action with the compensation that undoes it,
// then run the saga. addStep() returns the saga itself, so calls can be chained.
$saga = (new SagaTransaction())
    ->addStep(
        function () use ($order) {
            return $this->reserveInventory($order);
        },
        function () use ($order) {
            return $this->releaseInventory($order);
        }
    )
    ->addStep(
        function () use ($order) {
            return $this->chargePayment($order);
        },
        function () use ($order) {
            return $this->refundPayment($order);
        }
    )
    ->addStep(
        function () use ($order) {
            return $this->createShipment($order);
        },
        function () use ($order) {
            return $this->cancelShipment($order);
        }
    );

$saga->execute();

事务事件

监听事务事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use Illuminate\Database\Events\QueryExecuted;
use Illuminate\Database\Events\TransactionBeginning;
use Illuminate\Database\Events\TransactionCommitted;
use Illuminate\Database\Events\TransactionRolledBack;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;

// Invoked just before a statement is sent to the database.
DB::beforeExecuting(function ($query, $bindings) {
    // Before the SQL runs.
});

// DB::listen() is the after-execution hook: it receives a QueryExecuted event
// carrying the SQL, its bindings and the elapsed time. (The connection has no
// afterExecuting() method, so that call is folded into this listener.)
DB::listen(function (QueryExecuted $query) {
    \Log::debug('SQL', [
        'sql' => $query->sql,
        'bindings' => $query->bindings,
        'time' => $query->time,
    ]);
});

// Transaction lifecycle events dispatched by the database connection.
Event::listen(TransactionBeginning::class, function (TransactionBeginning $event) {
    // A transaction was opened on $event->connectionName.
});

Event::listen(TransactionCommitted::class, function (TransactionCommitted $event) {
    // A transaction was committed on $event->connectionName.
});

Event::listen(TransactionRolledBack::class, function (TransactionRolledBack $event) {
    // A transaction was rolled back on $event->connectionName.
});

事务回调

1
2
3
4
5
6
7
8
DB::transaction(function (): void {
    // Create the user through the model so there is an instance to hand to
    // the event; a query-builder insert() only returns a boolean.
    $user = \App\Models\User::create(['name' => 'John']);

    // Runs only once the transaction has been committed. $user must be
    // captured explicitly, otherwise it is undefined inside the callback.
    DB::afterCommit(function () use ($user): void {
        event(new UserCreated($user));
    });
});

事务最佳实践

事务服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?php

namespace App\Services;

use App\Events\OrderCreated;
use App\Models\Discount;
use App\Models\Order;
use App\Models\OrderItem;
use App\Models\Product;
use Illuminate\Support\Facades\DB;

/**
 * Creates an order, its items, the stock adjustments and the discount inside a
 * single database transaction.
 *
 * calculateTotal() and sendConfirmation() are called below but are not defined
 * in this excerpt; they are assumed to be provided elsewhere on the class.
 */
class OrderService
{
    /**
     * Create an order atomically and announce it once the commit succeeded.
     *
     * @param array $data Expects 'user_id', 'items' (list of arrays with
     *                    'product_id', 'quantity', 'price') and optionally 'discount_code'.
     *
     * @return Order The persisted order.
     *
     * @throws \RuntimeException When a product does not have enough stock.
     */
    public function createOrder(array $data): Order
    {
        return DB::transaction(function () use ($data): Order {
            $order = Order::create([
                'user_id' => $data['user_id'],
                'total' => $this->calculateTotal($data['items']),
                'status' => 'pending',
            ]);

            $this->createItems($order, $data['items']);
            $this->updateInventory($data['items']);
            $this->applyDiscount($order, $data['discount_code'] ?? null);

            // Side effects visible outside the database are deferred until
            // the transaction has actually been committed.
            DB::afterCommit(function () use ($order): void {
                event(new OrderCreated($order));
                $this->sendConfirmation($order);
            });

            return $order;
        });
    }

    /**
     * Persist one order item per entry of $items.
     */
    protected function createItems(Order $order, array $items): void
    {
        foreach ($items as $item) {
            $order->items()->create([
                'product_id' => $item['product_id'],
                'quantity' => $item['quantity'],
                'price' => $item['price'],
            ]);
        }
    }

    /**
     * Deduct the ordered quantities from product stock.
     *
     * @throws \RuntimeException When a product has less stock than requested;
     *                           thrown inside the transaction so everything rolls back.
     */
    protected function updateInventory(array $items): void
    {
        foreach ($items as $item) {
            // The stock condition is part of the UPDATE itself, so concurrent
            // orders cannot drive the stock below zero.
            $affected = Product::where('id', $item['product_id'])
                ->where('stock', '>=', $item['quantity'])
                ->decrement('stock', $item['quantity']);

            if ($affected === 0) {
                throw new \RuntimeException(
                    sprintf('Insufficient stock for product %s', $item['product_id'])
                );
            }
        }
    }

    /**
     * Attach a discount to the order when a valid code was supplied.
     */
    protected function applyDiscount(Order $order, ?string $code): void
    {
        if (! $code) {
            return;
        }

        $discount = Discount::where('code', $code)->first();

        if (! $discount || ! $discount->isValid()) {
            return;
        }

        $order->update([
            'discount_id' => $discount->id,
            'discount_amount' => $discount->calculate($order->total),
        ]);
    }
}

事务与队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Services;

use App\Jobs\ProcessOrder;
use App\Models\Order;
use Illuminate\Support\Facades\DB;

/**
 * Creates an order and queues its processing once the order is committed.
 *
 * createOrder() is called below but is not defined in this excerpt; it is
 * assumed to be provided elsewhere on the class.
 */
class CheckoutService
{
    /**
     * Create the order in a transaction and dispatch ProcessOrder after commit.
     *
     * @return Order The persisted order.
     */
    public function checkout(array $data): Order
    {
        // Order must be imported: inside this namespace an unqualified
        // `Order` would otherwise resolve to App\Services\Order.
        return DB::transaction(function () use ($data): Order {
            $order = $this->createOrder($data);

            // Dispatch only after commit so a worker never picks up a job for
            // an order that is not yet visible (or was rolled back).
            DB::afterCommit(function () use ($order): void {
                ProcessOrder::dispatch($order)
                    ->onQueue('orders');
            });

            return $order;
        });
    }
}

死锁处理

检测死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use Illuminate\Database\QueryException;

// Run the transaction and inspect any QueryException it raises.
try {
DB::transaction(function () {
// Operations that may deadlock go here.
});
} catch (QueryException $e) {
if ($this->isDeadlock($e)) {
// Retry path; retry() is not shown in this excerpt.
return $this->retry($e);
}

// Not classified as a deadlock: propagate unchanged.
throw $e;
}

/**
 * Tell whether a failed query was aborted by a MySQL deadlock or lock-wait timeout.
 *
 * QueryException::getCode() carries the SQLSTATE string (for example "40001"),
 * not the driver's numeric error, so it never equals 1213 or 1205. The MySQL
 * error number is the second element of the PDO errorInfo array.
 */
protected function isDeadlock(QueryException $e): bool
{
    $deadlockCodes = [
        1213, // MySQL deadlock
        1205, // MySQL lock wait timeout
    ];

    // errorInfo can be null when the exception did not originate from PDO.
    $driverCode = $e->errorInfo[1] ?? null;

    return $driverCode !== null
        && in_array((int) $driverCode, $deadlockCodes, true);
}

死锁重试策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Database\QueryException;

/**
 * Runs a callback in a transaction and retries it with a growing delay when
 * the database reports a deadlock.
 */
class DeadlockRetry
{
    /** Maximum number of retries after the first try. */
    protected int $maxAttempts = 5;

    /** @var list<int> Delay before each retry, in milliseconds. */
    protected array $backoff = [100, 200, 500, 1000, 2000];

    /**
     * Execute $callback inside DB::transaction(), retrying on deadlock.
     *
     * @param callable $callback Work to run inside the transaction.
     * @param int      $attempts Number of retries already consumed (normally left at 0).
     *
     * @return mixed Whatever the callback returns.
     *
     * @throws QueryException When the failure is not a deadlock or the retries are exhausted.
     */
    public function execute(callable $callback, int $attempts = 0): mixed
    {
        // DB::transaction() type-hints Closure, so normalise other callables
        // (array/string/invokable) instead of failing with a TypeError.
        $work = \Closure::fromCallable($callback);

        while (true) {
            try {
                return DB::transaction($work);
            } catch (QueryException $e) {
                if (! $this->isDeadlock($e) || $attempts >= $this->maxAttempts) {
                    throw $e;
                }

                usleep($this->delayFor($attempts) * 1000);
                $attempts++;
            }
        }
    }

    /**
     * Delay in milliseconds before the given retry; the last configured value
     * is reused when there are more retries than backoff entries.
     */
    private function delayFor(int $attempt): int
    {
        if ($this->backoff === []) {
            return 0;
        }

        $index = max(0, min($attempt, count($this->backoff) - 1));

        return (int) $this->backoff[$index];
    }

    /**
     * Tell whether the exception reports a deadlock.
     *
     * Checks the MySQL driver error number (1213) and falls back to a
     * case-insensitive message match, since drivers word the message differently.
     */
    protected function isDeadlock(QueryException $e): bool
    {
        $driverCode = $e->errorInfo[1] ?? null;

        if ($driverCode !== null && (int) $driverCode === 1213) {
            return true;
        }

        return str_contains(strtolower($e->getMessage()), 'deadlock');
    }
}

事务监控

事务性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php

namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

/**
 * Measures the wall-clock duration and query count of a section of code and
 * logs a warning when it takes longer than one second.
 */
class TransactionMonitor
{
    protected float $startTime;
    protected int $startQueries;

    /** True when begin() had to switch the query log on itself. */
    protected bool $enabledQueryLog = false;

    /**
     * Start measuring.
     */
    public function begin(): void
    {
        // getQueryLog() stays empty unless query logging is enabled, which
        // would make the reported query count always 0.
        if (! DB::logging()) {
            DB::enableQueryLog();
            $this->enabledQueryLog = true;
        }

        $this->startQueries = count(DB::getQueryLog());
        $this->startTime = microtime(true);
    }

    /**
     * Stop measuring and return the collected metrics.
     *
     * @param string $name Label stored in the metrics and the log entry.
     *
     * @return array{name: string, duration_ms: float, queries: int}
     *
     * @throws \LogicException When called before begin().
     */
    public function end(string $name): array
    {
        if (! isset($this->startTime)) {
            throw new \LogicException('TransactionMonitor::end() called before begin().');
        }

        $duration = microtime(true) - $this->startTime;
        // max() guards against the log having been flushed in between.
        $queries = max(0, count(DB::getQueryLog()) - $this->startQueries);

        // Restore the previous logging state so long-running processes do not
        // keep accumulating log entries because of the monitor.
        if ($this->enabledQueryLog) {
            DB::disableQueryLog();
            DB::flushQueryLog();
            $this->enabledQueryLog = false;
        }

        $metrics = [
            'name' => $name,
            'duration_ms' => round($duration * 1000, 2),
            'queries' => $queries,
        ];

        if ($duration > 1.0) {
            Log::warning('Slow transaction', $metrics);
        }

        return $metrics;
    }
}

// Usage: bracket the transaction with begin()/end() to collect its metrics.
$monitor = new TransactionMonitor;
$monitor->begin();

DB::transaction(static function (): void {
    // Transactional work goes here.
});

$metrics = $monitor->end('create_order');

总结

Laravel 13 的数据库事务提供了:

  • 灵活的事务控制方式
  • 多种隔离级别支持
  • 嵌套事务和保存点
  • 分布式事务模式
  • 完善的死锁处理
  • 事务事件监听

正确使用事务是保证数据一致性和应用稳定性的关键。