跳转至

7.5 事务处理与并发控制

学习目标

  • 深入理解事务的ACID特性
  • 掌握不同事务隔离级别的应用
  • 熟练处理并发控制问题
  • 学会设计分布式事务解决方案

核心内容

1. 事务基础理论

1.1 ACID特性详解

事务是数据库操作的基本单位,具有四个关键特性,通常称为ACID特性:

  • 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成,不会处于中间状态。如果事务执行过程中发生错误,会回滚到事务开始前的状态。

  • 一致性(Consistency):事务必须使数据库从一个一致性状态变换到另一个一致性状态。例如,在银行转账中,总金额应该保持不变。

  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。数据库通过隔离级别控制并发事务之间的相互影响。

  • 持久性(Durability):一旦事务提交,其对数据库的修改就是永久性的,即使系统崩溃也不会丢失。

1.2 事务的生命周期

一个完整的事务生命周期包含以下阶段:

  1. 开始(Begin):事务开始,数据库记录当前状态。
  2. 执行(Execute):执行一系列数据库操作。
  3. 提交(Commit):所有操作成功完成,将修改永久保存到数据库。
  4. 回滚(Rollback):如果执行过程中发生错误,撤销所有已执行的操作,恢复到事务开始前的状态。

事务的状态转换:活跃状态 → 部分提交状态 → 提交状态 或 失败状态 → 中止状态。

1.3 事务日志机制

事务日志是保证事务ACID特性的关键机制,主要用于:

  • 崩溃恢复:系统崩溃后,通过日志恢复未完成的事务
  • 事务回滚:当需要撤销操作时,使用日志记录的信息
  • 数据复制:在主从复制中,日志用于同步数据

常见的日志类型包括: - 重做日志(Redo Log):记录数据修改后的值,用于崩溃后重做操作 - 撤销日志(Undo Log):记录数据修改前的值,用于事务回滚

2. Go语言事务操作

Go语言通过数据库驱动提供事务支持,标准库中的database/sql包定义了事务接口。

2.1 事务的开启与提交

下面是一个使用Go语言操作MySQL事务的示例:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 验证连接
    err = db.Ping()
    if err != nil {
        log.Fatalf("无法验证数据库连接: %v", err)
    }

    // 开启事务
    tx, err := db.Begin()
    if err != nil {
        log.Fatalf("开启事务失败: %v", err)
    }

    // 执行SQL操作
    _, err = tx.Exec("INSERT INTO users (name, email) VALUES (?, ?)", "张三", "zhangsan@example.com")
    if err != nil {
        log.Printf("执行SQL失败: %v", err)
        // 发生错误时回滚
        if rollbackErr := tx.Rollback(); rollbackErr != nil {
            log.Printf("回滚事务失败: %v", rollbackErr)
        }
        return
    }

    // 提交事务
    if err := tx.Commit(); err != nil {
        log.Printf("提交事务失败: %v", err)
        // 提交失败时尝试回滚
        if rollbackErr := tx.Rollback(); rollbackErr != nil {
            log.Printf("回滚事务失败: %v", rollbackErr)
        }
        return
    }

    fmt.Println("事务执行成功并提交")
}

2.2 事务回滚处理

在Go中,事务回滚通常在发生错误时执行。以下示例展示了更完整的错误处理和回滚机制:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func transferMoney(db *sql.DB, fromAccount, toAccount int, amount float64) error {
    // 开启事务
    tx, err := db.Begin()
    if err != nil {
        return fmt.Errorf("开启事务失败: %v", err)
    }
    defer func() {
        // 如果发生panic,回滚事务
        if r := recover(); r != nil {
            log.Printf("发生panic,回滚事务: %v", r)
            tx.Rollback()
        }
    }()

    // 检查转出账户余额
    var balance float64
    err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromAccount).Scan(&balance)
    if err != nil {
        tx.Rollback()
        return fmt.Errorf("查询余额失败: %v", err)
    }

    // 检查余额是否充足
    if balance < amount {
        tx.Rollback()
        return fmt.Errorf("余额不足")
    }

    // 减少转出账户余额
    _, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromAccount)
    if err != nil {
        tx.Rollback()
        return fmt.Errorf("更新转出账户失败: %v", err)
    }

    // 增加转入账户余额
    _, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toAccount)
    if err != nil {
        tx.Rollback()
        return fmt.Errorf("更新转入账户失败: %v", err)
    }

    // 提交事务
    if err := tx.Commit(); err != nil {
        tx.Rollback()
        return fmt.Errorf("提交事务失败: %v", err)
    }

    return nil
}

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 执行转账
    err = transferMoney(db, 1, 2, 100.0)
    if err != nil {
        log.Fatalf("转账失败: %v", err)
    }

    fmt.Println("转账成功")
}

