跳转至

7.11 实战练习与项目应用

用户管理系统的数据模型设计

在设计用户管理系统时,我们需要考虑用户的基本属性、安全性和扩展性。以下是一个基本的用户数据模型设计:

package main

import (
    "database/sql"
    "time"
)

// User is the core user data model. Every field carries a json tag used for
// serialization and a db tag naming its database column.
type User struct {
    ID        int64        `json:"id" db:"id"`                 // user ID
    Username  string       `json:"username" db:"username"`     // user name
    Email     string       `json:"email" db:"email"`           // email address
    Password  string       `json:"-" db:"password"`            // password (never serialized to JSON)
    Status    int          `json:"status" db:"status"`         // status (0: disabled, 1: enabled)
    CreatedAt time.Time    `json:"created_at" db:"created_at"` // creation time
    UpdatedAt time.Time    `json:"updated_at" db:"updated_at"` // last update time
    DeletedAt sql.NullTime `json:"-" db:"deleted_at"`          // deletion time (soft delete); never serialized to JSON
}

// UserProfile is the detailed-information model for a user. It refers to its
// owning user through UserID.
type UserProfile struct {
    ID        int64     `json:"id" db:"id"`
    UserID    int64     `json:"user_id" db:"user_id"` // ID of the user this profile belongs to
    RealName  string    `json:"real_name" db:"real_name"`
    Age       int       `json:"age" db:"age"`
    Gender    string    `json:"gender" db:"gender"`
    Address   string    `json:"address" db:"address"`
    UpdatedAt time.Time `json:"updated_at" db:"updated_at"` // last update time
}

// UserRole is the association model linking a user to a role: each value
// pairs one UserID with one RoleID.
type UserRole struct {
    ID     int64 `json:"id" db:"id"`
    UserID int64 `json:"user_id" db:"user_id"`
    RoleID int64 `json:"role_id" db:"role_id"`
}

高并发场景下的数据库优化

在高并发场景下,数据库优化至关重要。以下是一些优化策略和实现示例:

package main

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)

// DBConfig holds the database connection settings: the address and
// credentials used to build the DSN, plus the connection-pool limits.
type DBConfig struct {
    Host            string
    Port            int
    User            string
    Password        string
    Database        string
    MaxOpenConns    int           // maximum number of open connections
    MaxIdleConns    int           // maximum number of idle connections
    ConnMaxLifetime time.Duration // maximum lifetime of a connection
}

// NewDatabase opens a MySQL connection pool described by cfg and verifies it
// with a ping before handing it to the caller.
//
// The pool limits (MaxOpenConns, MaxIdleConns, ConnMaxLifetime) are applied
// before the ping. If opening or pinging fails, a wrapped error is returned
// and no pool is left open.
func NewDatabase(cfg DBConfig) (*sqlx.DB, error) {
    dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)

    db, err := sqlx.Open("mysql", dsn)
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %w", err)
    }

    // Configure the connection pool.
    db.SetMaxOpenConns(cfg.MaxOpenConns)
    db.SetMaxIdleConns(cfg.MaxIdleConns)
    db.SetConnMaxLifetime(cfg.ConnMaxLifetime)

    // Verify that the database is actually reachable.
    if err := db.Ping(); err != nil {
        // The caller only receives nil and cannot close the pool itself, so
        // release it here. Close is best effort: the ping error is the one
        // worth reporting.
        _ = db.Close()
        return nil, fmt.Errorf("failed to ping database: %w", err)
    }

    return db, nil
}

// UserRepository is the user data repository. It wraps the sqlx database
// handle that its query methods run against.
type UserRepository struct {
    db *sqlx.DB
}

// NewUserRepository builds a UserRepository that runs its queries against db.
func NewUserRepository(db *sqlx.DB) *UserRepository {
    repo := new(UserRepository)
    repo.db = db
    return repo
}

// GetUserByID loads the user with the given id, ignoring soft-deleted rows.
//
// The query is bound to ctx, so the caller controls cancellation and timeout.
// The password column is not selected, so User.Password is left empty. No
// caching happens here: every call issues a query.
//
// It returns a "user not found" error when no matching row exists, and wraps
// any other database error.
func (r *UserRepository) GetUserByID(ctx context.Context, id int64) (*User, error) {
    const query = "SELECT id, username, email, status, created_at, updated_at FROM users WHERE id = ? AND deleted_at IS NULL"

    var user User
    if err := r.db.GetContext(ctx, &user, query, id); err != nil {
        // errors.Is still matches if sql.ErrNoRows arrives wrapped.
        if errors.Is(err, sql.ErrNoRows) {
            return nil, fmt.Errorf("user not found")
        }
        return nil, fmt.Errorf("failed to get user: %w", err)
    }

    return &user, nil
}

