7.5 事务处理与并发控制¶
学习目标¶
- 深入理解事务的ACID特性
- 掌握不同事务隔离级别的应用
- 熟练处理并发控制问题
- 学会设计分布式事务解决方案
核心内容¶
1. 事务基础理论¶
1.1 ACID特性详解¶
事务是数据库操作的基本单位,具有四个关键特性,通常称为ACID特性:
-
原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成,不会处于中间状态。如果事务执行过程中发生错误,会回滚到事务开始前的状态。
-
一致性(Consistency):事务必须使数据库从一个一致性状态变换到另一个一致性状态。例如,在银行转账中,总金额应该保持不变。
-
隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。数据库通过隔离级别控制并发事务之间的相互影响。
-
持久性(Durability):一旦事务提交,其对数据库的修改就是永久性的,即使系统崩溃也不会丢失。
1.2 事务的生命周期¶
一个完整的事务生命周期包含以下阶段:
- 开始(Begin):事务开始,数据库记录当前状态。
- 执行(Execute):执行一系列数据库操作。
- 提交(Commit):所有操作成功完成,将修改永久保存到数据库。
- 回滚(Rollback):如果执行过程中发生错误,撤销所有已执行的操作,恢复到事务开始前的状态。
事务的状态转换:活跃状态 → 部分提交状态 → 提交状态 或 失败状态 → 中止状态。
1.3 事务日志机制¶
事务日志是保证事务ACID特性的关键机制,主要用于:
- 崩溃恢复:系统崩溃后,通过日志恢复未完成的事务
- 事务回滚:当需要撤销操作时,使用日志记录的信息
- 数据复制:在主从复制中,日志用于同步数据
常见的日志类型包括: - 重做日志(Redo Log):记录数据修改后的值,用于崩溃后重做操作 - 撤销日志(Undo Log):记录数据修改前的值,用于事务回滚
2. Go语言事务操作¶
Go语言通过数据库驱动提供事务支持,标准库中的database/sql包定义了事务接口。
2.1 事务的开启与提交¶
下面是一个使用Go语言操作MySQL事务的示例:
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 验证连接
err = db.Ping()
if err != nil {
log.Fatalf("无法验证数据库连接: %v", err)
}
// 开启事务
tx, err := db.Begin()
if err != nil {
log.Fatalf("开启事务失败: %v", err)
}
// 执行SQL操作
_, err = tx.Exec("INSERT INTO users (name, email) VALUES (?, ?)", "张三", "zhangsan@example.com")
if err != nil {
log.Printf("执行SQL失败: %v", err)
// 发生错误时回滚
if rollbackErr := tx.Rollback(); rollbackErr != nil {
log.Printf("回滚事务失败: %v", rollbackErr)
}
return
}
// 提交事务
if err := tx.Commit(); err != nil {
log.Printf("提交事务失败: %v", err)
// 提交失败时尝试回滚
if rollbackErr := tx.Rollback(); rollbackErr != nil {
log.Printf("回滚事务失败: %v", rollbackErr)
}
return
}
fmt.Println("事务执行成功并提交")
}
2.2 事务回滚处理¶
在Go中,事务回滚通常在发生错误时执行。以下示例展示了更完整的错误处理和回滚机制:
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
func transferMoney(db *sql.DB, fromAccount, toAccount int, amount float64) error {
// 开启事务
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %v", err)
}
defer func() {
// 如果发生panic,回滚事务
if r := recover(); r != nil {
log.Printf("发生panic,回滚事务: %v", r)
tx.Rollback()
}
}()
// 检查转出账户余额
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromAccount).Scan(&balance)
if err != nil {
tx.Rollback()
return fmt.Errorf("查询余额失败: %v", err)
}
// 检查余额是否充足
if balance < amount {
tx.Rollback()
return fmt.Errorf("余额不足")
}
// 减少转出账户余额
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromAccount)
if err != nil {
tx.Rollback()
return fmt.Errorf("更新转出账户失败: %v", err)
}
// 增加转入账户余额
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toAccount)
if err != nil {
tx.Rollback()
return fmt.Errorf("更新转入账户失败: %v", err)
}
// 提交事务
if err := tx.Commit(); err != nil {
tx.Rollback()
return fmt.Errorf("提交事务失败: %v", err)
}
return nil
}
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 执行转账
err = transferMoney(db, 1, 2, 100.0)
if err != nil {
log.Fatalf("转账失败: %v", err)
}
fmt.Println("转账成功")
}
2.3 嵌套事务管理¶
Go的database/sql包本身不直接支持嵌套事务,但可以通过保存点(savepoint)实现类似功能:
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 开启外层事务
tx, err := db.Begin()
if err != nil {
log.Fatalf("开启事务失败: %v", err)
}
// 执行外层操作
_, err = tx.Exec("INSERT INTO orders (user_id, total) VALUES (?, ?)", 1, 200)
if err != nil {
tx.Rollback()
log.Fatalf("插入订单失败: %v", err)
}
// 创建保存点,模拟嵌套事务开始
_, err = tx.Exec("SAVEPOINT order_items_savepoint")
if err != nil {
tx.Rollback()
log.Fatalf("创建保存点失败: %v", err)
}
// 执行"嵌套事务"中的操作
_, err = tx.Exec("INSERT INTO order_items (order_id, product_id, quantity) VALUES (?, ?, ?)", 1, 101, 2)
if err != nil {
// 回滚到保存点,而不是整个事务
_, rollbackErr := tx.Exec("ROLLBACK TO SAVEPOINT order_items_savepoint")
if rollbackErr != nil {
tx.Rollback()
log.Fatalf("回滚到保存点失败: %v", rollbackErr)
}
log.Printf("插入订单项失败,已回滚到保存点: %v", err)
}
// 提交整个事务
if err := tx.Commit(); err != nil {
log.Fatalf("提交事务失败: %v", err)
}
fmt.Println("事务执行成功")
}
3. 事务隔离级别¶
数据库通过隔离级别控制并发事务之间的相互影响,不同的隔离级别提供不同的一致性保证和性能特性。
3.1 四种隔离级别对比¶
- 读未提交(Read Uncommitted)
- 最低的隔离级别
- 允许事务查看其他未提交事务的修改
-
可能导致脏读、不可重复读和幻读
-
读已提交(Read Committed)
- 保证事务只能看到已提交事务的修改
- 避免脏读,但可能出现不可重复读和幻读
-
是许多数据库的默认隔离级别(如PostgreSQL、SQL Server)
-
可重复读(Repeatable Read)
- 保证事务在整个过程中看到相同的数据
- 避免脏读和不可重复读,但可能出现幻读
-
是MySQL的默认隔离级别
-
串行化(Serializable)
- 最高的隔离级别
- 通过强制事务串行执行避免所有并发问题
- 完全避免脏读、不可重复读和幻读,但性能最差
3.2 隔离级别的选择策略¶
在Go中设置事务隔离级别:
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 查看数据库当前默认隔离级别
var isolationLevel string
err = db.QueryRow("SELECT @@tx_isolation").Scan(&isolationLevel)
if err != nil {
log.Fatalf("查询隔离级别失败: %v", err)
}
fmt.Printf("默认隔离级别: %s\n", isolationLevel)
// 开启事务并设置隔离级别
tx, err := db.Begin()
if err != nil {
log.Fatalf("开启事务失败: %v", err)
}
// 设置隔离级别为可重复读
_, err = tx.Exec("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
if err != nil {
tx.Rollback()
log.Fatalf("设置隔离级别失败: %v", err)
}
// 执行数据库操作...
_, err = tx.Exec("INSERT INTO products (name, price) VALUES (?, ?)", "测试商品", 99.99)
if err != nil {
tx.Rollback()
log.Fatalf("插入数据失败: %v", err)
}
// 提交事务
if err := tx.Commit(); err != nil {
log.Fatalf("提交事务失败: %v", err)
}
fmt.Println("事务执行成功")
}
3.3 隔离级别与性能权衡¶
隔离级别与性能的关系通常是:隔离级别越高,一致性保证越强,但并发性能越差。
选择隔离级别时应考虑:
- 应用的一致性需求:金融交易系统通常需要更高的隔离级别
- 并发访问量:高并发系统可能需要在一致性和性能间做平衡
- 数据修改频率:频繁修改的数据可能需要更高的隔离级别
以下是一个在不同场景下选择隔离级别的示例:
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
// 根据业务场景设置合适的隔离级别
func beginTransactionWithIsolation(db *sql.DB, scenario string) (*sql.Tx, error) {
tx, err := db.Begin()
if err != nil {
return nil, err
}
var isolationLevel string
switch scenario {
case "financial": // 金融交易,需要高隔离级别
isolationLevel = "SERIALIZABLE"
case "reporting": // 报表生成,可容忍较低隔离级别
isolationLevel = "READ COMMITTED"
case "inventory": // 库存管理,需要中等隔离级别
isolationLevel = "REPEATABLE READ"
default:
isolationLevel = "REPEATABLE READ" // 默认级别
}
_, err = tx.Exec(fmt.Sprintf("SET TRANSACTION ISOLATION LEVEL %s", isolationLevel))
if err != nil {
tx.Rollback()
return nil, err
}
return tx, nil
}
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 金融交易场景,使用最高隔离级别
tx, err := beginTransactionWithIsolation(db, "financial")
if err != nil {
log.Fatalf("开启事务失败: %v", err)
}
// 执行金融交易操作...
fmt.Println("执行金融交易操作")
// 提交事务
if err := tx.Commit(); err != nil {
log.Fatalf("提交事务失败: %v", err)
}
fmt.Println("事务执行成功")
}
4. 并发控制机制¶
并发控制是数据库管理系统处理多个并发事务的关键技术,确保事务的隔离性和一致性。
4.1 锁机制与死锁预防¶
锁是最常用的并发控制机制,Go中可以通过数据库提供的锁机制或应用层锁实现。
package main
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
// 使用数据库行级锁
func updateWithRowLock(db *sql.DB, productID int, quantity int) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 使用SELECT ... FOR UPDATE获取行级锁
var currentStock int
err = tx.QueryRow("SELECT stock FROM products WHERE id = ? FOR UPDATE", productID).Scan(¤tStock)
if err != nil {
return err
}
if currentStock < quantity {
return fmt.Errorf("库存不足")
}
// 更新库存
_, err = tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
if err != nil {
return err
}
return tx.Commit()
}
// 应用层锁防止死锁
var (
lockMap = make(map[int]*sync.Mutex)
mapLock sync.Mutex
)
func getLock(resourceID int) *sync.Mutex {
mapLock.Lock()
defer mapLock.Unlock()
if _, exists := lockMap[resourceID]; !exists {
lockMap[resourceID] = &sync.Mutex{}
}
return lockMap[resourceID]
}
// 按资源ID顺序获取锁,预防死锁
func safeLock(resources ...int) []*sync.Mutex {
// 排序资源ID,确保获取锁的顺序一致
for i := 0; i < len(resources); i++ {
for j := i + 1; j < len(resources); j++ {
if resources[i] > resources[j] {
resources[i], resources[j] = resources[j], resources[i]
}
}
}
// 按顺序获取锁
locks := make([]*sync.Mutex, len(resources))
for i, id := range resources {
lock := getLock(id)
lock.Lock()
locks[i] = lock
}
return locks
}
func main() {
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 模拟并发更新
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := updateWithRowLock(db, 1, 1)
if err != nil {
log.Printf("更新失败: %v", err)
} else {
fmt.Println("更新成功")
}
}()
}
wg.Wait()
fmt.Println("所有并发操作完成")
}
4.2 乐观锁与悲观锁¶
乐观锁和悲观锁是两种不同的并发控制策略:
- 悲观锁:假设冲突经常发生,在操作数据前先获取锁,阻止其他事务修改
- 乐观锁:假设冲突很少发生,在更新时检查数据是否被修改过,如果被修改则重试
package main
import (
"database/sql"
"errors"
"fmt"
"log"
"sync"
_ "github.com/go-sql-driver/mysql"
)
// 悲观锁实现
func decreaseStockPessimistic(db *sql.DB, productID int, quantity int) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 获取悲观锁
var stock int
err = tx.QueryRow("SELECT stock FROM products WHERE id = ? FOR UPDATE", productID).Scan(&stock)
if err != nil {
return err
}
if stock < quantity {
return errors.New("库存不足")
}
// 更新库存
_, err = tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
if err != nil {
return err
}
return tx.Commit()
}
// 乐观锁实现
func decreaseStockOptimistic(db *sql.DB, productID int, quantity int, maxRetries int) error {
for retry := 0; retry < maxRetries; retry++ {
tx, err := db.Begin()
if err != nil {
return err
}
// 获取当前库存和版本号
var stock, version int
err = tx.QueryRow("SELECT stock, version FROM products WHERE id = ?", productID).Scan(&stock, &version)
if err != nil {
tx.Rollback()
return err
}
if stock < quantity {
tx.Rollback()
return errors.New("库存不足")
}
// 使用版本号作为乐观锁,只有版本号匹配时才更新
result, err := tx.Exec(
"UPDATE products SET stock = stock - ?, version = version + 1 WHERE id = ? AND version = ?",
quantity, productID, version,
)
if err != nil {
tx.Rollback()
return err
}
// 检查是否有行被更新
rowsAffected, err := result.RowsAffected()
if err != nil {
tx.Rollback()
return err
}
if rowsAffected == 0 {
// 没有行被更新,说明数据已被修改,重试
tx.Rollback()
continue
}
// 提交事务成功
if err := tx.Commit(); err != nil {
return err
}
return nil
}
return errors.New("达到最大重试次数,更新失败")
}
func main() {
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
var wg sync.WaitGroup
// 测试悲观锁
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := decreaseStockPessimistic(db, 1, 1)
if err != nil {
log.Printf("悲观锁更新失败: %v", err)
} else {
fmt.Println("悲观锁更新成功")
}
}()
}
// 测试乐观锁
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := decreaseStockOptimistic(db, 2, 1, 3)
if err != nil {
log.Printf("乐观锁更新失败: %v", err)
} else {
fmt.Println("乐观锁更新成功")
}
}()
}
wg.Wait()
fmt.Println("所有并发操作完成")
}
4.3 MVCC多版本并发控制¶
MVCC(Multi-Version Concurrency Control)是许多数据库(如PostgreSQL、MySQL InnoDB)采用的并发控制机制,它通过保存数据的多个版本来实现高并发访问。
在Go中使用MVCC特性主要通过数据库本身的支持,应用层不需要特殊处理:
package main
import (
"database/sql"
"fmt"
"log"
"sync"
_ "github.com/lib/pq" // PostgreSQL驱动
)
func readWithMVCC(db *sql.DB, wg *sync.WaitGroup, id int) {
defer wg.Done()
// 读取数据,MVCC会确保看到一致的版本
var name string
var value int
err := db.QueryRow("SELECT name, value FROM mvcc_demo WHERE id = $1", id).Scan(&name, &value)
if err != nil {
log.Printf("读取失败: %v", err)
return
}
fmt.Printf("读取到数据: id=%d, name=%s, value=%d\n", id, name, value)
}
func updateData(db *sql.DB, wg *sync.WaitGroup, id int, newValue int) {
defer wg.Done()
tx, err := db.Begin()
if err != nil {
log.Printf("开启事务失败: %v", err)
return
}
defer tx.Rollback()
// 更新数据,MVCC会创建新的版本
_, err = tx.Exec("UPDATE mvcc_demo SET value = $1 WHERE id = $2", newValue, id)
if err != nil {
log.Printf("更新失败: %v", err)
return
}
// 模拟长时间运行的事务
fmt.Printf("开始更新数据 id=%d 为 %d\n", id, newValue)
// time.Sleep(2 * time.Second)
if err := tx.Commit(); err != nil {
log.Printf("提交失败: %v", err)
return
}
fmt.Printf("完成更新数据 id=%d 为 %d\n", id, newValue)
}
func main() {
// 连接PostgreSQL数据库
db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres password=password dbname=testdb sslmode=disable")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 准备测试表
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS mvcc_demo (
id SERIAL PRIMARY KEY,
name VARCHAR(50),
value INT
)
`)
if err != nil {
log.Fatalf("创建表失败: %v", err)
}
// 插入测试数据
_, err = db.Exec("INSERT INTO mvcc_demo (id, name, value) VALUES (1, 'test', 100) ON CONFLICT (id) DO NOTHING")
if err != nil {
log.Fatalf("插入数据失败: %v", err)
}
var wg sync.WaitGroup
// 启动多个读取操作
for i := 0; i < 3; i++ {
wg.Add(1)
go readWithMVCC(db, &wg, 1)
}
// 启动更新操作
wg.Add(1)
go updateData(db, &wg, 1, 200)
// 再启动多个读取操作
for i := 0; i < 3; i++ {
wg.Add(1)
go readWithMVCC(db, &wg, 1)
}
wg.Wait()
fmt.Println("所有操作完成")
}
5. 分布式事务¶
分布式事务用于处理跨多个数据库或服务的事务操作,确保数据一致性。
5.1 两阶段提交协议¶
两阶段提交(2PC)是最经典的分布式事务协议,分为准备阶段和提交阶段:
package main
import (
"database/sql"
"errors"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
// 分布式事务管理器
type TwoPhaseCommitManager struct {
tx1 *sql.Tx // 第一个数据库事务
tx2 *sql.Tx // 第二个数据库事务
db1 *sql.DB // 第一个数据库连接
db2 *sql.DB // 第二个数据库连接
}
// 创建新的两阶段提交管理器
func NewTwoPhaseCommitManager(db1, db2 *sql.DB) (*TwoPhaseCommitManager, error) {
tx1, err := db1.Begin()
if err != nil {
return nil, err
}
tx2, err := db2.Begin()
if err != nil {
tx1.Rollback()
return nil, err
}
return &TwoPhaseCommitManager{
tx1: tx1,
tx2: tx2,
db1: db1,
db2: db2,
}, nil
}
// 准备阶段:执行操作但不提交
func (m *TwoPhaseCommitManager) Prepare() error {
// 在实际应用中,这里会执行具体的业务操作
// 这里仅作为示例
_, err := m.tx1.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
if err != nil {
return err
}
_, err = m.tx2.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
if err != nil {
return err
}
// 准备阶段成功
return nil
}
// 提交阶段:提交所有事务
func (m *TwoPhaseCommitManager) Commit() error {
// 先提交第一个事务
if err := m.tx1.Commit(); err != nil {
// 如果第一个事务提交失败,回滚第二个
m.tx2.Rollback()
return errors.New("提交第一个事务失败: " + err.Error())
}
// 再提交第二个事务
if err := m.tx2.Commit(); err != nil {
// 这里是一个问题点:第一个事务已经提交,第二个失败
// 实际应用中需要有补偿机制
log.Printf("警告: 第一个事务已提交,但第二个事务提交失败: %v", err)
return errors.New("提交第二个事务失败: " + err.Error())
}
return nil
}
// 回滚所有事务
func (m *TwoPhaseCommitManager) Rollback() error {
err1 := m.tx1.Rollback()
err2 := m.tx2.Rollback()
if err1 != nil && err2 != nil {
return errors.New(fmt.Sprintf("两个事务回滚都失败: %v 和 %v", err1, err2))
}
if err1 != nil {
return errors.New("第一个事务回滚失败: " + err1.Error())
}
if err2 != nil {
return errors.New("第二个事务回滚失败: " + err2.Error())
}
return nil
}
func main() {
// 连接两个不同的数据库
db1, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb1")
if err != nil {
log.Fatalf("无法连接数据库1: %v", err)
}
defer db1.Close()
db2, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb2")
if err != nil {
log.Fatalf("无法连接数据库2: %v", err)
}
defer db2.Close()
// 创建两阶段提交管理器
tpcManager, err := NewTwoPhaseCommitManager(db1, db2)
if err != nil {
log.Fatalf("创建两阶段提交管理器失败: %v", err)
}
// 执行准备阶段
if err := tpcManager.Prepare(); err != nil {
log.Printf("准备阶段失败,开始回滚: %v", err)
if rollbackErr := tpcManager.Rollback(); rollbackErr != nil {
log.Fatalf("回滚失败: %v", rollbackErr)
}
return
}
// 执行提交阶段
if err := tpcManager.Commit(); err != nil {
log.Fatalf("提交阶段失败: %v", err)
}
fmt.Println("分布式事务执行成功")
}
5.2 Saga事务模式¶
Saga模式将分布式事务分解为一系列本地事务,每个本地事务都有对应的补偿事务,用于在出错时回滚:
package main
import (
"database/sql"
"errors"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
// Saga事务步骤接口
type SagaStep interface {
Execute() error // 执行步骤
Compensate() error // 补偿操作(回滚)
}
// 转账第一步:从账户A扣款
type DeductStep struct {
db *sql.DB
accountID int
amount float64
}
func NewDeductStep(db *sql.DB, accountID int, amount float64) *DeductStep {
return &DeductStep{
db: db,
accountID: accountID,
amount: amount,
}
}
func (s *DeductStep) Execute() error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 检查余额
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", s.accountID).Scan(&balance)
if err != nil {
return err
}
if balance < s.amount {
return errors.New("余额不足")
}
// 扣款
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", s.amount, s.accountID)
if err != nil {
return err
}
return tx.Commit()
}
func (s *DeductStep) Compensate() error {
// 补偿操作:将扣除的金额加回去
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", s.amount, s.accountID)
if err != nil {
return err
}
return tx.Commit()
}
// 转账第二步:向账户B存款
type DepositStep struct {
db *sql.DB
accountID int
amount float64
}
func NewDepositStep(db *sql.DB, accountID int, amount float64) *DepositStep {
return &DepositStep{
db: db,
accountID: accountID,
amount: amount,
}
}
func (s *DepositStep) Execute() error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 存款
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", s.amount, s.accountID)
if err != nil {
return err
}
return tx.Commit()
}
func (s *DepositStep) Compensate() error {
// 补偿操作:将存入的金额扣回去
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 先检查余额是否足够
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", s.accountID).Scan(&balance)
if err != nil {
return err
}
if balance < s.amount {
return errors.New("余额不足,无法补偿")
}
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", s.amount, s.accountID)
if err != nil {
return err
}
return tx.Commit()
}
// Saga管理器
type SagaManager struct {
steps []SagaStep
}
func NewSagaManager() *SagaManager {
return &SagaManager{
steps: []SagaStep{},
}
}
func (m *SagaManager) AddStep(step SagaStep) {
m.steps = append(m.steps, step)
}
// 执行Saga事务
func (m *SagaManager) Execute() error {
// 记录已成功执行的步骤,用于出错时回滚
completedSteps := []SagaStep{}
for _, step := range m.steps {
// 执行步骤
if err := step.Execute(); err != nil {
log.Printf("步骤执行失败: %v,开始回滚", err)
// 回滚已完成的步骤
for i := len(completedSteps) - 1; i >= 0; i-- {
if err := completedSteps[i].Compensate(); err != nil {
log.Printf("补偿操作失败: %v", err)
// 补偿操作失败是严重错误,需要人工干预
return errors.New("补偿操作失败,需要人工干预: " + err.Error())
}
}
return err
}
completedSteps = append(completedSteps, step)
}
return nil
}
func main() {
// 连接数据库
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb")
if err != nil {
log.Fatalf("无法连接数据库: %v", err)
}
defer db.Close()
// 创建Saga管理器
saga := NewSagaManager()
// 添加事务步骤
amount := 100.0
saga.AddStep(NewDeductStep(db, 1, amount)) // 从账户1扣款
saga.AddStep(NewDepositStep(db, 2, amount)) // 向账户2存款
// 执行Saga事务
if err := saga.Execute(); err != nil {
log.Fatalf("Saga事务执行失败: %v", err)
}
fmt.Println("Saga事务执行成功")
}
5.3 分布式事务框架应用¶
在实际开发中,我们通常会使用成熟的分布式事务框架,如Seata、TCC-Transaction等。以下是使用Go语言集成分布式事务框架的示例:
package main
import (
"fmt"
"log"
// 假设这是一个分布式事务框架
"github.com/example/distributed-transaction-framework/dtf"
"github.com/example/distributed-transaction-framework/dtf/transaction"
)
// 本地事务参与者1:扣款服务
type DeductParticipant struct {
AccountID int
Amount float64
}
func (p *DeductParticipant) Prepare() (bool, error) {
fmt.Printf("准备扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 执行准备工作,检查余额等
return true, nil
}
func (p *DeductParticipant) Commit() error {
fmt.Printf("执行扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 实际执行扣款操作
return nil
}
func (p *DeductParticipant) Rollback() error {
fmt.Printf("回滚扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 执行回滚操作
return nil
}
// 本地事务参与者2:存款服务
type DepositParticipant struct {
AccountID int
Amount float64
}
func (p *DepositParticipant) Prepare() (bool, error) {
fmt.Printf("准备存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 执行准备工作
return true, nil
}
func (p *DepositParticipant) Commit() error {
fmt.Printf("执行存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 实际执行存款操作
return nil
}
func (p *DepositParticipant) Rollback() error {
fmt.Printf("回滚存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
// 执行回滚操作
return nil
}
func main() {
// 初始化分布式事务框架
err := dtf.Init(dtf.Config{
// 配置注册中心、事务日志等
Registry: "etcd://localhost:2379",
LogLevel: "info",
})
if err != nil {
log.Fatalf("初始化分布式事务框架失败: %v", err)
}
defer dtf.Shutdown()
// 创建全局事务
globalTx, err := transaction.NewGlobalTransaction("transfer-service", "transfer-tx-12345")
if err != nil {
log.Fatalf("创建全局事务失败: %v", err)
}
// 添加参与者
amount := 100.0
globalTx.AddParticipant(&DeductParticipant{AccountID: 1, Amount: amount})
globalTx.AddParticipant(&DepositParticipant{AccountID: 2, Amount: amount})
// 执行全局事务
result, err := globalTx.Execute()
if err != nil {
log.Fatalf("全局事务执行失败: %v", err)
}
if result.Success {
fmt.Println("分布式事务执行成功")
} else {
fmt.Println("分布式事务执行失败,已回滚")
}
}
实战练习¶
练习1:银行转账系统事务设计¶
设计一个银行转账系统,要求: 1. 实现账户之间的转账功能,确保事务的ACID特性 2. 处理并发转账场景,避免死锁 3. 实现转账失败时的完整回滚 4. 支持事务日志记录,用于审计和故障恢复
提示:使用本节学习的事务管理、锁机制和并发控制技术,确保转账过程中的数据一致性。
练习2:高并发场景下的并发控制¶
设计一个高并发的库存管理系统,要求: 1. 处理大量并发的库存扣减请求 2. 比较不同并发控制策略(悲观锁、乐观锁、MVCC)的性能 3. 实现防止超卖的机制 4. 设计合理的隔离级别,在一致性和性能间取得平衡
提示:可以使用Go的并发特性模拟大量并发请求,通过基准测试比较不同策略的性能。
练习3:分布式事务解决方案实现¶
实现一个跨多个服务的订单处理系统,要求: 1. 订单创建需要同时操作订单服务、库存服务和支付服务 2. 实现两阶段提交和Saga模式两种分布式事务方案 3. 比较两种方案在不同故障场景下的表现 4. 实现事务状态的持久化,支持系统崩溃后的恢复
提示:可以使用消息队列辅助实现最终一致性,考虑各种异常情况的处理。
详细内容待补充...