8.8 分布式事务处理¶
学习目标¶
- 深入理解分布式事务的复杂性与挑战
- 掌握主流分布式事务解决方案
- 熟练运用Saga、TCC等事务模式
- 学会在Go语言中实现分布式事务
学习内容¶
1. 分布式事务基础¶
1.1 ACID特性在分布式环境中的挑战¶
在分布式系统中,ACID特性面临巨大挑战: - 原子性:多个服务的操作需要同时成功或失败 - 一致性:跨多个数据库的数据一致性难以保证 - 隔离性:分布式并发控制复杂度高 - 持久性:需要确保所有参与节点都持久化数据
1.2 CAP定理与BASE理论¶
CAP定理指出分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)中的两项。
BASE理论是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)的缩写,为分布式事务提供了更实用的指导原则。
1.3 分布式事务的类型分类¶
- 强一致性事务:2PC、3PC
- 最终一致性事务:Saga、TCC、消息事务
- 混合型事务:结合多种模式
1.4 最终一致性 vs 强一致性¶
- 强一致性:数据实时同步,性能开销大
- 最终一致性:允许短暂不一致,性能更好,更适合分布式环境
2. Saga事务模式¶
4.1 Saga模式原理与特点¶
Saga通过一系列本地事务和补偿操作来管理分布式事务,适合长事务场景。
4.2 编排式Saga vs 协调式Saga¶
- 编排式(Choreography):每个服务产生并监听事件
- 协调式(Orchestration):中央协调器控制流程
4.3 补偿事务设计¶
每个正向操作都需要对应的补偿操作。
4.4 Saga状态机实现¶
package main
import (
"context"
"errors"
"fmt"
"log"
)
// SagaStep 定义Saga步骤
type SagaStep struct {
Name string
Execute func(ctx context.Context) error
Compensate func(ctx context.Context) error
IsCompensating bool
}
// Saga 定义Saga事务
type Saga struct {
steps []*SagaStep
}
// NewSaga 创建新的Saga实例
func NewSaga() *Saga {
return &Saga{
steps: make([]*SagaStep, 0),
}
}
// AddStep 添加步骤
func (s *Saga) AddStep(name string, execute, compensate func(ctx context.Context) error) {
s.steps = append(s.steps, &SagaStep{
Name: name,
Execute: execute,
Compensate: compensate,
})
}
// Execute 执行Saga事务
func (s *Saga) Execute(ctx context.Context) error {
for i, step := range s.steps {
log.Printf("Executing step %d: %s", i, step.Name)
if err := step.Execute(ctx); err != nil {
log.Printf("Step %d failed, starting compensation", i)
return s.compensate(ctx, i-1)
}
}
return nil
}
// compensate 执行补偿操作
func (s *Saga) compensate(ctx context.Context, from int) error {
for i := from; i >= 0; i-- {
step := s.steps[i]
log.Printf("Compensating step %d: %s", i, step.Name)
if err := step.Compensate(ctx); err != nil {
log.Printf("Compensation failed for step %d: %v", i, err)
return fmt.Errorf("compensation failed: %w", err)
}
}
return errors.New("transaction failed and was compensated")
}
// 示例使用
func main() {
saga := NewSaga()
// 添加步骤
saga.AddStep("create_order",
func(ctx context.Context) error {
fmt.Println("Creating order...")
// 模拟成功
return nil
},
func(ctx context.Context) error {
fmt.Println("Compensating order creation...")
return nil
},
)
saga.AddStep("process_payment",
func(ctx context.Context) error {
fmt.Println("Processing payment...")
// 模拟失败
return errors.New("payment failed")
},
func(ctx context.Context) error {
fmt.Println("Compensating payment...")
return nil
},
)
// 执行Saga
if err := saga.Execute(context.Background()); err != nil {
log.Printf("Saga execution failed: %v", err)
}
}
4.5 Go语言Saga框架实现¶
可以使用状态模式和工作流引擎实现更复杂的Saga模式。
3. TCC事务模式¶
5.1 Try-Confirm-Cancel模式¶
TCC通过Try、Confirm、Cancel三个阶段实现分布式事务: - Try:预留资源 - Confirm:确认操作 - Cancel:取消操作
5.2 TCC与2PC的区别¶
TCC是业务层面的2PC,不需要数据库支持XA协议。
5.3 TCC框架设计原理¶
package main
import (
"context"
"errors"
"fmt"
"log"
)
// TCCParticipant TCC参与者接口
type TCCParticipant interface {
Try(ctx context.Context) error
Confirm(ctx context.Context) error
Cancel(ctx context.Context) error
}
// TCCCoordinator TCC协调器
type TCCCoordinator struct {
participants []TCCParticipant
}
// NewTCCCoordinator 创建TCC协调器
func NewTCCCoordinator() *TCCCoordinator {
return &TCCCoordinator{
participants: make([]TCCParticipant, 0),
}
}
// AddParticipant 添加参与者
func (tc *TCCCoordinator) AddParticipant(participant TCCParticipant) {
tc.participants = append(tc.participants, participant)
}
// Execute 执行TCC事务
func (tc *TCCCoordinator) Execute(ctx context.Context) error {
// Try阶段
for i, participant := range tc.participants {
if err := participant.Try(ctx); err != nil {
log.Printf("Try phase failed at participant %d, canceling", i)
return tc.cancel(ctx, i-1)
}
}
// Confirm阶段
for i, participant := range tc.participants {
if err := participant.Confirm(ctx); err != nil {
log.Printf("Confirm phase failed at participant %d", i)
// Confirm失败需要人工干预
return errors.New("confirm phase failed, manual intervention required")
}
}
return nil
}
// cancel 执行取消操作
func (tc *TCCCoordinator) cancel(ctx context.Context, from int) error {
for i := from; i >= 0; i-- {
if err := tc.participants[i].Cancel(ctx); err != nil {
log.Printf("Cancel failed for participant %d: %v", i, err)
return fmt.Errorf("cancel failed: %w", err)
}
}
return errors.New("transaction canceled")
}
// 示例参与者
type OrderService struct{}
func (os *OrderService) Try(ctx context.Context) error {
fmt.Println("OrderService: Trying to create order...")
return nil
}
func (os *OrderService) Confirm(ctx context.Context) error {
fmt.Println("OrderService: Confirming order creation...")
return nil
}
func (os *OrderService) Cancel(ctx context.Context) error {
fmt.Println("OrderService: Canceling order creation...")
return nil
}
type PaymentService struct{}
func (ps *PaymentService) Try(ctx context.Context) error {
fmt.Println("PaymentService: Trying to process payment...")
return errors.New("payment service unavailable") // 模拟失败
}
func (ps *PaymentService) Confirm(ctx context.Context) error {
fmt.Println("PaymentService: Confirming payment...")
return nil
}
func (ps *PaymentService) Cancel(ctx context.Context) error {
fmt.Println("PaymentService: Canceling payment...")
return nil
}
func main() {
coordinator := NewTCCCoordinator()
coordinator.AddParticipant(&OrderService{})
coordinator.AddParticipant(&PaymentService{})
if err := coordinator.Execute(context.Background()); err != nil {
log.Printf("TCC transaction failed: %v", err)
}
}
5.4 业务补偿机制¶
TCC需要精心设计补偿逻辑,确保数据一致性。
5.5 TCC在Go中的实现¶
可以使用接口和泛型实现通用的TCC框架。
4. 两阶段提交协议(2PC)¶
2.1 2PC协议原理与流程¶
2PC通过协调者和参与者协作: 1. 准备阶段:协调者询问参与者是否可以提交 2. 提交阶段:协调者根据响应决定提交或回滚
2.2 协调者与参与者角色¶
- 协调者:决策中心
- 参与者:执行具体操作
2.3 2PC的优缺点分析¶
优点:强一致性 缺点:同步阻塞、单点故障、数据不一致风险
2.4 2PC的改进方案¶
超时机制、多协调者、异步确认等。
5. 三阶段提交协议(3PC)¶
3.1 3PC协议改进点¶
引入超时机制和预提交阶段,减少阻塞时间。
3.2 CanCommit、PreCommit、DoCommit阶段¶
- CanCommit:询问参与者是否具备执行条件
- PreCommit:预提交,执行操作但不提交
- DoCommit:最终提交
3.3 3PC与2PC的对比¶
3PC减少阻塞但增加复杂度,实际应用较少。
6. 本地消息表模式¶
6.1 本地消息表原理¶
通过本地数据库保证消息可靠性。
6.2 消息发布与确认机制¶
package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
type LocalMessage struct {
ID int64
Content string
Status string
CreatedAt time.Time
UpdatedAt time.Time
}
type MessageStore struct {
db *sql.DB
}
func NewMessageStore(dsn string) (*MessageStore, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
return &MessageStore{db: db}, nil
}
func (ms *MessageStore) SaveMessage(tx *sql.Tx, content string) (int64, error) {
result, err := tx.Exec(
"INSERT INTO local_messages (content, status, created_at, updated_at) VALUES (?, 'pending', NOW(), NOW())",
content,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
func (ms *MessageStore) UpdateMessageStatus(id int64, status string) error {
_, err := ms.db.Exec(
"UPDATE local_messages SET status = ?, updated_at = NOW() WHERE id = ?",
status, id,
)
return err
}
// 业务事务示例
func BusinessTransaction(db *sql.DB, messageStore *MessageStore, businessData string, messageContent string) error {
tx, err := db.Begin()
if err != nil {
return err
}
// 执行业务操作
_, err = tx.Exec("INSERT INTO business_table (data) VALUES (?)", businessData)
if err != nil {
tx.Rollback()
return err
}
// 保存本地消息
_, err = messageStore.SaveMessage(tx, messageContent)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
6.3 消息去重与幂等性¶
通过唯一ID和状态检查实现幂等性。
7. 分布式事务框架¶
9.2 DTM分布式事务管理器¶
DTM是Go语言的分布式事务框架,支持多种模式。
package main
import (
"context"
"fmt"
"log"
"github.com/dtm-labs/dtm/client/dtmgrpc"
"github.com/lithammer/shortuuid/v3"
"google.golang.org/grpc"
)
func main() {
// 创建DTM服务器地址
dtmServer := "localhost:36790"
// 创建Saga事务
gid := shortuuid.New()
saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
Add("localhost:50051/example.Business/TransOut",
"localhost:50051/example.Business/TransOutCompensate",
&req{Amount: 30}).
Add("localhost:50051/example.Business/TransIn",
"localhost:50051/example.Business/TransInCompensate",
&req{Amount: 30})
// 提交Saga事务
err := saga.Submit()
if err != nil {
log.Fatalf("Saga submit error: %v", err)
}
fmt.Printf("Saga transaction %s submitted\n", gid)
}
type req struct {
Amount int
}
本章小结¶
分布式事务处理是微服务架构中最具挑战性的技术难题之一。通过本章学习,我们掌握了多种分布式事务解决方案和实现方法。
关键要点回顾: - CAP定理和BASE理论为分布式事务设计提供了理论基础 - Saga模式通过补偿机制实现最终一致性,适合长事务场景 - TCC模式提供更精确的事务控制,适合对一致性要求高的场景 - 本地消息表和事务消息模式是实用的轻量级解决方案
技术实现要点: - Go语言可以通过状态机和协程实现Saga模式 - TCC框架需要精心设计Try、Confirm、Cancel三个阶段 - 事务消息需要消息队列的事务性支持 - 幂等性设计是所有分布式事务方案的关键要求
在实际项目中,应根据具体业务场景选择合适的事务模式,权衡一致性要求与系统性能。