7.7 数据库迁移与版本管理¶
学习目标¶
- 理解数据库schema版本管理的重要性
- 掌握数据库迁移脚本的编写技巧
- 熟练使用迁移工具进行版本控制
- 建立完整的数据库变更管理流程
核心内容¶
1. 数据库迁移概念¶
1.1 迁移的必要性与优势¶
数据库迁移是管理数据库结构变更的系统化方法,主要优势包括: - 版本控制:跟踪数据库结构的历史变更 - 一致性:确保所有环境(开发、测试、生产)的数据库结构一致 - 可重复性:迁移脚本可以多次执行,结果一致 - 团队协作:多个开发者可以并行工作而不会冲突
1.2 迁移策略与最佳实践¶
- 每次迁移应该是幂等的(可重复执行)
- 每个迁移脚本应该包含前进和回退两个方向
- 迁移脚本应该小而专注,每次只做一个明确的变更
- 在生产环境执行前,必须在测试环境充分验证
1.3 版本控制与回滚机制¶
- 使用递增的版本号或时间戳来标识迁移
- 维护迁移历史表来记录已执行的迁移
- 回滚机制允许撤销失败的迁移,保证系统稳定性
2. 迁移工具选择¶
2.1 golang-migrate工具详解¶
golang-migrate是一个流行的数据库迁移工具,支持多种数据库。
安装:
基本用法:
# 创建迁移文件
migrate create -ext sql -dir db/migrations -seq create_users_table
# 执行迁移
migrate -path db/migrations -database "postgres://localhost:5432/dbname?sslmode=disable" up
# 回滚迁移
migrate -path db/migrations -database "postgres://localhost:5432/dbname?sslmode=disable" down
2.2 GORM AutoMigrate功能¶
GORM提供了AutoMigrate功能,可以自动根据模型结构创建或更新表。
package main
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type User struct {
ID uint `gorm:"primaryKey"`
Name string `gorm:"size:100"`
Email string `gorm:"uniqueIndex;size:255"`
Age int
CreatedAt time.Time
UpdatedAt time.Time
}
func main() {
dsn := "host=localhost user=gorm password=gorm dbname=gorm port=5432 sslmode=disable"
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
panic("failed to connect database")
}
// 自动迁移
err = db.AutoMigrate(&User{})
if err != nil {
panic("failed to migrate database")
}
fmt.Println("Migration completed successfully")
}
2.3 自定义迁移解决方案¶
对于特定需求,可以开发自定义迁移框架:
package main
import (
"database/sql"
"fmt"
"log"
"os"
_ "github.com/lib/pq"
)
type Migration struct {
ID int
Name string
UpSQL string
DownSQL string
AppliedAt sql.NullTime
}
type Migrator struct {
db *sql.DB
}
func NewMigrator(db *sql.DB) *Migrator {
return &Migrator{db: db}
}
func (m *Migrator) Setup() error {
createTableSQL := `
CREATE TABLE IF NOT EXISTS migrations (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL UNIQUE,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
_, err := m.db.Exec(createTableSQL)
return err
}
func (m *Migrator) RunMigrations(migrations []Migration) error {
for _, migration := range migrations {
var count int
err := m.db.QueryRow("SELECT COUNT(*) FROM migrations WHERE name = $1", migration.Name).Scan(&count)
if err != nil {
return err
}
if count == 0 {
log.Printf("Applying migration: %s", migration.Name)
_, err := m.db.Exec(migration.UpSQL)
if err != nil {
return fmt.Errorf("failed to apply migration %s: %v", migration.Name, err)
}
_, err = m.db.Exec("INSERT INTO migrations (name) VALUES ($1)", migration.Name)
if err != nil {
return fmt.Errorf("failed to record migration %s: %v", migration.Name, err)
}
}
}
return nil
}
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
migrator := NewMigrator(db)
if err := migrator.Setup(); err != nil {
log.Fatal(err)
}
migrations := []Migration{
{
Name: "create_users_table",
UpSQL: `
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`,
DownSQL: "DROP TABLE IF EXISTS users",
},
{
Name: "add_age_to_users",
UpSQL: "ALTER TABLE users ADD COLUMN age INTEGER",
DownSQL: "ALTER TABLE users DROP COLUMN IF EXISTS age",
},
}
if err := migrator.RunMigrations(migrations); err != nil {
log.Fatal(err)
}
log.Println("All migrations applied successfully")
}
3. 迁移脚本编写¶
3.1 SQL迁移脚本规范¶
SQL迁移脚本应该遵循以下规范:
-- migrations/000001_create_users_table.up.sql
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
-- migrations/000001_create_users_table.down.sql
DROP TABLE IF EXISTS users;
-- migrations/000002_add_profile_to_users.up.sql
ALTER TABLE users
ADD COLUMN profile_json JSONB,
ADD COLUMN last_login TIMESTAMP WITH TIME ZONE;
CREATE INDEX idx_users_last_login ON users(last_login);
-- migrations/000002_add_profile_to_users.down.sql
ALTER TABLE users
DROP COLUMN IF EXISTS profile_json,
DROP COLUMN IF EXISTS last_login;
3.2 Go代码迁移实现¶
对于复杂的迁移逻辑,可以使用Go代码实现:
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
type UserProfile struct {
Bio string `json:"bio"`
Location string `json:"location"`
Website string `json:"website"`
}
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
ctx := context.Background()
// 迁移1:添加profile列
log.Println("Adding profile column...")
_, err = db.ExecContext(ctx, `
ALTER TABLE users
ADD COLUMN IF NOT EXISTS profile_json JSONB
`)
if err != nil {
log.Fatal("Failed to add profile column:", err)
}
// 迁移2:为现有用户创建默认profile
log.Println("Creating default profiles for existing users...")
rows, err := db.QueryContext(ctx, "SELECT id FROM users WHERE profile_json IS NULL")
if err != nil {
log.Fatal("Failed to query users:", err)
}
defer rows.Close()
var userIDs []int64
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
log.Fatal("Failed to scan user ID:", err)
}
userIDs = append(userIDs, id)
}
defaultProfile := UserProfile{
Bio: "Welcome to my profile!",
Location: "Unknown",
Website: "",
}
profileJSON, err := json.Marshal(defaultProfile)
if err != nil {
log.Fatal("Failed to marshal profile:", err)
}
for _, id := range userIDs {
_, err = db.ExecContext(ctx, `
UPDATE users SET profile_json = $1 WHERE id = $2
`, profileJSON, id)
if err != nil {
log.Fatal("Failed to update user profile:", err)
}
}
log.Println("Migration completed successfully")
}
3.3 数据迁移与结构迁移¶
数据迁移需要特别注意数据一致性和性能:
package main
import (
"context"
"database/sql"
"log"
"time"
_ "github.com/lib/pqs"
)
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
ctx := context.Background()
// 开始事务
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
// 创建新表
log.Println("Creating new user_profiles table...")
_, err = tx.ExecContext(ctx, `
CREATE TABLE user_profiles (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
bio TEXT,
location VARCHAR(100),
website VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`)
if err != nil {
log.Fatal("Failed to create user_profiles table:", err)
}
// 迁移数据(分批处理,避免锁表时间过长)
log.Println("Migrating profile data...")
offset := 0
batchSize := 1000
for {
rows, err := tx.QueryContext(ctx, `
SELECT id, profile_json
FROM users
WHERE profile_json IS NOT NULL
ORDER BY id
LIMIT $1 OFFSET $2
`, batchSize, offset)
if err != nil {
log.Fatal("Failed to query users:", err)
}
var count int
for rows.Next() {
count++
var userID int
var profileJSON []byte
if err := rows.Scan(&userID, &profileJSON); err != nil {
rows.Close()
log.Fatal("Failed to scan user:", err)
}
// 解析JSON并插入新表
_, err := tx.ExecContext(ctx, `
INSERT INTO user_profiles (user_id, bio, location, website)
VALUES ($1, $2, $3, $4)
`, userID, extractBio(profileJSON), extractLocation(profileJSON), extractWebsite(profileJSON))
if err != nil {
rows.Close()
log.Fatal("Failed to insert profile:", err)
}
}
rows.Close()
if count < batchSize {
break // 已处理所有数据
}
offset += batchSize
// 小批量提交,减少事务大小
if err := tx.Commit(); err != nil {
log.Fatal("Failed to commit batch:", err)
}
// 开始新的事务
tx, err = db.BeginTx(ctx, nil)
if err != nil {
log.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // 短暂暂停,减少数据库负载
}
// 删除旧列(在确认数据迁移成功后)
log.Println("Dropping old profile column...")
_, err = tx.ExecContext(ctx, `ALTER TABLE users DROP COLUMN profile_json`)
if err != nil {
log.Fatal("Failed to drop profile column:", err)
}
// 提交最终事务
if err := tx.Commit(); err != nil {
log.Fatal("Failed to commit migration:", err)
}
log.Println("Data migration completed successfully")
}
// 辅助函数用于从JSON中提取字段
func extractBio(profileJSON []byte) string {
// 实际实现应该解析JSON
return "Default bio"
}
func extractLocation(profileJSON []byte) string {
return "Unknown"
}
func extractWebsite(profileJSON []byte) string {
return ""
}
4. 版本管理策略¶
4.1 迁移版本号管理¶
使用时间戳或顺序号作为版本标识:
package main
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
)
type MigrationFile struct {
Version int64
Name string
Path string
}
func SortMigrationFiles(files []MigrationFile) []MigrationFile {
sort.Slice(files, func(i, j int) bool {
return files[i].Version < files[j].Version
})
return files
}
func ParseVersionFromFilename(filename string) (int64, error) {
// 假设文件名格式: 000001_name.up.sql 或 20230101120000_name.up.sql
parts := strings.SplitN(filename, "_", 2)
if len(parts) < 1 {
return 0, fmt.Errorf("invalid filename format")
}
// 尝试解析为数字版本号
versionStr := parts[0]
version, err := strconv.ParseInt(versionStr, 10, 64)
if err != nil {
// 尝试解析为时间戳
if len(versionStr) == 14 { // YYYYMMDDHHMMSS
t, err := time.Parse("20060102150405", versionStr)
if err != nil {
return 0, fmt.Errorf("invalid version format")
}
return t.Unix(), nil
}
return 0, fmt.Errorf("invalid version format")
}
return version, nil
}
func main() {
files := []MigrationFile{
{Name: "000002_add_email.sql"},
{Name: "000001_create_users.sql"},
{Name: "20230101120000_initial_schema.sql"},
}
for i := range files {
version, err := ParseVersionFromFilename(files[i].Name)
if err != nil {
fmt.Printf("Error parsing %s: %v\n", files[i].Name, err)
continue
}
files[i].Version = version
}
sorted := SortMigrationFiles(files)
for _, file := range sorted {
fmt.Printf("%d: %s\n", file.Version, file.Name)
}
}
4.2 分支合并时的冲突处理¶
建立团队迁移协作规范:
package main
import (
"bufio"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
type MigrationConflictResolver struct {
migrationsDir string
}
func (r *MigrationConflictResolver) CheckForConflicts() error {
files, err := filepath.Glob(filepath.Join(r.migrationsDir, "*.sql"))
if err != nil {
return err
}
versions := make(map[int64][]string)
for _, file := range files {
filename := filepath.Base(file)
version, err := strconv.ParseInt(strings.Split(filename, "_")[0], 10, 64)
if err != nil {
continue
}
versions[version] = append(versions[version], filename)
}
// 检查重复版本号
for version, files := range versions {
if len(files) > 1 {
fmt.Printf("冲突发现! 版本号 %d 被多个文件使用:\n", version)
for _, file := range files {
fmt.Printf(" - %s\n", file)
}
fmt.Println("请重命名文件来解决冲突")
return fmt.Errorf("migration version conflict")
}
}
return nil
}
func (r *MigrationConflictResolver) ResolveConflict() error {
// 获取当前最高版本号
files, err := filepath.Glob(filepath.Join(r.migrationsDir, "*.sql"))
if err != nil {
return err
}
var maxVersion int64
for _, file := range files {
filename := filepath.Base(file)
version, err := strconv.ParseInt(strings.Split(filename, "_")[0], 10, 64)
if err == nil && version > maxVersion {
maxVersion = version
}
}
// 为新迁移生成下一个版本号
newVersion := maxVersion + 1
fmt.Printf("建议使用版本号: %06d\n", newVersion)
reader := bufio.NewReader(os.Stdin)
fmt.Print("请输入新迁移文件的名称(不含版本号): ")
name, _ := reader.ReadString('\n')
name = strings.TrimSpace(name)
upFilename := fmt.Sprintf("%06d_%s.up.sql", newVersion, name)
downFilename := fmt.Sprintf("%06d_%s.down.sql", newVersion, name)
// 创建迁移文件模板
upContent := "-- Migration UP content here\n"
downContent := "-- Migration DOWN content here\n"
if err := os.WriteFile(filepath.Join(r.migrationsDir, upFilename), []byte(upContent), 0644); err != nil {
return err
}
if err := os.WriteFile(filepath.Join(r.migrationsDir, downFilename), []byte(downContent), 0644); err != nil {
return err
}
fmt.Printf("创建迁移文件: %s 和 %s\n", upFilename, downFilename)
return nil
}
func main() {
resolver := &MigrationConflictResolver{migrationsDir: "./migrations"}
if err := resolver.CheckForConflicts(); err != nil {
fmt.Println("发现迁移冲突,需要解决...")
if err := resolver.ResolveConflict(); err != nil {
fmt.Printf("解决冲突时出错: %v\n", err)
}
} else {
fmt.Println("没有发现迁移冲突")
}
}
4.3 生产环境迁移策略¶
生产环境迁移需要特别谨慎:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"time"
_ "github.com/lib/pq"
)
type ProductionMigrator struct {
db *sql.DB
dryRun bool
backupFirst bool
}
func NewProductionMigrator(db *sql.DB, dryRun, backupFirst bool) *ProductionMigrator {
return &ProductionMigrator{
db: db,
dryRun: dryRun,
backupFirst: backupFirst,
}
}
func (m *ProductionMigrator) BackupDatabase(ctx context.Context) error {
if !m.backupFirst {
log.Println("跳过数据库备份(配置关闭)")
return nil
}
log.Println("开始数据库备份...")
// 实际实现应该调用数据库的备份工具,如pg_dump
// 这里只是示例
backupFile := fmt.Sprintf("backup_%s.sql", time.Now().Format("20060102_150405"))
log.Printf("创建备份文件: %s", backupFile)
// 模拟备份过程
time.Sleep(2 * time.Second)
log.Println("数据库备份完成")
return nil
}
func (m *ProductionMigrator) ValidateMigrations(ctx context.Context, migrations []string) error {
log.Println("验证迁移脚本...")
for _, migration := range migrations {
log.Printf("验证: %s", migration)
// 实际实现应该检查SQL语法、外键约束等
time.Sleep(100 * time.Millisecond)
}
log.Println("迁移验证完成")
return nil
}
func (m *ProductionMigrator) ExecuteMigration(ctx context.Context, sql string) error {
if m.dryRun {
log.Printf("干跑模式 - 将执行: %s", sql)
return nil
}
log.Printf("执行: %s", sql)
_, err := m.db.ExecContext(ctx, sql)
return err
}
func (m *ProductionMigrator) RunProductionMigration(ctx context.Context) error {
log.Println("开始生产环境迁移流程")
// 1. 备份数据库
if err := m.BackupDatabase(ctx); err != nil {
return fmt.Errorf("备份失败: %v", err)
}
// 2. 验证迁移
migrations := []string{
"ALTER TABLE users ADD COLUMN IF NOT EXISTS last_active TIMESTAMP",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_active ON users(last_active)",
}
if err := m.ValidateMigrations(ctx, migrations); err != nil {
return fmt.Errorf("验证失败: %v", err)
}
// 3. 在维护窗口执行迁移
if m.isMaintenanceWindow() {
log.Println("检测到维护窗口,开始执行迁移...")
for _, migration := range migrations {
if err := m.ExecuteMigration(ctx, migration); err != nil {
return fmt.Errorf("迁移执行失败: %v", err)
}
time.Sleep(1 * time.Second) // 迁移间短暂暂停
}
log.Println("生产环境迁移完成")
} else {
log.Println("当前不在维护窗口,迁移未执行")
return fmt.Errorf("不在维护窗口内")
}
return nil
}
func (m *ProductionMigrator) isMaintenanceWindow() bool {
// 检查当前时间是否在维护窗口内(例如凌晨2点到4点)
now := time.Now().UTC()
start := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, time.UTC)
end := time.Date(now.Year(), now.Month(), now.Day(), 4, 0, 0, 0, time.UTC)
return now.After(start) && now.Before(end)
}
func main() {
db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 生产环境应该从环境变量获取这些配置
migrator := NewProductionMigrator(db, false, true)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
if err := migrator.RunProductionMigration(ctx); err != nil {
log.Fatalf("生产环境迁移失败: %v", err)
}
}
5. 高级迁移技巧¶
5.1 大表迁移的性能优化¶
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
type LargeTableMigrator struct {
db *sql.DB
}
func (m *LargeTableMigrator) MigrateLargeTable(ctx context.Context) error {
log.Println("开始大表迁移...")
// 1. 创建新表结构
log.Println("创建新表...")
_, err := m.db.ExecContext(ctx, `
CREATE TABLE users_new (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL,
-- 新增字段
last_login TIMESTAMP,
login_count INTEGER DEFAULT 0,
-- 索引
CONSTRAINT users_new_email_key UNIQUE (email)
)
`)
if err != nil {
return fmt.Errorf("创建新表失败: %v", err)
}
// 2. 分批迁移数据
log.Println("开始分批迁移数据...")
batchSize := 10000
lastID := 0
for {
var count int
err := m.db.QueryRowContext(ctx, `
SELECT COUNT(*) FROM users WHERE id > $1
`, lastID).Scan(&count)
if err != nil {
return fmt.Errorf("查询剩余数据失败: %v", err)
}
if count == 0 {
break
}
log.Printf("迁移批次, 最后ID: %d, 剩余记录: %d", lastID, count)
// 使用事务处理每个批次
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("开始事务失败: %v", err)
}
rows, err := tx.QueryContext(ctx, `
SELECT id, username, email
FROM users
WHERE id > $1
ORDER BY id
LIMIT $2
`, lastID, batchSize)
if err != nil {
tx.Rollback()
return fmt.Errorf("查询数据失败: %v", err)
}
var currentBatch []struct {
ID int
Username string
Email string
}
for rows.Next() {
var item struct {
ID int
Username string
Email string
}
if err := rows.Scan(&item.ID, &item.Username, &item.Email); err != nil {
rows.Close()
tx.Rollback()
return fmt.Errorf("扫描数据失败: %v", err)
}
currentBatch = append(currentBatch, item)
lastID = item.ID
}
rows.Close()
// 插入新表
for _, item := range currentBatch {
_, err := tx.ExecContext(ctx, `
INSERT INTO users_new (id, username, email, login_count)
VALUES ($1, $2, $3, 0)
`, item.ID, item.Username, item.Email)
if err != nil {
tx.Rollback()
return fmt.Errorf("插入数据失败: %v", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %v", err)
}
// 避免对生产数据库造成太大压力
time.Sleep(100 * time.Millisecond)
}
// 3. 切换表(在事务中执行)
log.Println("切换表...")
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("开始切换事务失败: %v", err)
}
// 重命名旧表
_, err = tx.ExecContext(ctx, `ALTER TABLE users RENAME TO users_old`)
if err != nil {
tx.Rollback()
return fmt.Errorf("重命名旧表失败: %v", err)
}
// 重命名新表
_, err = tx.ExecContext(ctx, `ALTER TABLE users_new RENAME TO users`)
if err != nil {
tx.Rollback()
return fmt.Errorf("重命名新表失败: %v", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交切换事务失败: %v", err)
}
log.Println("大表迁移完成")
return nil
}
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
migrator := &LargeTableMigrator{db: db}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
defer cancel()
if err := migrator.MigrateLargeTable(ctx); err != nil {
log.Fatalf("迁移失败: %v", err)
}
}
5.2 零停机迁移实现¶
package main
import (
"context"
"database/sql"
"log"
"sync"
"time"
_ "github.com/lib/pq"
)
type ZeroDowntimeMigrator struct {
db *sql.DB
oldSchema string
newSchema string
}
func (m *ZeroDowntimeMigrator) CreateNewSchema(ctx context.Context) error {
log.Println("创建新schema...")
_, err := m.db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", m.newSchema))
if err != nil {
return err
}
// 在新schema中创建表结构
_, err = m.db.ExecContext(ctx, fmt.Sprintf(`
CREATE TABLE %s.users (
id SERIAL PRIMARY KEY,
username VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
profile_data JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`, m.newSchema))
return err
}
func (m *ZeroDowntimeMigrator) StartDoubleWrite(ctx context.Context) error {
log.Println("开始双写模式...")
// 这里应该是应用程序层面的双写逻辑
// 示例:修改应用代码,同时写入新旧两个schema
return nil
}
func (m *ZeroDowntimeMigrator) BackfillData(ctx context.Context) error {
log.Println("回填数据到新schema...")
// 分批从旧表复制数据到新表
batchSize := 5000
offset := 0
for {
log.Printf("处理批次,偏移量: %d", offset)
_, err := m.db.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s.users (id, username, email, created_at)
SELECT id, username, email, created_at
FROM %s.users
ORDER BY id
LIMIT $1 OFFSET $2
ON CONFLICT (id) DO NOTHING
`, m.newSchema, m.oldSchema), batchSize, offset)
if err != nil {
return err
}
// 检查是否还有更多数据
var remaining int
err = m.db.QueryRowContext(ctx, fmt.Sprintf(`
SELECT COUNT(*) FROM %s.users
WHERE id > (SELECT COALESCE(MAX(id), 0) FROM %s.users)
`, m.oldSchema, m.newSchema)).Scan(&remaining)
if err != nil {
return err
}
if remaining == 0 {
break
}
offset += batchSize
time.Sleep(1 * time.Second) // 避免对生产系统造成太大压力
}
return nil
}
func (m *ZeroDowntimeMigrator) SwitchTraffic(ctx context.Context) error {
log.Println("切换流量到新schema...")
// 1. 停止应用(在负载均衡器层面)
// 2. 确保所有进行中的请求完成
// 3. 切换数据库连接指向新schema
// 4. 重启应用
log.Println("流量切换完成")
return nil
}
func (m *ZeroDowntimeMigrator) CleanupOldSchema(ctx context.Context) error {
log.Println("清理旧schema...")
// 等待一段时间确认新schema工作正常
time.Sleep(5 * time.Minute)
// 删除旧schema
_, err := m.db.ExecContext(ctx, fmt.Sprintf("DROP SCHEMA %s CASCADE", m.oldSchema))
return err
}
func (m *ZeroDowntimeMigrator) Execute() error {
ctx := context.Background()
var wg sync.WaitGroup
errors := make(chan error, 5)
steps := []func(context.Context) error{
m.CreateNewSchema,
m.StartDoubleWrite,
m.BackfillData,
m.SwitchTraffic,
m.CleanupOldSchema,
}
for _, step := range steps {
wg.Add(1)
go func(step func(context.Context) error) {
defer wg.Done()
if err := step(ctx); err != nil {
errors <- err
}
}(step)
// 步骤间短暂暂停
time.Sleep(1 * time.Second)
}
wg.Wait()
close(errors)
if len(errors) > 0 {
return <-errors
}
return nil
}
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
migrator := &ZeroDowntimeMigrator{
db: db,
oldSchema: "public",
newSchema: "public_v2",
}
if err := migrator.Execute(); err != nil {
log.Fatalf("零停机迁移失败: %v", err)
}
log.Println("零停机迁移完成")
}
实战练习¶
练习1:用户系统的迁移脚本设计¶
设计一个用户系统的迁移方案,包括: 1. 初始用户表创建 2. 添加用户资料字段 3. 添加登录历史表 4. 添加索引优化查询性能
-- 000001_init_users.up.sql
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_created_at ON users(created_at);
-- 000002_add_user_profile.up.sql
ALTER TABLE users
ADD COLUMN full_name VARCHAR(100),
ADD COLUMN avatar_url VARCHAR(255),
ADD COLUMN date_of_birth DATE;
CREATE INDEX idx_users_dob ON users(date_of_birth);
-- 000003_create_login_history.up.sql
CREATE TABLE user_login_history (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
login_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
ip_address VARCHAR(45),
user_agent TEXT
);
CREATE INDEX idx_login_history_user_id ON user_login_history(user_id);
CREATE INDEX idx_login_history_login_at ON user_login_history(login_at);
-- 000004_optimize_indexes.up.sql
-- 添加复合索引用于常用查询
CREATE INDEX idx_users_email_username ON users(email, username);
CREATE INDEX idx_login_history_user_login ON user_login_history(user_id, login_at DESC);
-- 添加分区表(如果支持)
-- CREATE TABLE user_login_history_partitioned (...) PARTITION BY RANGE (login_at);
练习2:复杂表结构变更的迁移策略¶
处理包含外键约束和大量数据的复杂表变更:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
func main() {
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
ctx := context.Background()
// 复杂迁移:将订单系统从单表拆分为多表
log.Println("开始复杂表结构迁移...")
// 步骤1: 创建新表结构
log.Println("创建新表...")
_, err = db.ExecContext(ctx, `
CREATE TABLE orders_new (
order_id BIGSERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date TIMESTAMP NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(20) NOT NULL
);
CREATE TABLE order_items_new (
item_id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL REFERENCES orders_new(order_id),
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
price DECIMAL(10,2) NOT NULL
);
CREATE INDEX idx_orders_customer ON orders_new(customer_id);
CREATE INDEX idx_order_items_order ON order_items_new(order_id);
`)
if err != nil {
log.Fatal("创建新表失败:", err)
}
// 步骤2: 禁用外键约束(如果存在)
}
详细内容待补充...