2.3 嵌套事务管理

Go的database/sql包本身不直接支持嵌套事务,但可以通过保存点(savepoint)实现类似功能:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 开启外层事务
    tx, err := db.Begin()
    if err != nil {
        log.Fatalf("开启事务失败: %v", err)
    }

    // 执行外层操作
    _, err = tx.Exec("INSERT INTO orders (user_id, total) VALUES (?, ?)", 1, 200)
    if err != nil {
        tx.Rollback()
        log.Fatalf("插入订单失败: %v", err)
    }

    // 创建保存点,模拟嵌套事务开始
    _, err = tx.Exec("SAVEPOINT order_items_savepoint")
    if err != nil {
        tx.Rollback()
        log.Fatalf("创建保存点失败: %v", err)
    }

    // 执行"嵌套事务"中的操作
    _, err = tx.Exec("INSERT INTO order_items (order_id, product_id, quantity) VALUES (?, ?, ?)", 1, 101, 2)
    if err != nil {
        // 回滚到保存点,而不是整个事务
        _, rollbackErr := tx.Exec("ROLLBACK TO SAVEPOINT order_items_savepoint")
        if rollbackErr != nil {
            tx.Rollback()
            log.Fatalf("回滚到保存点失败: %v", rollbackErr)
        }
        log.Printf("插入订单项失败,已回滚到保存点: %v", err)
    }

    // 提交整个事务
    if err := tx.Commit(); err != nil {
        log.Fatalf("提交事务失败: %v", err)
    }

    fmt.Println("事务执行成功")
}

3. 事务隔离级别

数据库通过隔离级别控制并发事务之间的相互影响,不同的隔离级别提供不同的一致性保证和性能特性。

3.1 四种隔离级别对比

  1. 读未提交(Read Uncommitted)
  2. 最低的隔离级别
  3. 允许事务查看其他未提交事务的修改
  4. 可能导致脏读、不可重复读和幻读

  5. 读已提交(Read Committed)

  6. 保证事务只能看到已提交事务的修改
  7. 避免脏读,但可能出现不可重复读和幻读
  8. 是许多数据库的默认隔离级别(如PostgreSQL、SQL Server)

  9. 可重复读(Repeatable Read)

  10. 保证事务在整个过程中看到相同的数据
  11. 避免脏读和不可重复读,但可能出现幻读
  12. 是MySQL的默认隔离级别

  13. 串行化(Serializable)

  14. 最高的隔离级别
  15. 通过强制事务串行执行避免所有并发问题
  16. 完全避免脏读、不可重复读和幻读,但性能最差

3.2 隔离级别的选择策略

在Go中设置事务隔离级别:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 查看数据库当前默认隔离级别
    var isolationLevel string
    err = db.QueryRow("SELECT @@tx_isolation").Scan(&isolationLevel)
    if err != nil {
        log.Fatalf("查询隔离级别失败: %v", err)
    }
    fmt.Printf("默认隔离级别: %s\n", isolationLevel)

    // 开启事务并设置隔离级别
    tx, err := db.Begin()
    if err != nil {
        log.Fatalf("开启事务失败: %v", err)
    }

    // 设置隔离级别为可重复读
    _, err = tx.Exec("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    if err != nil {
        tx.Rollback()
        log.Fatalf("设置隔离级别失败: %v", err)
    }

    // 执行数据库操作...
    _, err = tx.Exec("INSERT INTO products (name, price) VALUES (?, ?)", "测试商品", 99.99)
    if err != nil {
        tx.Rollback()
        log.Fatalf("插入数据失败: %v", err)
    }

    // 提交事务
    if err := tx.Commit(); err != nil {
        log.Fatalf("提交事务失败: %v", err)
    }

    fmt.Println("事务执行成功")
}

