跳转至

7.7 数据库迁移与版本管理

学习目标

  • 理解数据库schema版本管理的重要性
  • 掌握数据库迁移脚本的编写技巧
  • 熟练使用迁移工具进行版本控制
  • 建立完整的数据库变更管理流程

核心内容

1. 数据库迁移概念

1.1 迁移的必要性与优势

数据库迁移是管理数据库结构变更的系统化方法,主要优势包括:

  • 版本控制:跟踪数据库结构的历史变更
  • 一致性:确保所有环境(开发、测试、生产)的数据库结构一致
  • 可重复性:迁移脚本可以多次执行,结果一致
  • 团队协作:多个开发者可以并行工作而不会冲突

1.2 迁移策略与最佳实践

  • 每次迁移应该是幂等的(可重复执行)
  • 每个迁移脚本应该包含前进和回退两个方向
  • 迁移脚本应该小而专注,每次只做一个明确的变更
  • 在生产环境执行前,必须在测试环境充分验证

1.3 版本控制与回滚机制

  • 使用递增的版本号或时间戳来标识迁移
  • 维护迁移历史表来记录已执行的迁移
  • 回滚机制允许撤销失败的迁移,保证系统稳定性

2. 迁移工具选择

2.1 golang-migrate工具详解

golang-migrate是一个流行的数据库迁移工具,支持多种数据库。

安装:

go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

基本用法:

# Create a new pair of sequentially numbered migration files (up + down)
migrate create -ext sql -dir db/migrations -seq create_users_table

# Apply all pending migrations
migrate -path db/migrations -database "postgres://localhost:5432/dbname?sslmode=disable" up

# Roll back only the most recent migration.
# A bare `down` (no count) reverts ALL applied migrations, which is rarely what you want.
migrate -path db/migrations -database "postgres://localhost:5432/dbname?sslmode=disable" down 1

2.2 GORM AutoMigrate功能

GORM提供了AutoMigrate功能,可以自动根据模型结构创建或更新表。需要注意:AutoMigrate只会创建缺失的表、列、索引和约束,不会删除模型中已移除的列,也不记录版本、不支持回滚,因此更适合开发环境或结构简单的项目。

package main

import (
    "fmt"
    "time"

    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "gorm.io/gorm/logger"
)

// User is the model handed to db.AutoMigrate in main. Its gorm struct tags
// declare the primary key, the column sizes and the unique index on Email.
type User struct {
    ID        uint   `gorm:"primaryKey"`
    Name      string `gorm:"size:100"`
    Email     string `gorm:"uniqueIndex;size:255"`
    Age       int
    CreatedAt time.Time
    UpdatedAt time.Time
}

// main connects to PostgreSQL through GORM and runs AutoMigrate for the User
// model. Any failure aborts the program with a panic that carries the
// underlying error, so the cause (bad DSN, refused connection, failed DDL)
// is visible instead of a bare generic message.
func main() {
    dsn := "host=localhost user=gorm password=gorm dbname=gorm port=5432 sslmode=disable"
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
        // logger.Info makes GORM print every statement it executes.
        Logger: logger.Default.LogMode(logger.Info),
    })
    if err != nil {
        panic(fmt.Sprintf("failed to connect database: %v", err))
    }

    // Create or update the table that backs User.
    if err := db.AutoMigrate(&User{}); err != nil {
        panic(fmt.Sprintf("failed to migrate database: %v", err))
    }

    fmt.Println("Migration completed successfully")
}

2.3 自定义迁移解决方案

对于特定需求,可以开发自定义迁移框架:

package main

import (
    "database/sql"
    "fmt"
    "log"
    "os"

    _ "github.com/lib/pq"
)

// Migration describes one schema change handled by Migrator.
// RunMigrations uses Name to decide whether the change was already applied
// and executes UpSQL. ID, DownSQL and AppliedAt are carried along but are not
// read by any code in this example.
type Migration struct {
    ID        int
    Name      string
    UpSQL     string
    DownSQL   string
    AppliedAt sql.NullTime
}

// Migrator applies Migration values through a database handle.
type Migrator struct {
    db *sql.DB
}

// NewMigrator returns a Migrator that runs its statements on db.
func NewMigrator(db *sql.DB) *Migrator {
    migrator := new(Migrator)
    migrator.db = db
    return migrator
}

// Setup creates the migrations bookkeeping table if it does not exist yet.
// It returns whatever error the database reports for the statement.
func (m *Migrator) Setup() error {
    const ddl = `
    CREATE TABLE IF NOT EXISTS migrations (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255) NOT NULL UNIQUE,
        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    `
    if _, err := m.db.Exec(ddl); err != nil {
        return err
    }
    return nil
}

// RunMigrations applies, in slice order, every migration whose Name is not yet
// recorded in the migrations table. It stops at the first failure and returns
// an error that wraps the database error.
//
// Each migration's UpSQL and its bookkeeping row run in one transaction, so a
// failure can never leave a schema change applied but unrecorded (which would
// make the next run try to apply it again).
func (m *Migrator) RunMigrations(migrations []Migration) error {
    for _, migration := range migrations {
        applied, err := m.isApplied(migration.Name)
        if err != nil {
            return fmt.Errorf("failed to check migration %s: %w", migration.Name, err)
        }
        if applied {
            continue
        }

        log.Printf("Applying migration: %s", migration.Name)
        if err := m.applyMigration(migration); err != nil {
            return err
        }
    }
    return nil
}