// BatchGetUsers fetches the non-deleted users whose IDs appear in ids using a
// single IN query, and returns them keyed by user ID.
//
// An empty ids slice yields an empty, non-nil map without touching the
// database. IDs with no matching row are simply absent from the result.
func (r *UserRepository) BatchGetUsers(ctx context.Context, ids []int64) (map[int64]*User, error) {
    byID := make(map[int64]*User)
    if len(ids) == 0 {
        return byID, nil
    }

    const selectByIDs = "SELECT id, username, email, status, created_at, updated_at FROM users WHERE id IN (?) AND deleted_at IS NULL"

    // Expand the single "?" into one placeholder per ID.
    expanded, params, err := sqlx.In(selectByIDs, ids)
    if err != nil {
        return nil, fmt.Errorf("failed to build query: %w", err)
    }

    var rows []*User
    // Rebind converts the placeholders to the bind style of the active driver.
    if err := r.db.SelectContext(ctx, &rows, r.db.Rebind(expanded), params...); err != nil {
        return nil, fmt.Errorf("failed to get users: %w", err)
    }

    for _, u := range rows {
        byID[u.ID] = u
    }
    return byID, nil
}

// ConcurrentUserProcessor processes users concurrently. It loads users
// through userRepo using workers goroutines.
type ConcurrentUserProcessor struct {
    userRepo *UserRepository
    workers  int
}

// ProcessUsers loads the users identified by userIDs concurrently, issuing one
// GetUserByID call per ID from a pool of worker goroutines.
//
// The number of goroutines is p.workers, clamped to the range
// [1, len(userIDs)]: a zero or negative setting still processes every ID, and
// no more goroutines are started than there are IDs. An empty userIDs returns
// (nil, nil) without starting any goroutine.
//
// The order of the returned users is not deterministic. If any lookup fails,
// no users are returned; the error reports how many lookups failed and wraps
// the first failure that was recorded.
func (p *ConcurrentUserProcessor) ProcessUsers(ctx context.Context, userIDs []int64) ([]*User, error) {
    if len(userIDs) == 0 {
        return nil, nil
    }

    // sync.WaitGroup.Add panics on a negative counter, and zero workers would
    // silently drop every ID, so always run at least one worker.
    workers := p.workers
    if workers < 1 {
        workers = 1
    }
    if workers > len(userIDs) {
        workers = len(userIDs)
    }

    var (
        wg     sync.WaitGroup
        mu     sync.Mutex // guards result and errs
        result = make([]*User, 0, len(userIDs))
        errs   []error
    )

    // Queue every ID up front; the buffer holds them all, so filling the
    // channel never blocks, and closing it lets the workers drain and exit.
    idCh := make(chan int64, len(userIDs))
    for _, id := range userIDs {
        idCh <- id
    }
    close(idCh)

    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()

            for id := range idCh {
                user, err := p.userRepo.GetUserByID(ctx, id)

                mu.Lock()
                if err != nil {
                    errs = append(errs, err)
                } else {
                    result = append(result, user)
                }
                mu.Unlock()
            }
        }()
    }

    wg.Wait()

    if len(errs) > 0 {
        return nil, fmt.Errorf("encountered %d errors: %w", len(errs), errs[0])
    }

    return result, nil
}

完整的数据持久化解决方案

下面是一个完整的用户管理系统数据持久层实现,包含事务处理、错误处理和性能优化:

package main

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "golang.org/x/crypto/bcrypt"
)

// UserService is the user service. It delegates persistence to the
// UserRepository it holds.
type UserService struct {
    repo *UserRepository
}

// NewUserService builds a UserService that persists users through repo.
func NewUserService(repo *UserRepository) *UserService {
    svc := new(UserService)
    svc.repo = repo
    return svc
}