3.3 隔离级别与性能权衡

隔离级别与性能的关系通常是:隔离级别越高,一致性保证越强,但并发性能越差。

选择隔离级别时应考虑:

  1. 应用的一致性需求:金融交易系统通常需要更高的隔离级别
  2. 并发访问量:高并发系统可能需要在一致性和性能间做平衡
  3. 数据修改频率:频繁修改的数据可能需要更高的隔离级别

以下是一个在不同场景下选择隔离级别的示例:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

// 根据业务场景设置合适的隔离级别
func beginTransactionWithIsolation(db *sql.DB, scenario string) (*sql.Tx, error) {
    tx, err := db.Begin()
    if err != nil {
        return nil, err
    }

    var isolationLevel string
    switch scenario {
    case "financial": // 金融交易,需要高隔离级别
        isolationLevel = "SERIALIZABLE"
    case "reporting": // 报表生成,可容忍较低隔离级别
        isolationLevel = "READ COMMITTED"
    case "inventory": // 库存管理,需要中等隔离级别
        isolationLevel = "REPEATABLE READ"
    default:
        isolationLevel = "REPEATABLE READ" // 默认级别
    }

    _, err = tx.Exec(fmt.Sprintf("SET TRANSACTION ISOLATION LEVEL %s", isolationLevel))
    if err != nil {
        tx.Rollback()
        return nil, err
    }

    return tx, nil
}

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 金融交易场景,使用最高隔离级别
    tx, err := beginTransactionWithIsolation(db, "financial")
    if err != nil {
        log.Fatalf("开启事务失败: %v", err)
    }

    // 执行金融交易操作...
    fmt.Println("执行金融交易操作")

    // 提交事务
    if err := tx.Commit(); err != nil {
        log.Fatalf("提交事务失败: %v", err)
    }

    fmt.Println("事务执行成功")
}

4. 并发控制机制

并发控制是数据库管理系统处理多个并发事务的关键技术,确保事务的隔离性和一致性。

4.1 锁机制与死锁预防

锁是最常用的并发控制机制,Go中可以通过数据库提供的锁机制或应用层锁实现。

package main

import (
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

// 使用数据库行级锁
func updateWithRowLock(db *sql.DB, productID int, quantity int) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 使用SELECT ... FOR UPDATE获取行级锁
    var currentStock int
    err = tx.QueryRow("SELECT stock FROM products WHERE id = ? FOR UPDATE", productID).Scan(&currentStock)
    if err != nil {
        return err
    }

    if currentStock < quantity {
        return fmt.Errorf("库存不足")
    }

    // 更新库存
    _, err = tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

// 应用层锁防止死锁
var (
    lockMap = make(map[int]*sync.Mutex)
    mapLock sync.Mutex
)

func getLock(resourceID int) *sync.Mutex {
    mapLock.Lock()
    defer mapLock.Unlock()

    if _, exists := lockMap[resourceID]; !exists {
        lockMap[resourceID] = &sync.Mutex{}
    }
    return lockMap[resourceID]
}

// 按资源ID顺序获取锁,预防死锁
func safeLock(resources ...int) []*sync.Mutex {
    // 排序资源ID,确保获取锁的顺序一致
    for i := 0; i < len(resources); i++ {
        for j := i + 1; j < len(resources); j++ {
            if resources[i] > resources[j] {
                resources[i], resources[j] = resources[j], resources[i]
            }
        }
    }

    // 按顺序获取锁
    locks := make([]*sync.Mutex, len(resources))
    for i, id := range resources {
        lock := getLock(id)
        lock.Lock()
        locks[i] = lock
    }

    return locks
}

func main() {
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 模拟并发更新
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := updateWithRowLock(db, 1, 1)
            if err != nil {
                log.Printf("更新失败: %v", err)
            } else {
                fmt.Println("更新成功")
            }
        }()
    }

    wg.Wait()
    fmt.Println("所有并发操作完成")
}

4.2 乐观锁与悲观锁

乐观锁和悲观锁是两种不同的并发控制策略:

  • 悲观锁:假设冲突经常发生,在操作数据前先获取锁,阻止其他事务修改
  • 乐观锁:假设冲突很少发生,在更新时检查数据是否被修改过,如果被修改则重试