// isApplied reports whether a migration with the given name is already recorded.
func (m *Migrator) isApplied(name string) (bool, error) {
    var applied bool
    err := m.db.QueryRow(
        "SELECT EXISTS (SELECT 1 FROM migrations WHERE name = $1)", name,
    ).Scan(&applied)
    return applied, err
}

// applyMigration executes UpSQL and records the migration atomically.
// NOTE(review): this relies on the database supporting transactional DDL
// (PostgreSQL does); statements that cannot run inside a transaction, such as
// CREATE INDEX CONCURRENTLY, need a separate non-transactional path.
func (m *Migrator) applyMigration(migration Migration) error {
    tx, err := m.db.Begin()
    if err != nil {
        return fmt.Errorf("failed to begin migration %s: %w", migration.Name, err)
    }
    // Rollback after a successful Commit is a no-op, so this only undoes failures.
    defer tx.Rollback()

    if _, err := tx.Exec(migration.UpSQL); err != nil {
        return fmt.Errorf("failed to apply migration %s: %w", migration.Name, err)
    }
    if _, err := tx.Exec("INSERT INTO migrations (name) VALUES ($1)", migration.Name); err != nil {
        return fmt.Errorf("failed to record migration %s: %w", migration.Name, err)
    }
    if err := tx.Commit(); err != nil {
        return fmt.Errorf("failed to commit migration %s: %w", migration.Name, err)
    }
    return nil
}

// main wires the custom Migrator together: it opens the database, makes sure
// the bookkeeping table exists and applies the two example migrations.
//
// The connection string is taken from the DATABASE_URL environment variable;
// the hard-coded value is only a fallback for local experiments.
func main() {
    dsn := os.Getenv("DATABASE_URL")
    if dsn == "" {
        dsn = "postgres://user:password@localhost/dbname?sslmode=disable"
    }

    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // sql.Open only validates its arguments; Ping proves the database is reachable
    // before any migration is attempted.
    if err := db.Ping(); err != nil {
        log.Fatal(err)
    }

    migrator := NewMigrator(db)
    if err := migrator.Setup(); err != nil {
        log.Fatal(err)
    }

    migrations := []Migration{
        {
            Name: "create_users_table",
            UpSQL: `
            CREATE TABLE users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            `,
            DownSQL: "DROP TABLE IF EXISTS users",
        },
        {
            Name:    "add_age_to_users",
            UpSQL:   "ALTER TABLE users ADD COLUMN age INTEGER",
            DownSQL: "ALTER TABLE users DROP COLUMN IF EXISTS age",
        },
    }

    if err := migrator.RunMigrations(migrations); err != nil {
        log.Fatal(err)
    }

    log.Println("All migrations applied successfully")
}

3. 迁移脚本编写

3.1 SQL迁移脚本规范

SQL迁移脚本应该遵循以下规范:

-- migrations/000001_create_users_table.up.sql
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- No extra indexes on email/username: each UNIQUE constraint above is already
-- backed by a unique index, so a second index would only slow down writes.

-- migrations/000001_create_users_table.down.sql
DROP TABLE IF EXISTS users;

-- migrations/000002_add_profile_to_users.up.sql
ALTER TABLE users 
ADD COLUMN profile_json JSONB,
ADD COLUMN last_login TIMESTAMP WITH TIME ZONE;

CREATE INDEX idx_users_last_login ON users(last_login);

-- migrations/000002_add_profile_to_users.down.sql
-- Mirror the up script explicitly: drop the index first, then the columns.
DROP INDEX IF EXISTS idx_users_last_login;

ALTER TABLE users 
DROP COLUMN IF EXISTS profile_json,
DROP COLUMN IF EXISTS last_login;

3.2 Go代码迁移实现

对于复杂的迁移逻辑,可以使用Go代码实现:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "time"

    _ "github.com/lib/pq"
)

// UserProfile is the JSON document that main marshals and stores in the
// users.profile_json column; the json tags fix its lowercase key names.
type UserProfile struct {
    Bio      string `json:"bio"`
    Location string `json:"location"`
    Website  string `json:"website"`
}

// main opens the database and runs the profile migration. The migration is
// kept in a helper that returns an error so the connection is closed before
// the process exits (log.Fatal would skip deferred calls).
func main() {
    db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }

    err = runProfileMigration(db)
    db.Close()
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Migration completed successfully")
}

// runProfileMigration adds the profile_json column (if missing) and gives every
// user without a profile the default one. Both steps are safe to re-run: the
// ALTER uses IF NOT EXISTS and the UPDATE only touches rows that are still NULL.
func runProfileMigration(db *sql.DB) error {
    // Bound the whole migration so a stuck lock cannot hang the process forever.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()

    // Migration 1: add the profile column.
    log.Println("Adding profile column...")
    if _, err := db.ExecContext(ctx, `
        ALTER TABLE users 
        ADD COLUMN IF NOT EXISTS profile_json JSONB
    `); err != nil {
        return fmt.Errorf("failed to add profile column: %w", err)
    }

    profileJSON, err := json.Marshal(UserProfile{
        Bio:      "Welcome to my profile!",
        Location: "Unknown",
        Website:  "",
    })
    if err != nil {
        return fmt.Errorf("failed to marshal profile: %w", err)
    }

    // Migration 2: back-fill existing users with one set-based UPDATE instead
    // of selecting ids and issuing one UPDATE per row. The JSON is passed as a
    // string so it is sent as text no matter how the driver encodes []byte.
    log.Println("Creating default profiles for existing users...")
    res, err := db.ExecContext(ctx, `
        UPDATE users SET profile_json = $1 WHERE profile_json IS NULL
    `, string(profileJSON))
    if err != nil {
        return fmt.Errorf("failed to update user profiles: %w", err)
    }
    if n, err := res.RowsAffected(); err == nil {
        log.Printf("Default profile written for %d users", n)
    }
    return nil
}