// CreateUser registers a new, enabled user and returns the stored record.
//
// All three of username, email and password must be non-empty. The call is
// rejected when a non-deleted user already holds the username or the email.
// The password is hashed with bcrypt at the default cost before it is stored,
// so the returned User carries the hash, not the plain text.
//
// NOTE: the existence check and the insert are two separate repository calls,
// so this function does not by itself make them atomic.
func (s *UserService) CreateUser(ctx context.Context, username, email, password string) (*User, error) {
    // Reject the request as soon as any required field is missing.
    for _, field := range []string{username, email, password} {
        if field == "" {
            return nil, errors.New("username, email and password are required")
        }
    }

    // Refuse duplicates of either the username or the email.
    taken, err := s.repo.ExistsByUsernameOrEmail(ctx, username, email)
    switch {
    case err != nil:
        return nil, fmt.Errorf("failed to check user existence: %w", err)
    case taken:
        return nil, errors.New("user already exists")
    }

    // Only the bcrypt hash is ever persisted.
    hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
    if err != nil {
        return nil, fmt.Errorf("failed to hash password: %w", err)
    }

    // Creation and update timestamps start out identical.
    created := time.Now()
    newUser := &User{
        Username:  username,
        Email:     email,
        Password:  string(hash),
        Status:    1,
        CreatedAt: created,
        UpdatedAt: created,
    }

    if err := s.repo.Create(ctx, newUser); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }
    return newUser, nil
}

// UserRepository is the user data repository (full implementation). It wraps
// the sqlx database handle that its methods run against.
//
// NOTE(review): a type with this name and the same field is also declared in
// the previous listing; the two declarations cannot live in one package as-is.
type UserRepository struct {
    db *sqlx.DB
}

// ExistsByUsernameOrEmail reports whether a non-deleted user already uses the
// given username or the given email. Database errors are returned wrapped.
func (r *UserRepository) ExistsByUsernameOrEmail(ctx context.Context, username, email string) (bool, error) {
    const existsQuery = `SELECT EXISTS(
        SELECT 1 FROM users 
        WHERE (username = ? OR email = ?) AND deleted_at IS NULL
    )`

    var found bool
    if err := r.db.GetContext(ctx, &found, existsQuery, username, email); err != nil {
        return false, fmt.Errorf("failed to check user existence: %w", err)
    }
    return found, nil
}

// Create inserts user into the users table and stores the generated row ID
// back into user.ID. Database errors are returned wrapped.
func (r *UserRepository) Create(ctx context.Context, user *User) error {
    const insertUser = `INSERT INTO users 
        (username, email, password, status, created_at, updated_at) 
        VALUES (?, ?, ?, ?, ?, ?)`

    res, err := r.db.ExecContext(
        ctx,
        insertUser,
        user.Username,
        user.Email,
        user.Password,
        user.Status,
        user.CreatedAt,
        user.UpdatedAt,
    )
    if err != nil {
        return fmt.Errorf("failed to insert user: %w", err)
    }

    // Hand the generated ID back to the caller through the struct.
    if user.ID, err = res.LastInsertId(); err != nil {
        return fmt.Errorf("failed to get last insert ID: %w", err)
    }
    return nil
}

// Update writes the username, email and status of user to its row, stamping
// updated_at with the current time. Soft-deleted rows are never touched.
//
// user.UpdatedAt is set to the new timestamp only after the update is known to
// have hit a row, so the caller's struct is left unchanged on every error
// path. It returns "user not found or already deleted" when no row was
// affected, and wraps any database error.
func (r *UserRepository) Update(ctx context.Context, user *User) error {
    const query = `UPDATE users SET 
        username = ?, email = ?, status = ?, updated_at = ?
        WHERE id = ? AND deleted_at IS NULL`

    // Keep the timestamp local until the write has succeeded.
    now := time.Now()

    result, err := r.db.ExecContext(ctx, query,
        user.Username, user.Email, user.Status, now, user.ID)
    if err != nil {
        return fmt.Errorf("failed to update user: %w", err)
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return fmt.Errorf("failed to get rows affected: %w", err)
    }

    if rowsAffected == 0 {
        return errors.New("user not found or already deleted")
    }

    user.UpdatedAt = now
    return nil
}

// Delete soft-deletes the user with the given id by setting deleted_at to the
// current time. Rows that are already soft-deleted are left alone.
//
// It returns "user not found or already deleted" when no row was affected, and
// wraps any database error.
func (r *UserRepository) Delete(ctx context.Context, id int64) error {
    const softDelete = `UPDATE users SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL`

    res, err := r.db.ExecContext(ctx, softDelete, time.Now(), id)
    if err != nil {
        return fmt.Errorf("failed to delete user: %w", err)
    }

    switch n, err := res.RowsAffected(); {
    case err != nil:
        return fmt.Errorf("failed to get rows affected: %w", err)
    case n == 0:
        return errors.New("user not found or already deleted")
    }
    return nil
}