package main

import (
    "database/sql"
    "errors"
    "fmt"
    "log"
    "sync"

    _ "github.com/go-sql-driver/mysql"
)

// 悲观锁实现
func decreaseStockPessimistic(db *sql.DB, productID int, quantity int) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 获取悲观锁
    var stock int
    err = tx.QueryRow("SELECT stock FROM products WHERE id = ? FOR UPDATE", productID).Scan(&stock)
    if err != nil {
        return err
    }

    if stock < quantity {
        return errors.New("库存不足")
    }

    // 更新库存
    _, err = tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

// 乐观锁实现
func decreaseStockOptimistic(db *sql.DB, productID int, quantity int, maxRetries int) error {
    for retry := 0; retry < maxRetries; retry++ {
        tx, err := db.Begin()
        if err != nil {
            return err
        }

        // 获取当前库存和版本号
        var stock, version int
        err = tx.QueryRow("SELECT stock, version FROM products WHERE id = ?", productID).Scan(&stock, &version)
        if err != nil {
            tx.Rollback()
            return err
        }

        if stock < quantity {
            tx.Rollback()
            return errors.New("库存不足")
        }

        // 使用版本号作为乐观锁,只有版本号匹配时才更新
        result, err := tx.Exec(
            "UPDATE products SET stock = stock - ?, version = version + 1 WHERE id = ? AND version = ?",
            quantity, productID, version,
        )
        if err != nil {
            tx.Rollback()
            return err
        }

        // 检查是否有行被更新
        rowsAffected, err := result.RowsAffected()
        if err != nil {
            tx.Rollback()
            return err
        }

        if rowsAffected == 0 {
            // 没有行被更新,说明数据已被修改,重试
            tx.Rollback()
            continue
        }

        // 提交事务成功
        if err := tx.Commit(); err != nil {
            return err
        }
        return nil
    }

    return errors.New("达到最大重试次数,更新失败")
}

func main() {
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    var wg sync.WaitGroup

    // 测试悲观锁
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := decreaseStockPessimistic(db, 1, 1)
            if err != nil {
                log.Printf("悲观锁更新失败: %v", err)
            } else {
                fmt.Println("悲观锁更新成功")
            }
        }()
    }

    // 测试乐观锁
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := decreaseStockOptimistic(db, 2, 1, 3)
            if err != nil {
                log.Printf("乐观锁更新失败: %v", err)
            } else {
                fmt.Println("乐观锁更新成功")
            }
        }()
    }

    wg.Wait()
    fmt.Println("所有并发操作完成")
}

4.3 MVCC多版本并发控制

MVCC(Multi-Version Concurrency Control)是许多数据库(如PostgreSQL、MySQL InnoDB)采用的并发控制机制,它通过保存数据的多个版本来实现高并发访问。

在Go中使用MVCC特性主要通过数据库本身的支持,应用层不需要特殊处理:

package main

import (
    "database/sql"
    "fmt"
    "log"
    "sync"

    _ "github.com/lib/pq" // PostgreSQL驱动
)

func readWithMVCC(db *sql.DB, wg *sync.WaitGroup, id int) {
    defer wg.Done()

    // 读取数据,MVCC会确保看到一致的版本
    var name string
    var value int
    err := db.QueryRow("SELECT name, value FROM mvcc_demo WHERE id = $1", id).Scan(&name, &value)
    if err != nil {
        log.Printf("读取失败: %v", err)
        return
    }

    fmt.Printf("读取到数据: id=%d, name=%s, value=%d\n", id, name, value)
}

func updateData(db *sql.DB, wg *sync.WaitGroup, id int, newValue int) {
    defer wg.Done()

    tx, err := db.Begin()
    if err != nil {
        log.Printf("开启事务失败: %v", err)
        return
    }
    defer tx.Rollback()

    // 更新数据,MVCC会创建新的版本
    _, err = tx.Exec("UPDATE mvcc_demo SET value = $1 WHERE id = $2", newValue, id)
    if err != nil {
        log.Printf("更新失败: %v", err)
        return
    }

    // 模拟长时间运行的事务
    fmt.Printf("开始更新数据 id=%d 为 %d\n", id, newValue)
    // time.Sleep(2 * time.Second)

    if err := tx.Commit(); err != nil {
        log.Printf("提交失败: %v", err)
        return
    }

    fmt.Printf("完成更新数据 id=%d 为 %d\n", id, newValue)
}