3.3 数据迁移与结构迁移

数据迁移需要特别注意数据一致性和性能:

package main

import (
    "context"
    "database/sql"
    "log"
    "time"

    _ "github.com/lib/pq"
)

// main moves profile data out of users.profile_json into a new user_profiles
// table and then drops the old column.
//
// The copy runs in batches, each in its own short transaction, and pages by
// primary key (id > last copied id) rather than OFFSET so every batch is an
// index range scan and rows cannot be skipped or repeated between commits.
func main() {
    db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    // Create the destination table.
    log.Println("Creating new user_profiles table...")
    _, err = db.ExecContext(ctx, `
        CREATE TABLE user_profiles (
            id SERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            bio TEXT,
            location VARCHAR(100),
            website VARCHAR(255),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    `)
    if err != nil {
        log.Fatal("Failed to create user_profiles table:", err)
    }

    // Copy the data batch by batch to keep transactions and lock times short.
    log.Println("Migrating profile data...")
    const batchSize = 1000
    lastID := 0
    for {
        copied, nextID, err := copyProfileBatch(ctx, db, lastID, batchSize)
        if err != nil {
            log.Fatal("Failed to migrate profile batch:", err)
        }
        lastID = nextID
        if copied < batchSize {
            break // a short batch means everything has been processed
        }

        time.Sleep(100 * time.Millisecond) // brief pause to reduce database load
    }

    // Drop the old column only after every batch has been committed.
    log.Println("Dropping old profile column...")
    _, err = db.ExecContext(ctx, `ALTER TABLE users DROP COLUMN profile_json`)
    if err != nil {
        log.Fatal("Failed to drop profile column:", err)
    }

    log.Println("Data migration completed successfully")
}