// Transaction runs fn inside a database transaction bound to ctx.
//
// The transaction is committed when fn returns nil. When fn returns an error
// the transaction is rolled back and that error is returned unchanged. If fn
// panics, the transaction is rolled back and the panic is re-raised.
//
// A rollback that reports sql.ErrTxDone is not treated as a failure: it only
// means the transaction was already finished, for example because
// database/sql rolled it back when ctx was cancelled. Any other rollback
// error is reported together with the error from fn.
func (r *UserRepository) Transaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error {
    tx, err := r.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    defer func() {
        if p := recover(); p != nil {
            // Best effort: the panic itself is what the caller must see.
            _ = tx.Rollback()
            panic(p) // re-raise the panic after rolling back
        }
    }()

    if err := fn(tx); err != nil {
        if rbErr := tx.Rollback(); rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
            return fmt.Errorf("transaction failed: %v, rollback also failed: %w", err, rbErr)
        }
        return err
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

// CreateUserWithProfile inserts user and its profile in one transaction
// (transaction example): a failure of either insert fails the whole call.
//
// After the user row is inserted, its generated ID is stored in user.ID and in
// profile.UserID, and profile.UpdatedAt is set to the current time before the
// profile row is written.
func (r *UserRepository) CreateUserWithProfile(ctx context.Context, user *User, profile *UserProfile) error {
    const (
        insertUser = `INSERT INTO users 
            (username, email, password, status, created_at, updated_at) 
            VALUES (?, ?, ?, ?, ?, ?)`

        insertProfile = `INSERT INTO user_profiles 
            (user_id, real_name, age, gender, address, updated_at) 
            VALUES (?, ?, ?, ?, ?, ?)`
    )

    insertBoth := func(tx *sqlx.Tx) error {
        // Step 1: the user row, which yields the ID the profile refers to.
        res, err := tx.ExecContext(
            ctx,
            insertUser,
            user.Username,
            user.Email,
            user.Password,
            user.Status,
            user.CreatedAt,
            user.UpdatedAt,
        )
        if err != nil {
            return fmt.Errorf("failed to insert user: %w", err)
        }

        newID, err := res.LastInsertId()
        if err != nil {
            return fmt.Errorf("failed to get last insert ID: %w", err)
        }
        user.ID = newID

        // Step 2: the profile row, linked to the user just inserted.
        profile.UserID = newID
        profile.UpdatedAt = time.Now()

        if _, err := tx.ExecContext(
            ctx,
            insertProfile,
            profile.UserID,
            profile.RealName,
            profile.Age,
            profile.Gender,
            profile.Address,
            profile.UpdatedAt,
        ); err != nil {
            return fmt.Errorf("failed to insert user profile: %w", err)
        }
        return nil
    }

    return r.Transaction(ctx, insertBoth)
}

// Main 函数和初始化
func main() {
    // 初始化数据库连接
    dbConfig := DBConfig{
        Host:            "localhost",
        Port:            3306,
        User:            "root",
        Password:        "password",
        Database:        "userdb",
        MaxOpenConns:    100,
        MaxIdleConns:    10,
        ConnMaxLifetime: time.Hour,
    }

    db, err := NewDatabase(dbConfig)
    if err != nil {
        log.Fatalf("Failed to initialize database: %v", err)
    }
    defer db.Close()

    // 初始化仓库和服务
    userRepo := NewUserRepository(db)
    userService := NewUserService(userRepo)

    // 示例:创建用户
    ctx := context.Background()
    user, err := userService.CreateUser(ctx, "johndoe", "john@example.com", "securepassword")
    if err != nil {
        log.Printf("Failed to create user: %v", err)
    } else {
        log.Printf("Created user: %+v", user)
    }

    // 示例:批量获取用户
    userIDs := []int64{1, 2, 3, 4, 5}
    users, err := userRepo.BatchGetUsers(ctx, userIDs)
    if err != nil {
        log.Printf("Failed to batch get users: %v", err)
    } else {
        log.Printf("Fetched %d users", len(users))
    }
}

总结

本章节通过实现一个完整的用户管理系统,涵盖了以下关键知识点:

  1. 数据模型设计:设计了用户核心数据模型及相关扩展模型
  2. 数据库优化
     - 使用连接池管理数据库连接
     - 实现批量操作减少查询次数
     - 使用并发处理提高性能
  3. 完整持久化方案
     - 实现基本的CRUD操作
     - 添加事务支持保证数据一致性
     - 实现软删除和业务逻辑验证

在实际项目中,还需要考虑以下扩展点:

  1. 添加缓存层(如Redis)减少数据库压力
  2. 实现分页查询优化大数据量查询
  3. 添加数据库索引优化查询性能
  4. 实现数据迁移和版本管理
  5. 添加监控和日志记录数据库操作

这些实践可以帮助您构建高性能、可扩展的用户管理系统数据层。