func main() {
    // 连接PostgreSQL数据库
    db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres password=password dbname=testdb sslmode=disable")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 准备测试表
    _, err = db.Exec(`
        CREATE TABLE IF NOT EXISTS mvcc_demo (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            value INT
        )
    `)
    if err != nil {
        log.Fatalf("创建表失败: %v", err)
    }

    // 插入测试数据
    _, err = db.Exec("INSERT INTO mvcc_demo (id, name, value) VALUES (1, 'test', 100) ON CONFLICT (id) DO NOTHING")
    if err != nil {
        log.Fatalf("插入数据失败: %v", err)
    }

    var wg sync.WaitGroup

    // 启动多个读取操作
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go readWithMVCC(db, &wg, 1)
    }

    // 启动更新操作
    wg.Add(1)
    go updateData(db, &wg, 1, 200)

    // 再启动多个读取操作
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go readWithMVCC(db, &wg, 1)
    }

    wg.Wait()
    fmt.Println("所有操作完成")
}

5. 分布式事务

分布式事务用于处理跨多个数据库或服务的事务操作,确保数据一致性。

5.1 两阶段提交协议

两阶段提交(2PC)是最经典的分布式事务协议,分为准备阶段和提交阶段:

package main

import (
    "database/sql"
    "errors"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

// 分布式事务管理器
type TwoPhaseCommitManager struct {
    tx1 *sql.Tx // 第一个数据库事务
    tx2 *sql.Tx // 第二个数据库事务
    db1 *sql.DB // 第一个数据库连接
    db2 *sql.DB // 第二个数据库连接
}

// 创建新的两阶段提交管理器
func NewTwoPhaseCommitManager(db1, db2 *sql.DB) (*TwoPhaseCommitManager, error) {
    tx1, err := db1.Begin()
    if err != nil {
        return nil, err
    }

    tx2, err := db2.Begin()
    if err != nil {
        tx1.Rollback()
        return nil, err
    }

    return &TwoPhaseCommitManager{
        tx1: tx1,
        tx2: tx2,
        db1: db1,
        db2: db2,
    }, nil
}

// 准备阶段:执行操作但不提交
func (m *TwoPhaseCommitManager) Prepare() error {
    // 在实际应用中,这里会执行具体的业务操作
    // 这里仅作为示例
    _, err := m.tx1.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    if err != nil {
        return err
    }

    _, err = m.tx2.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    if err != nil {
        return err
    }

    // 准备阶段成功
    return nil
}

// 提交阶段:提交所有事务
func (m *TwoPhaseCommitManager) Commit() error {
    // 先提交第一个事务
    if err := m.tx1.Commit(); err != nil {
        // 如果第一个事务提交失败,回滚第二个
        m.tx2.Rollback()
        return errors.New("提交第一个事务失败: " + err.Error())
    }

    // 再提交第二个事务
    if err := m.tx2.Commit(); err != nil {
        // 这里是一个问题点:第一个事务已经提交,第二个失败
        // 实际应用中需要有补偿机制
        log.Printf("警告: 第一个事务已提交,但第二个事务提交失败: %v", err)
        return errors.New("提交第二个事务失败: " + err.Error())
    }

    return nil
}

// 回滚所有事务
func (m *TwoPhaseCommitManager) Rollback() error {
    err1 := m.tx1.Rollback()
    err2 := m.tx2.Rollback()

    if err1 != nil && err2 != nil {
        return errors.New(fmt.Sprintf("两个事务回滚都失败: %v 和 %v", err1, err2))
    }
    if err1 != nil {
        return errors.New("第一个事务回滚失败: " + err1.Error())
    }
    if err2 != nil {
        return errors.New("第二个事务回滚失败: " + err2.Error())
    }

    return nil
}

func main() {
    // 连接两个不同的数据库
    db1, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb1")
    if err != nil {
        log.Fatalf("无法连接数据库1: %v", err)
    }
    defer db1.Close()

    db2, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb2")
    if err != nil {
        log.Fatalf("无法连接数据库2: %v", err)
    }
    defer db2.Close()

    // 创建两阶段提交管理器
    tpcManager, err := NewTwoPhaseCommitManager(db1, db2)
    if err != nil {
        log.Fatalf("创建两阶段提交管理器失败: %v", err)
    }

    // 执行准备阶段
    if err := tpcManager.Prepare(); err != nil {
        log.Printf("准备阶段失败,开始回滚: %v", err)
        if rollbackErr := tpcManager.Rollback(); rollbackErr != nil {
            log.Fatalf("回滚失败: %v", rollbackErr)
        }
        return
    }

    // 执行提交阶段
    if err := tpcManager.Commit(); err != nil {
        log.Fatalf("提交阶段失败: %v", err)
    }

    fmt.Println("分布式事务执行成功")
}

5.2 Saga事务模式

Saga模式将分布式事务分解为一系列本地事务,每个本地事务都有对应的补偿事务,用于在出错时回滚:

package main

import (
    "database/sql"
    "errors"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

// Saga事务步骤接口
type SagaStep interface {
    Execute() error   // 执行步骤
    Compensate() error // 补偿操作(回滚)
}

// 转账第一步:从账户A扣款
type DeductStep struct {
    db        *sql.DB
    accountID int
    amount    float64
}

func NewDeductStep(db *sql.DB, accountID int, amount float64) *DeductStep {
    return &DeductStep{
        db:        db,
        accountID: accountID,
        amount:    amount,
    }
}

func (s *DeductStep) Execute() error {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 检查余额
    var balance float64
    err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", s.accountID).Scan(&balance)
    if err != nil {
        return err
    }

    if balance < s.amount {
        return errors.New("余额不足")
    }

    // 扣款
    _, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", s.amount, s.accountID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

func (s *DeductStep) Compensate() error {
    // 补偿操作:将扣除的金额加回去
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    _, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", s.amount, s.accountID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

// 转账第二步:向账户B存款
type DepositStep struct {
    db        *sql.DB
    accountID int
    amount    float64
}

func NewDepositStep(db *sql.DB, accountID int, amount float64) *DepositStep {
    return &DepositStep{
        db:        db,
        accountID: accountID,
        amount:    amount,
    }
}

func (s *DepositStep) Execute() error {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 存款
    _, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", s.amount, s.accountID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

func (s *DepositStep) Compensate() error {
    // 补偿操作:将存入的金额扣回去
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 先检查余额是否足够
    var balance float64
    err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", s.accountID).Scan(&balance)
    if err != nil {
        return err
    }

    if balance < s.amount {
        return errors.New("余额不足,无法补偿")
    }

    _, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", s.amount, s.accountID)
    if err != nil {
        return err
    }

    return tx.Commit()
}

// Saga管理器
type SagaManager struct {
    steps []SagaStep
}

func NewSagaManager() *SagaManager {
    return &SagaManager{
        steps: []SagaStep{},
    }
}

func (m *SagaManager) AddStep(step SagaStep) {
    m.steps = append(m.steps, step)
}

// 执行Saga事务
func (m *SagaManager) Execute() error {
    // 记录已成功执行的步骤,用于出错时回滚
    completedSteps := []SagaStep{}

    for _, step := range m.steps {
        // 执行步骤
        if err := step.Execute(); err != nil {
            log.Printf("步骤执行失败: %v,开始回滚", err)
            // 回滚已完成的步骤
            for i := len(completedSteps) - 1; i >= 0; i-- {
                if err := completedSteps[i].Compensate(); err != nil {
                    log.Printf("补偿操作失败: %v", err)
                    // 补偿操作失败是严重错误,需要人工干预
                    return errors.New("补偿操作失败,需要人工干预: " + err.Error())
                }
            }
            return err
        }
        completedSteps = append(completedSteps, step)
    }

    return nil
}

func main() {
    // 连接数据库
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/bankdb")
    if err != nil {
        log.Fatalf("无法连接数据库: %v", err)
    }
    defer db.Close()

    // 创建Saga管理器
    saga := NewSagaManager()

    // 添加事务步骤
    amount := 100.0
    saga.AddStep(NewDeductStep(db, 1, amount))   // 从账户1扣款
    saga.AddStep(NewDepositStep(db, 2, amount)) // 向账户2存款

    // 执行Saga事务
    if err := saga.Execute(); err != nil {
        log.Fatalf("Saga事务执行失败: %v", err)
    }

    fmt.Println("Saga事务执行成功")
}

5.3 分布式事务框架应用

在实际开发中,我们通常会使用成熟的分布式事务框架,如Seata、TCC-Transaction等。以下是使用Go语言集成分布式事务框架的示例:

package main

import (
    "fmt"
    "log"

    // 假设这是一个分布式事务框架
    "github.com/example/distributed-transaction-framework/dtf"
    "github.com/example/distributed-transaction-framework/dtf/transaction"
)

// 本地事务参与者1:扣款服务
type DeductParticipant struct {
    AccountID int
    Amount    float64
}

func (p *DeductParticipant) Prepare() (bool, error) {
    fmt.Printf("准备扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 执行准备工作,检查余额等
    return true, nil
}

func (p *DeductParticipant) Commit() error {
    fmt.Printf("执行扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 实际执行扣款操作
    return nil
}

func (p *DeductParticipant) Rollback() error {
    fmt.Printf("回滚扣款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 执行回滚操作
    return nil
}

// 本地事务参与者2:存款服务
type DepositParticipant struct {
    AccountID int
    Amount    float64
}

func (p *DepositParticipant) Prepare() (bool, error) {
    fmt.Printf("准备存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 执行准备工作
    return true, nil
}

func (p *DepositParticipant) Commit() error {
    fmt.Printf("执行存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 实际执行存款操作
    return nil
}

func (p *DepositParticipant) Rollback() error {
    fmt.Printf("回滚存款: 账户 %d, 金额 %.2f\n", p.AccountID, p.Amount)
    // 执行回滚操作
    return nil
}

func main() {
    // 初始化分布式事务框架
    err := dtf.Init(dtf.Config{
        // 配置注册中心、事务日志等
        Registry: "etcd://localhost:2379",
        LogLevel: "info",
    })
    if err != nil {
        log.Fatalf("初始化分布式事务框架失败: %v", err)
    }
    defer dtf.Shutdown()

    // 创建全局事务
    globalTx, err := transaction.NewGlobalTransaction("transfer-service", "transfer-tx-12345")
    if err != nil {
        log.Fatalf("创建全局事务失败: %v", err)
    }

    // 添加参与者
    amount := 100.0
    globalTx.AddParticipant(&DeductParticipant{AccountID: 1, Amount: amount})
    globalTx.AddParticipant(&DepositParticipant{AccountID: 2, Amount: amount})

    // 执行全局事务
    result, err := globalTx.Execute()
    if err != nil {
        log.Fatalf("全局事务执行失败: %v", err)
    }

    if result.Success {
        fmt.Println("分布式事务执行成功")
    } else {
        fmt.Println("分布式事务执行失败,已回滚")
    }
}

实战练习

练习1:银行转账系统事务设计

设计一个银行转账系统,要求: 1. 实现账户之间的转账功能,确保事务的ACID特性 2. 处理并发转账场景,避免死锁 3. 实现转账失败时的完整回滚 4. 支持事务日志记录,用于审计和故障恢复

提示:使用本节学习的事务管理、锁机制和并发控制技术,确保转账过程中的数据一致性。

练习2:高并发场景下的并发控制

设计一个高并发的库存管理系统,要求: 1. 处理大量并发的库存扣减请求 2. 比较不同并发控制策略(悲观锁、乐观锁、MVCC)的性能 3. 实现防止超卖的机制 4. 设计合理的隔离级别,在一致性和性能间取得平衡

提示:可以使用Go的并发特性模拟大量并发请求,通过基准测试比较不同策略的性能。

练习3:分布式事务解决方案实现

实现一个跨多个服务的订单处理系统,要求: 1. 订单创建需要同时操作订单服务、库存服务和支付服务 2. 实现两阶段提交和Saga模式两种分布式事务方案 3. 比较两种方案在不同故障场景下的表现 4. 实现事务状态的持久化,支持系统崩溃后的恢复

提示:可以使用消息队列辅助实现最终一致性,考虑各种异常情况的处理。


详细内容待补充...