// copyProfileBatch copies up to limit profiles with id greater than afterID
// into user_profiles inside one transaction. It returns the number of rows
// copied and the highest user id handled (afterID when the batch was empty).
func copyProfileBatch(ctx context.Context, db *sql.DB, afterID, limit int) (int, int, error) {
    type profileRow struct {
        userID int
        raw    []byte
    }

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return 0, afterID, err
    }
    // No-op once Commit has succeeded; undoes the batch on every error path.
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, profile_json 
        FROM users 
        WHERE profile_json IS NOT NULL AND id > $1
        ORDER BY id
        LIMIT $2
    `, afterID, limit)
    if err != nil {
        return 0, afterID, err
    }

    // Read the whole batch before writing: a transaction is bound to a single
    // connection, which cannot execute INSERTs while a result set is still open.
    batch := make([]profileRow, 0, limit)
    for rows.Next() {
        var r profileRow
        if err := rows.Scan(&r.userID, &r.raw); err != nil {
            rows.Close()
            return 0, afterID, err
        }
        batch = append(batch, r)
    }
    iterErr := rows.Err()
    rows.Close()
    if iterErr != nil {
        return 0, afterID, iterErr
    }

    lastID := afterID
    for _, r := range batch {
        // Parse the JSON and insert it into the new table.
        _, err := tx.ExecContext(ctx, `
            INSERT INTO user_profiles (user_id, bio, location, website)
            VALUES ($1, $2, $3, $4)
        `, r.userID, extractBio(r.raw), extractLocation(r.raw), extractWebsite(r.raw))
        if err != nil {
            return 0, afterID, err
        }
        lastID = r.userID
    }

    if err := tx.Commit(); err != nil {
        return 0, afterID, err
    }
    return len(batch), lastID, nil
}

// extractBio is one of the helpers meant to pull a field out of the profile
// JSON. It is a placeholder: the input is ignored and a fixed bio is returned.
func extractBio(profileJSON []byte) string {
    // A real implementation should parse the JSON.
    const placeholderBio = "Default bio"
    return placeholderBio
}

// extractLocation is a placeholder: it ignores profileJSON and always
// returns "Unknown".
func extractLocation(profileJSON []byte) string {
    const placeholderLocation = "Unknown"
    return placeholderLocation
}

// extractWebsite is a placeholder: it ignores profileJSON and always returns
// the empty string.
func extractWebsite(profileJSON []byte) string {
    var placeholderWebsite string
    return placeholderWebsite
}

4. 版本管理策略

4.1 迁移版本号管理

使用时间戳或顺序号作为版本标识:

package main

import (
    "fmt"
    "sort"
    "strconv"
    "strings"
    "time"
)

// MigrationFile identifies one migration script by its parsed version, its
// file name and its path.
type MigrationFile struct {
    Version int64
    Name    string
    Path    string
}

// SortMigrationFiles orders files by ascending Version. The slice is sorted
// in place and the same slice is returned for convenience.
func SortMigrationFiles(files []MigrationFile) []MigrationFile {
    byVersion := func(a, b int) bool {
        return files[a].Version < files[b].Version
    }
    sort.Slice(files, byVersion)
    return files
}

// ParseVersionFromFilename extracts the numeric version prefix from a
// migration file name of the form
//
//	000001_name.up.sql            (sequence number)
//	20230101120000_name.up.sql    (YYYYMMDDHHMMSS timestamp)
//
// The prefix is everything before the first underscore and must consist of
// decimal digits only. The returned value is the prefix read as a base-10
// integer, so sequence numbers and timestamps sort correctly against each
// other. A 14-digit prefix is additionally checked to be a real timestamp.
//
// An error is returned when the name has no underscore, the prefix is empty,
// contains anything but digits, overflows int64, or is an impossible timestamp.
func ParseVersionFromFilename(filename string) (int64, error) {
    sep := strings.IndexByte(filename, '_')
    if sep <= 0 {
        return 0, fmt.Errorf("invalid filename format: %q", filename)
    }
    versionStr := filename[:sep]

    // ParseInt would accept a leading sign; a version prefix must be plain digits.
    for _, r := range versionStr {
        if r < '0' || r > '9' {
            return 0, fmt.Errorf("invalid version format: %q", versionStr)
        }
    }

    version, err := strconv.ParseInt(versionStr, 10, 64)
    if err != nil {
        return 0, fmt.Errorf("invalid version format: %q: %w", versionStr, err)
    }

    // Fourteen digits means the YYYYMMDDHHMMSS convention; reject typos such
    // as month 13 early instead of silently sorting them.
    if len(versionStr) == 14 {
        if _, err := time.Parse("20060102150405", versionStr); err != nil {
            return 0, fmt.Errorf("invalid timestamp version %q: %w", versionStr, err)
        }
    }

    return version, nil
}

// main demonstrates version parsing and ordering on a few sample file names.
// Files whose version cannot be parsed are reported and left out of the
// sorted list instead of being sorted as version 0.
func main() {
    candidates := []MigrationFile{
        {Name: "000002_add_email.sql"},
        {Name: "000001_create_users.sql"},
        {Name: "20230101120000_initial_schema.sql"},
    }

    files := make([]MigrationFile, 0, len(candidates))
    for _, file := range candidates {
        version, err := ParseVersionFromFilename(file.Name)
        if err != nil {
            fmt.Printf("Error parsing %s: %v\n", file.Name, err)
            continue
        }
        file.Version = version
        files = append(files, file)
    }

    for _, file := range SortMigrationFiles(files) {
        fmt.Printf("%d: %s\n", file.Version, file.Name)
    }
}

4.2 分支合并时的冲突处理

建立团队迁移协作规范:

package main

import (
    "bufio"
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "strconv"
    "strings"
)

// MigrationConflictResolver inspects the *.sql files in migrationsDir.
type MigrationConflictResolver struct {
    migrationsDir string
}

// CheckForConflicts reports an error when two different migrations in
// migrationsDir share the same numeric version prefix.
//
// The .up.sql and .down.sql files of one migration legitimately share a
// version, so files are compared by migration name (file name without the
// direction and .sql suffixes), not by file count. Files whose prefix is not
// a number are ignored. Versions are checked in ascending order so the
// reported conflict is deterministic; details are printed to stdout.
func (r *MigrationConflictResolver) CheckForConflicts() error {
    paths, err := filepath.Glob(filepath.Join(r.migrationsDir, "*.sql"))
    if err != nil {
        return err
    }

    type versionGroup struct {
        names map[string]struct{} // distinct migration names using this version
        files []string            // every file carrying this version
    }
    groups := make(map[int64]*versionGroup)
    for _, path := range paths {
        filename := filepath.Base(path)
        version, err := strconv.ParseInt(strings.Split(filename, "_")[0], 10, 64)
        if err != nil {
            continue
        }

        group := groups[version]
        if group == nil {
            group = &versionGroup{names: make(map[string]struct{})}
            groups[version] = group
        }
        group.names[migrationBaseName(filename)] = struct{}{}
        group.files = append(group.files, filename)
    }

    versions := make([]int64, 0, len(groups))
    for version := range groups {
        versions = append(versions, version)
    }
    sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })

    // Look for versions claimed by more than one migration.
    for _, version := range versions {
        group := groups[version]
        if len(group.names) <= 1 {
            continue
        }
        fmt.Printf("冲突发现! 版本号 %d 被多个文件使用:\n", version)
        for _, file := range group.files {
            fmt.Printf("  - %s\n", file)
        }
        fmt.Println("请重命名文件来解决冲突")
        return fmt.Errorf("migration version conflict: version %d", version)
    }

    return nil
}

// migrationBaseName strips the ".sql" suffix and an optional ".up"/".down"
// direction suffix, leaving "<version>_<name>".
func migrationBaseName(filename string) string {
    name := strings.TrimSuffix(filename, ".sql")
    name = strings.TrimSuffix(name, ".up")
    name = strings.TrimSuffix(name, ".down")
    return name
}

// ResolveConflict proposes the next free version number (highest existing
// numeric prefix + 1), asks on stdin for a migration name and creates empty
// "<version>_<name>.up.sql" / ".down.sql" templates in migrationsDir.
//
// It returns an error when the directory cannot be listed, when no usable
// name is entered (empty input or a name containing a path separator), or
// when a template file cannot be written.
func (r *MigrationConflictResolver) ResolveConflict() error {
    // Find the highest version currently in use.
    files, err := filepath.Glob(filepath.Join(r.migrationsDir, "*.sql"))
    if err != nil {
        return err
    }

    var maxVersion int64
    for _, file := range files {
        filename := filepath.Base(file)
        version, err := strconv.ParseInt(strings.Split(filename, "_")[0], 10, 64)
        if err == nil && version > maxVersion {
            maxVersion = version
        }
    }

    // Suggest the next version number for the new migration.
    newVersion := maxVersion + 1
    fmt.Printf("建议使用版本号: %06d\n", newVersion)

    reader := bufio.NewReader(os.Stdin)
    fmt.Print("请输入新迁移文件的名称(不含版本号): ")
    line, readErr := reader.ReadString('\n')
    name := strings.TrimSpace(line)
    if name == "" {
        // ReadString returns an error together with any partial input (e.g. EOF
        // without a trailing newline); it only matters when nothing was read.
        if readErr != nil {
            return fmt.Errorf("reading migration name: %w", readErr)
        }
        return fmt.Errorf("migration name must not be empty")
    }
    // The name becomes part of a file name; a separator would place the file
    // outside migrationsDir.
    if strings.ContainsAny(name, `/\`) {
        return fmt.Errorf("invalid migration name %q: must not contain path separators", name)
    }

    upFilename := fmt.Sprintf("%06d_%s.up.sql", newVersion, name)
    downFilename := fmt.Sprintf("%06d_%s.down.sql", newVersion, name)

    // Write the template files.
    upContent := "-- Migration UP content here\n"
    downContent := "-- Migration DOWN content here\n"

    if err := os.WriteFile(filepath.Join(r.migrationsDir, upFilename), []byte(upContent), 0644); err != nil {
        return err
    }
    if err := os.WriteFile(filepath.Join(r.migrationsDir, downFilename), []byte(downContent), 0644); err != nil {
        return err
    }

    fmt.Printf("创建迁移文件: %s 和 %s\n", upFilename, downFilename)
    return nil
}

// main checks ./migrations for version conflicts and, when one is found,
// starts the interactive flow that creates a new migration file pair.
func main() {
    resolver := &MigrationConflictResolver{migrationsDir: "./migrations"}

    if checkErr := resolver.CheckForConflicts(); checkErr == nil {
        fmt.Println("没有发现迁移冲突")
        return
    }

    fmt.Println("发现迁移冲突,需要解决...")
    resolveErr := resolver.ResolveConflict()
    if resolveErr != nil {
        fmt.Printf("解决冲突时出错: %v\n", resolveErr)
    }
}

4.3 生产环境迁移策略

生产环境迁移需要特别谨慎:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "time"

    _ "github.com/lib/pq"
)

// ProductionMigrator bundles a database handle with two switches:
// dryRun makes ExecuteMigration log statements instead of running them, and
// backupFirst controls whether BackupDatabase does anything.
type ProductionMigrator struct {
    db          *sql.DB
    dryRun      bool
    backupFirst bool
}

// NewProductionMigrator returns a ProductionMigrator configured with the
// given handle and switches.
func NewProductionMigrator(db *sql.DB, dryRun, backupFirst bool) *ProductionMigrator {
    migrator := new(ProductionMigrator)
    migrator.db = db
    migrator.dryRun = dryRun
    migrator.backupFirst = backupFirst
    return migrator
}

// BackupDatabase is a stand-in for a real backup step. When backupFirst is
// set it logs a timestamped backup file name and sleeps for two seconds to
// imitate the work; no backup is actually taken. It always returns nil.
func (m *ProductionMigrator) BackupDatabase(ctx context.Context) error {
    if m.backupFirst {
        log.Println("开始数据库备份...")

        // A real implementation should invoke the database's backup tool
        // (for example pg_dump); this is only an illustration.
        stamp := time.Now().Format("20060102_150405")
        backupFile := "backup_" + stamp + ".sql"
        log.Printf("创建备份文件: %s", backupFile)

        // Simulate the time a backup takes.
        time.Sleep(2 * time.Second)
        log.Println("数据库备份完成")
        return nil
    }

    log.Println("跳过数据库备份(配置关闭)")
    return nil
}

// ValidateMigrations is a stand-in for real validation: it logs each
// statement, pauses 100ms per statement and always returns nil.
func (m *ProductionMigrator) ValidateMigrations(ctx context.Context, migrations []string) error {
    log.Println("验证迁移脚本...")

    for i := 0; i < len(migrations); i++ {
        log.Printf("验证: %s", migrations[i])
        // A real implementation should check SQL syntax, foreign keys, etc.
        time.Sleep(100 * time.Millisecond)
    }

    log.Println("迁移验证完成")
    return nil
}

// ExecuteMigration runs one SQL statement directly on the database handle
// (outside any transaction). In dry-run mode the statement is only logged and
// nil is returned; otherwise the error from ExecContext is returned.
//
// The parameter is named query rather than sql so it does not shadow the
// imported database/sql package inside the function.
func (m *ProductionMigrator) ExecuteMigration(ctx context.Context, query string) error {
    if m.dryRun {
        log.Printf("干跑模式 - 将执行: %s", query)
        return nil
    }

    log.Printf("执行: %s", query)
    _, err := m.db.ExecContext(ctx, query)
    return err
}

// RunProductionMigration drives the production flow: maintenance-window
// check, backup, validation, then execution of the statements one by one.
//
// The window is checked first so no backup or validation work is done when
// the migration is going to be refused anyway. Errors from the individual
// steps are wrapped with %w so callers can inspect the cause.
func (m *ProductionMigrator) RunProductionMigration(ctx context.Context) error {
    log.Println("开始生产环境迁移流程")

    // 0. Refuse early when outside the maintenance window.
    if !m.isMaintenanceWindow() {
        log.Println("当前不在维护窗口,迁移未执行")
        return fmt.Errorf("不在维护窗口内")
    }

    // 1. Back up the database.
    if err := m.BackupDatabase(ctx); err != nil {
        return fmt.Errorf("备份失败: %w", err)
    }

    // 2. Validate the statements.
    // Both statements are written to be re-runnable (IF NOT EXISTS). The index
    // is built CONCURRENTLY, which PostgreSQL only allows outside a transaction
    // block; ExecuteMigration runs statements directly on the handle.
    migrations := []string{
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS last_active TIMESTAMP",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_active ON users(last_active)",
    }

    if err := m.ValidateMigrations(ctx, migrations); err != nil {
        return fmt.Errorf("验证失败: %w", err)
    }

    // 3. Execute inside the maintenance window.
    log.Println("检测到维护窗口,开始执行迁移...")
    for i, migration := range migrations {
        if err := m.ExecuteMigration(ctx, migration); err != nil {
            return fmt.Errorf("迁移执行失败: %w", err)
        }
        // Short pause between statements; pointless after the last one.
        if i < len(migrations)-1 {
            time.Sleep(1 * time.Second)
        }
    }

    log.Println("生产环境迁移完成")
    return nil
}

// isMaintenanceWindow reports whether the current UTC time lies in the
// half-open window [02:00, 04:00). Comparing the hour includes the exact
// 02:00:00 start instant, which a strict After(start) comparison would miss.
func (m *ProductionMigrator) isMaintenanceWindow() bool {
    hour := time.Now().UTC().Hour()
    return hour >= 2 && hour < 4
}

// main opens the database named by DATABASE_URL and runs the production
// migration flow under a 30-minute deadline, exiting non-zero on failure.
func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()

    // In production these switches should come from environment variables.
    const (
        dryRun      = false
        backupFirst = true
    )
    migrator := NewProductionMigrator(db, dryRun, backupFirst)

    err = migrator.RunProductionMigration(ctx)
    if err != nil {
        log.Fatalf("生产环境迁移失败: %v", err)
    }
}

5. 高级迁移技巧

5.1 大表迁移的性能优化

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/lib/pq"
)

// LargeTableMigrator holds the database handle used by MigrateLargeTable.
type LargeTableMigrator struct {
    db *sql.DB
}

// MigrateLargeTable rebuilds the users table with a new structure:
//
//  1. create users_new,
//  2. copy rows from users in primary-key order, one batch per statement,
//  3. align the id sequence of users_new with the copied ids and swap the
//     tables (users -> users_old, users_new -> users) in one transaction.
//
// Each batch is a single INSERT ... SELECT executed inside the database, so
// no rows travel through the client and no per-batch COUNT(*) over the
// remaining table is needed; the loop ends on the first short batch.
//
// NOTE(review): rows written to users while the copy is running are not
// picked up, and foreign keys that reference users keep pointing at the
// renamed users_old — both need handling before using this on a live system.
func (m *LargeTableMigrator) MigrateLargeTable(ctx context.Context) error {
    log.Println("开始大表迁移...")

    // 1. Create the new table structure.
    log.Println("创建新表...")
    _, err := m.db.ExecContext(ctx, `
        CREATE TABLE users_new (
            id BIGSERIAL PRIMARY KEY,
            username VARCHAR(100) NOT NULL,
            email VARCHAR(255) NOT NULL,
            -- 新增字段
            last_login TIMESTAMP,
            login_count INTEGER DEFAULT 0,
            -- 索引
            CONSTRAINT users_new_email_key UNIQUE (email)
        )
    `)
    if err != nil {
        return fmt.Errorf("创建新表失败: %w", err)
    }

    // 2. Copy the data in batches.
    log.Println("开始分批迁移数据...")
    const batchSize = 10000
    var lastID int64
    for {
        copied, maxID, err := m.copyUserBatch(ctx, lastID, batchSize)
        if err != nil {
            return fmt.Errorf("迁移批次失败: %w", err)
        }
        if copied == 0 {
            break
        }
        lastID = maxID
        log.Printf("迁移批次, 最后ID: %d, 本批记录: %d", lastID, copied)
        if copied < batchSize {
            break
        }

        // Go easy on the production database, but stop promptly on cancellation.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }

    // 3. Swap the tables.
    log.Println("切换表...")
    if err := m.swapUserTables(ctx); err != nil {
        return err
    }

    log.Println("大表迁移完成")
    return nil
}

// copyUserBatch copies up to limit rows with id > afterID from users into
// users_new in one atomic statement. It returns how many rows were copied and
// the highest id among them (afterID when nothing was copied).
func (m *LargeTableMigrator) copyUserBatch(ctx context.Context, afterID int64, limit int) (int64, int64, error) {
    var copied, maxID int64
    err := m.db.QueryRowContext(ctx, `
        WITH moved AS (
            INSERT INTO users_new (id, username, email, login_count)
            SELECT id, username, email, 0
            FROM users
            WHERE id > $1::bigint
            ORDER BY id
            LIMIT $2
            RETURNING id
        )
        SELECT COUNT(*), COALESCE(MAX(id), $1::bigint) FROM moved
    `, afterID, limit).Scan(&copied, &maxID)
    if err != nil {
        return 0, afterID, err
    }
    return copied, maxID, nil
}

// swapUserTables moves the id sequence of users_new past the copied ids and
// renames the tables, all in one transaction so readers never see a state
// without a users table.
func (m *LargeTableMigrator) swapUserTables(ctx context.Context) error {
    tx, err := m.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("开始切换事务失败: %w", err)
    }
    // No-op after a successful Commit; rolls back on every error path.
    defer tx.Rollback()

    // Rows were inserted with explicit ids, which does not advance the
    // BIGSERIAL sequence; without this the first insert after the swap would
    // reuse an existing id.
    _, err = tx.ExecContext(ctx, `
        SELECT setval(
            pg_get_serial_sequence('users_new', 'id'),
            COALESCE(MAX(id), 1),
            MAX(id) IS NOT NULL
        ) FROM users_new
    `)
    if err != nil {
        return fmt.Errorf("同步序列失败: %w", err)
    }

    // Rename the old table out of the way.
    if _, err = tx.ExecContext(ctx, `ALTER TABLE users RENAME TO users_old`); err != nil {
        return fmt.Errorf("重命名旧表失败: %w", err)
    }

    // Put the new table in its place.
    if _, err = tx.ExecContext(ctx, `ALTER TABLE users_new RENAME TO users`); err != nil {
        return fmt.Errorf("重命名新表失败: %w", err)
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("提交切换事务失败: %w", err)
    }
    return nil
}

// main opens the database and runs the large-table migration under a
// one-hour deadline, exiting non-zero if it fails.
func main() {
    const dsn = "postgres://user:password@localhost/dbname?sslmode=disable"

    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
    defer cancel()

    migrator := &LargeTableMigrator{db: db}
    if migrateErr := migrator.MigrateLargeTable(ctx); migrateErr != nil {
        log.Fatalf("迁移失败: %v", migrateErr)
    }
}

5.2 零停机迁移实现

package main

import (
    "context"
    "database/sql"
    "log"
    "sync"
    "time"

    _ "github.com/lib/pq"
)

// ZeroDowntimeMigrator copies the users table from oldSchema to newSchema.
// Both schema names are interpolated into SQL text with fmt.Sprintf by the
// methods below, so they must be trusted identifiers, never user input.
type ZeroDowntimeMigrator struct {
    db        *sql.DB
    oldSchema string
    newSchema string
}

// CreateNewSchema creates newSchema and its users table. Both statements use
// IF NOT EXISTS so the step can be retried after a partial failure; before,
// only the schema creation was re-runnable and a retry failed on the table.
//
// The schema name is formatted into the SQL text (identifiers cannot be bound
// as parameters), so it must come from trusted configuration.
func (m *ZeroDowntimeMigrator) CreateNewSchema(ctx context.Context) error {
    log.Println("创建新schema...")
    _, err := m.db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", m.newSchema))
    if err != nil {
        return fmt.Errorf("creating schema %s: %w", m.newSchema, err)
    }

    // Create the table structure inside the new schema.
    _, err = m.db.ExecContext(ctx, fmt.Sprintf(`
        CREATE TABLE IF NOT EXISTS %s.users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(100) NOT NULL,
            email VARCHAR(255) NOT NULL UNIQUE,
            profile_data JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    `, m.newSchema))
    if err != nil {
        return fmt.Errorf("creating table %s.users: %w", m.newSchema, err)
    }
    return nil
}

// StartDoubleWrite is a placeholder for enabling dual writes. It only logs a
// message and returns nil; nothing is changed in the database or application.
func (m *ZeroDowntimeMigrator) StartDoubleWrite(ctx context.Context) error {
    log.Println("开始双写模式...")

    // The dual-write logic belongs in the application layer.
    // Example: change the application code to write to both the old and the new schema.
    return nil
}

// BackfillData copies every row of oldSchema.users into newSchema.users in
// batches, skipping ids that already exist there (ON CONFLICT DO NOTHING).
//
// Batches are paged by primary key (id > last id of the previous batch) and
// the loop ends when the source returns a short batch. The previous version
// stopped as soon as no old row had an id above MAX(id) of the new table;
// with dual writes already inserting the newest ids into the new schema that
// condition is met after the first batch and most rows were never copied.
func (m *ZeroDowntimeMigrator) BackfillData(ctx context.Context) error {
    log.Println("回填数据到新schema...")

    const batchSize = 5000

    // One statement per batch: select the next id range from the old table,
    // insert it into the new one, and report the batch size and highest id.
    // A data-modifying CTE is executed even though the final SELECT does not
    // read from it.
    query := fmt.Sprintf(`
        WITH batch AS (
            SELECT id, username, email, created_at
            FROM %[2]s.users
            WHERE id > $1::bigint
            ORDER BY id
            LIMIT $2
        ), copied AS (
            INSERT INTO %[1]s.users (id, username, email, created_at)
            SELECT id, username, email, created_at FROM batch
            ON CONFLICT (id) DO NOTHING
        )
        SELECT COUNT(*), COALESCE(MAX(id), $1::bigint) FROM batch
    `, m.newSchema, m.oldSchema)

    var lastID int64
    for {
        log.Printf("处理批次,起始ID: %d", lastID)

        var batchRows, maxID int64
        err := m.db.QueryRowContext(ctx, query, lastID, batchSize).Scan(&batchRows, &maxID)
        if err != nil {
            return err
        }
        if batchRows == 0 {
            break
        }
        lastID = maxID
        if batchRows < batchSize {
            break
        }

        // Avoid putting too much pressure on production, but honor cancellation.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(1 * time.Second):
        }
    }

    return nil
}

// SwitchTraffic is a placeholder for the cut-over. It only logs two messages
// and returns nil; the steps listed in the body are not implemented here.
func (m *ZeroDowntimeMigrator) SwitchTraffic(ctx context.Context) error {
    log.Println("切换流量到新schema...")

    // 1. Stop the application (at the load-balancer level)
    // 2. Make sure all in-flight requests have finished
    // 3. Point the database connection at the new schema
    // 4. Restart the application

    log.Println("流量切换完成")
    return nil
}

// CleanupOldSchema waits five minutes and then drops oldSchema with CASCADE,
// which removes every object in it. The wait observes ctx: if the context is
// cancelled during the grace period the schema is left untouched and
// ctx.Err() is returned.
func (m *ZeroDowntimeMigrator) CleanupOldSchema(ctx context.Context) error {
    log.Println("清理旧schema...")

    // Grace period to confirm the new schema is working before the
    // irreversible drop.
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(5 * time.Minute):
    }

    // Drop the old schema.
    _, err := m.db.ExecContext(ctx, fmt.Sprintf("DROP SCHEMA %s CASCADE", m.oldSchema))
    return err
}

// Execute runs the migration phases strictly one after another and stops at
// the first error:
//
//	CreateNewSchema -> StartDoubleWrite -> BackfillData -> SwitchTraffic -> CleanupOldSchema
//
// Each phase depends on the previous one having finished (the backfill needs
// the new table, the cleanup must not start before the backfill is complete),
// so the phases must not be started as concurrent goroutines.
func (m *ZeroDowntimeMigrator) Execute() error {
    ctx := context.Background()

    steps := []func(context.Context) error{
        m.CreateNewSchema,
        m.StartDoubleWrite,
        m.BackfillData,
        m.SwitchTraffic,
        m.CleanupOldSchema,
    }

    for _, step := range steps {
        if err := step(ctx); err != nil {
            // Later phases are skipped so a failed backfill can never be
            // followed by dropping the old schema.
            return err
        }
    }

    return nil
}

// main opens the database and runs the zero-downtime migration from schema
// "public" to "public_v2", exiting non-zero if any phase fails.
func main() {
    const dsn = "postgres://user:password@localhost/dbname?sslmode=disable"

    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    migrator := new(ZeroDowntimeMigrator)
    migrator.db = db
    migrator.oldSchema = "public"
    migrator.newSchema = "public_v2"

    err = migrator.Execute()
    if err != nil {
        log.Fatalf("零停机迁移失败: %v", err)
    }

    log.Println("零停机迁移完成")
}

实战练习

练习1:用户系统的迁移脚本设计

设计一个用户系统的迁移方案,包括:

  1. 初始用户表创建
  2. 添加用户资料字段
  3. 添加登录历史表
  4. 添加索引优化查询性能

-- 000001_init_users.up.sql
-- Base users table; username and email each carry a UNIQUE constraint.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_users_created_at ON users(created_at);

-- 000002_add_user_profile.up.sql
-- Adds optional (nullable) profile columns to users.
ALTER TABLE users 
ADD COLUMN full_name VARCHAR(100),
ADD COLUMN avatar_url VARCHAR(255),
ADD COLUMN date_of_birth DATE;

CREATE INDEX idx_users_dob ON users(date_of_birth);

-- 000003_create_login_history.up.sql
-- One row per login; rows are removed together with their user (ON DELETE CASCADE).
CREATE TABLE user_login_history (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    login_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    ip_address VARCHAR(45),
    user_agent TEXT
);

CREATE INDEX idx_login_history_user_id ON user_login_history(user_id);
CREATE INDEX idx_login_history_login_at ON user_login_history(login_at);

-- 000004_optimize_indexes.up.sql
-- Add composite indexes for common queries
CREATE INDEX idx_users_email_username ON users(email, username);
CREATE INDEX idx_login_history_user_login ON user_login_history(user_id, login_at DESC);

-- Add a partitioned table (if supported)
-- CREATE TABLE user_login_history_partitioned (...) PARTITION BY RANGE (login_at);

练习2:复杂表结构变更的迁移策略

处理包含外键约束和大量数据的复杂表变更:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/lib/pq"
)

// main starts the exercise of splitting an order system into separate
// orders_new and order_items_new tables. Only step 1 (creating the new
// tables and their indexes) is implemented; step 2 is an empty placeholder.
func main() {
    db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    // Complex migration: split the order system from a single table into several tables
    log.Println("开始复杂表结构迁移...")

    // Step 1: create the new table structure.
    // All four statements are sent to the database in a single ExecContext call.
    log.Println("创建新表...")
    _, err = db.ExecContext(ctx, `
        CREATE TABLE orders_new (
            order_id BIGSERIAL PRIMARY KEY,
            customer_id INTEGER NOT NULL,
            order_date TIMESTAMP NOT NULL,
            total_amount DECIMAL(10,2) NOT NULL,
            status VARCHAR(20) NOT NULL
        );

        CREATE TABLE order_items_new (
            item_id BIGSERIAL PRIMARY KEY,
            order_id BIGINT NOT NULL REFERENCES orders_new(order_id),
            product_id INTEGER NOT NULL,
            quantity INTEGER NOT NULL,
            price DECIMAL(10,2) NOT NULL
        );

        CREATE INDEX idx_orders_customer ON orders_new(customer_id);
        CREATE INDEX idx_order_items_order ON order_items_new(order_id);
    `)
    if err != nil {
        log.Fatal("创建新表失败:", err)
    }

    // Step 2: disable foreign key constraints (if any) — not implemented yet

}

详细内容待补充...