跳转至

7.8 Redis集成与基础操作

Redis是一个高性能的键值对存储数据库,广泛应用于缓存、会话存储、实时分析等场景。本章节将详细介绍在Go语言中如何集成Redis,以及各种常用操作的实现方法。

学习目标

  • 掌握Redis客户端的选择与配置
  • 熟练操作Redis的各种数据类型
  • 理解Redis连接池的管理机制
  • 建立Redis操作的最佳实践

核心内容

1. Redis客户端选择

在Go语言中,有多个优秀的Redis客户端库可供选择,其中最流行的是go-redisredigo

1.1 go-redis库详解

go-redis(现在通常指github.com/go-redis/redis/v8)是一个功能全面、类型安全的Redis客户端,支持Redis的所有特性。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    // 创建客户端
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379", // Redis地址
        Password: "",               // 密码
        DB:       0,                // 使用默认数据库
    })

    // 测试连接
    ctx := context.Background()
    pong, err := rdb.Ping(ctx).Result()
    if err != nil {
        fmt.Println("连接Redis失败:", err)
        return
    }
    fmt.Println("连接成功:", pong)

    // 设置键值对
    err = rdb.Set(ctx, "name", "go-redis", 0).Err()
    if err != nil {
        fmt.Println("设置键值失败:", err)
        return
    }

    // 获取值
    val, err := rdb.Get(ctx, "name").Result()
    if err != nil {
        fmt.Println("获取值失败:", err)
        return
    }
    fmt.Println("name:", val)
}

go-redis的主要特点: - 完全支持Redis 6+的新特性 - 内置连接池管理 - 支持发布/订阅、事务、管道等高级特性 - 类型安全,每个命令都有对应的方法 - 支持上下文(Context)用于超时控制和取消操作

1.2 redigo库特性对比

redigogithub.com/gomodule/redigo/redis)是另一个广泛使用的Redis客户端,它更接近Redis的原始命令风格。

package main

import (
    "fmt"
    "github.com/gomodule/redigo/redis"
    "time"
)

func main() {
    // 建立连接
    conn, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        fmt.Println("连接Redis失败:", err)
        return
    }
    defer conn.Close()

    // 测试连接
    pong, err := redis.String(conn.Do("PING"))
    if err != nil {
        fmt.Println("PING失败:", err)
        return
    }
    fmt.Println("连接成功:", pong)

    // 设置键值对
    _, err = conn.Do("SET", "name", "redigo")
    if err != nil {
        fmt.Println("设置键值失败:", err)
        return
    }

    // 获取值
    val, err := redis.String(conn.Do("GET", "name"))
    if err != nil {
        fmt.Println("获取值失败:", err)
        return
    }
    fmt.Println("name:", val)
}

redigo的主要特点: - 轻量级,API简洁 - 采用命令字符串方式调用,更接近Redis CLI - 需要手动处理类型转换 - 连接池实现简单高效 - 适合对Redis命令非常熟悉的开发者

1.3 客户端性能与功能评估

选择客户端时可以从以下几个维度进行评估:

  1. 性能:在高并发场景下的吞吐量和延迟
  2. 功能完整性:对Redis所有命令和特性的支持程度
  3. 易用性:API设计是否直观,是否类型安全
  4. 社区活跃度:更新频率,issue处理速度
  5. 文档质量:是否有完善的文档和示例

在大多数情况下,go-redis由于其类型安全和全面的功能支持,是更好的选择,尤其是对于大型项目。而redigo则适合追求简洁和对Redis命令非常熟悉的开发者。

2. Redis连接管理

2.1 单实例连接配置

单实例连接适用于开发环境或Redis单节点部署的场景。

使用go-redis配置单实例连接:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

func main() {
    // 配置Redis客户端
    rdb := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",  // Redis服务器地址
        Password:     "",                // 密码,如果没有则为空
        DB:           0,                 // 数据库编号
        MaxRetries:   3,                 // 最大重试次数
        MinRetryBackoff: 100 * time.Millisecond, // 最小重试间隔
        MaxRetryBackoff: 1 * time.Second,        // 最大重试间隔

        // 连接池配置
        PoolSize:     10,                // 连接池大小
        MinIdleConns: 5,                 // 最小空闲连接数
        MaxConnAge:   30 * time.Minute,  // 连接的最大生存期
        IdleTimeout:  5 * time.Minute,   // 连接的最大空闲时间
    })

    // 测试连接
    ctx := context.Background()
    _, err := rdb.Ping(ctx).Result()
    if err != nil {
        fmt.Println("连接失败:", err)
        return
    }
    fmt.Println("Redis单实例连接成功")
}

2.2 集群模式连接

Redis集群提供了高可用性和水平扩展能力,go-redis对集群模式有良好的支持。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    // 配置Redis集群客户端
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{
            "localhost:7000",
            "localhost:7001",
            "localhost:7002",
            "localhost:7003",
            "localhost:7004",
            "localhost:7005",
        },
        Password:    "",               // 密码
        MaxRetries:  3,                // 最大重试次数

        // 连接池配置
        PoolSize:    10,               // 每个节点的连接池大小
        MinIdleConns: 5,               // 每个节点的最小空闲连接数
    })

    // 测试连接
    ctx := context.Background()
    _, err := rdb.Ping(ctx).Result()
    if err != nil {
        fmt.Println("集群连接失败:", err)
        return
    }
    fmt.Println("Redis集群连接成功")
}

2.3 哨兵模式配置

哨兵模式用于实现Redis的高可用,go-redis同样支持通过哨兵连接Redis主从集群。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    // 配置哨兵模式客户端
    rdb := redis.NewFailoverClient(&redis.FailoverOptions{
        MasterName:    "mymaster",       // 主节点名称
        SentinelAddrs: []string{         // 哨兵地址列表
            "localhost:26379",
            "localhost:26380",
            "localhost:26381",
        },
        Password:    "",                 // 密码
        DB:          0,                  // 数据库编号

        // 连接池配置
        PoolSize:    10,                 // 连接池大小
        MinIdleConns: 5,                 // 最小空闲连接数
    })

    // 测试连接
    ctx := context.Background()
    _, err := rdb.Ping(ctx).Result()
    if err != nil {
        fmt.Println("哨兵模式连接失败:", err)
        return
    }
    fmt.Println("Redis哨兵模式连接成功")
}

3. 基本数据类型操作

3.1 String类型操作

String是Redis最基本的数据类型,一个键对应一个值。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

func main() {
    // 创建客户端
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()

    // 设置字符串
    err := rdb.Set(ctx, "username", "john_doe", 10*time.Minute).Err()
    if err != nil {
        fmt.Println("Set error:", err)
        return
    }

    // 获取字符串
    val, err := rdb.Get(ctx, "username").Result()
    if err != nil {
        fmt.Println("Get error:", err)
        return
    }
    fmt.Println("username:", val)

    // 自增
    err = rdb.Incr(ctx, "counter").Err()
    if err != nil {
        fmt.Println("Incr error:", err)
        return
    }

    // 获取自增后的值
    counter, err := rdb.Get(ctx, "counter").Int64()
    if err != nil {
        fmt.Println("Get counter error:", err)
        return
    }
    fmt.Println("counter:", counter)

    // 批量设置
    err = rdb.MSet(ctx, "name", "John", "age", "30").Err()
    if err != nil {
        fmt.Println("MSet error:", err)
        return
    }

    // 批量获取
    vals, err := rdb.MGet(ctx, "name", "age").Result()
    if err != nil {
        fmt.Println("MGet error:", err)
        return
    }
    fmt.Println("name and age:", vals)
}

3.2 Hash类型应用

Hash类型适合存储对象,可以将一个对象的多个字段存储在一个键中。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "user:1001"

    // 设置哈希字段
    err := rdb.HSet(ctx, key, map[string]interface{}{
        "name":  "John Doe",
        "email": "john@example.com",
        "age":   30,
    }).Err()
    if err != nil {
        fmt.Println("HSet error:", err)
        return
    }

    // 获取单个字段
    name, err := rdb.HGet(ctx, key, "name").Result()
    if err != nil {
        fmt.Println("HGet error:", err)
        return
    }
    fmt.Println("Name:", name)

    // 获取多个字段
    fields, err := rdb.HMGet(ctx, key, "name", "age").Result()
    if err != nil {
        fmt.Println("HMGet error:", err)
        return
    }
    fmt.Println("Name and Age:", fields)

    // 获取所有字段和值
    all, err := rdb.HGetAll(ctx, key).Result()
    if err != nil {
        fmt.Println("HGetAll error:", err)
        return
    }
    fmt.Println("All fields:", all)

    // 检查字段是否存在
    exists, err := rdb.HExists(ctx, key, "email").Result()
    if err != nil {
        fmt.Println("HExists error:", err)
        return
    }
    fmt.Println("Email exists:", exists)

    // 删除字段
    deleted, err := rdb.HDel(ctx, key, "age").Result()
    if err != nil {
        fmt.Println("HDel error:", err)
        return
    }
    fmt.Println("Deleted fields:", deleted)
}

3.3 List队列操作

List是有序的字符串列表,适合实现队列、栈等数据结构。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "tasks"

    // 向列表左侧添加元素
    err := rdb.LPush(ctx, key, "task1", "task2").Err()
    if err != nil {
        fmt.Println("LPush error:", err)
        return
    }

    // 向列表右侧添加元素
    err = rdb.RPush(ctx, key, "task3").Err()
    if err != nil {
        fmt.Println("RPush error:", err)
        return
    }

    // 获取列表长度
    length, err := rdb.LLen(ctx, key).Result()
    if err != nil {
        fmt.Println("LLen error:", err)
        return
    }
    fmt.Println("List length:", length)

    // 获取列表元素(0表示第一个,-1表示最后一个)
    tasks, err := rdb.LRange(ctx, key, 0, -1).Result()
    if err != nil {
        fmt.Println("LRange error:", err)
        return
    }
    fmt.Println("All tasks:", tasks)

    // 从右侧弹出元素(队列操作)
    task, err := rdb.RPop(ctx, key).Result()
    if err != nil {
        fmt.Println("RPop error:", err)
        return
    }
    fmt.Println("Processed task:", task)

    // 从左侧弹出元素(栈操作)
    task, err = rdb.LPop(ctx, key).Result()
    if err != nil {
        fmt.Println("LPop error:", err)
        return
    }
    fmt.Println("Processed task:", task)
}

3.4 Set集合操作

Set是无序的字符串集合,不允许重复元素,适合存储唯一值的集合。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "tags"

    // 向集合添加元素
    added, err := rdb.SAdd(ctx, key, "go", "redis", "database").Result()
    if err != nil {
        fmt.Println("SAdd error:", err)
        return
    }
    fmt.Println("Added elements:", added)

    // 检查元素是否在集合中
    isMember, err := rdb.SIsMember(ctx, key, "go").Result()
    if err != nil {
        fmt.Println("SIsMember error:", err)
        return
    }
    fmt.Println("Is 'go' a member:", isMember)

    // 获取集合所有元素
    members, err := rdb.SMembers(ctx, key).Result()
    if err != nil {
        fmt.Println("SMembers error:", err)
        return
    }
    fmt.Println("All members:", members)

    // 获取集合大小
    size, err := rdb.SCard(ctx, key).Result()
    if err != nil {
        fmt.Println("SCard error:", err)
        return
    }
    fmt.Println("Set size:", size)

    // 从集合中移除元素
    removed, err := rdb.SRem(ctx, key, "database").Result()
    if err != nil {
        fmt.Println("SRem error:", err)
        return
    }
    fmt.Println("Removed elements:", removed)

    // 创建另一个集合用于集合操作
    rdb.SAdd(ctx, "tags2", "redis", "nosql", "cache")

    // 计算两个集合的交集
    intersection, err := rdb.SInter(ctx, key, "tags2").Result()
    if err != nil {
        fmt.Println("SInter error:", err)
        return
    }
    fmt.Println("Intersection:", intersection)

    // 计算两个集合的并集
    union, err := rdb.SUnion(ctx, key, "tags2").Result()
    if err != nil {
        fmt.Println("SUnion error:", err)
        return
    }
    fmt.Println("Union:", union)
}

3.5 ZSet有序集合

ZSet是有序的集合,每个元素都关联一个分数,用于排序。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "leaderboard"

    // 添加有序集合元素
    added, err := rdb.ZAdd(ctx, key, &redis.Z{Score: 90, Member: "Alice"},
        &redis.Z{Score: 85, Member: "Bob"},
        &redis.Z{Score: 95, Member: "Charlie"}).Result()
    if err != nil {
        fmt.Println("ZAdd error:", err)
        return
    }
    fmt.Println("Added elements:", added)

    // 获取元素的分数
    score, err := rdb.ZScore(ctx, key, "Bob").Result()
    if err != nil {
        fmt.Println("ZScore error:", err)
        return
    }
    fmt.Println("Bob's score:", score)

    // 增加元素的分数
    newScore, err := rdb.ZIncrBy(ctx, key, 5, "Bob").Result()
    if err != nil {
        fmt.Println("ZIncrBy error:", err)
        return
    }
    fmt.Println("Bob's new score:", newScore)

    // 获取有序集合的长度
    size, err := rdb.ZCard(ctx, key).Result()
    if err != nil {
        fmt.Println("ZCard error:", err)
        return
    }
    fmt.Println("ZSet size:", size)

    // 获取排名(从低到高)
    rank, err := rdb.ZRank(ctx, key, "Bob").Result()
    if err != nil {
        fmt.Println("ZRank error:", err)
        return
    }
    fmt.Println("Bob's rank (asc):", rank)

    // 获取排名(从高到低)
    revRank, err := rdb.ZRevRank(ctx, key, "Bob").Result()
    if err != nil {
        fmt.Println("ZRevRank error:", err)
        return
    }
    fmt.Println("Bob's rank (desc):", revRank)

    // 获取分数范围内的元素(升序)
    rangeByScore, err := rdb.ZRangeByScore(ctx, key, &redis.ZRangeBy{
        Min: "80",
        Max: "95",
    }).Result()
    if err != nil {
        fmt.Println("ZRangeByScore error:", err)
        return
    }
    fmt.Println("Elements between 80-95:", rangeByScore)

    // 获取前两名(降序)
    topTwo, err := rdb.ZRevRange(ctx, key, 0, 1).Result()
    if err != nil {
        fmt.Println("ZRevRange error:", err)
        return
    }
    fmt.Println("Top two:", topTwo)
}

4. 高级数据结构

4.1 Bitmap位图操作

Bitmap是一种特殊的字符串,用于高效存储和操作位数据,适合实现布隆过滤器、用户在线状态等功能。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "user:login:20231001"

    // 设置位(用户ID为10086的用户登录)
    err := rdb.SetBit(ctx, key, 10086, 1).Err()
    if err != nil {
        fmt.Println("SetBit error:", err)
        return
    }

    // 设置另一个位(用户ID为10010的用户登录)
    err = rdb.SetBit(ctx, key, 10010, 1).Err()
    if err != nil {
        fmt.Println("SetBit error:", err)
        return
    }

    // 检查位是否设置(用户是否登录)
    isLogin, err := rdb.GetBit(ctx, key, 10086).Result()
    if err != nil {
        fmt.Println("GetBit error:", err)
        return
    }
    fmt.Println("User 10086 logged in:", isLogin == 1)

    // 计算当天登录用户总数
    count, err := rdb.BitCount(ctx, key, &redis.BitCount{Start: 0, End: -1}).Result()
    if err != nil {
        fmt.Println("BitCount error:", err)
        return
    }
    fmt.Println("Total logged in users:", count)

    // 另一天的登录数据
    key2 := "user:login:20231002"
    rdb.SetBit(ctx, key2, 10086, 1)
    rdb.SetBit(ctx, key2, 10000, 1)

    // 计算两天都登录的用户数(交集)
    destKey := "user:login:both_days"
    err = rdb.BitOpAnd(ctx, destKey, key, key2).Err()
    if err != nil {
        fmt.Println("BitOpAnd error:", err)
        return
    }

    bothDaysCount, err := rdb.BitCount(ctx, destKey, nil).Result()
    if err != nil {
        fmt.Println("BitCount error:", err)
        return
    }
    fmt.Println("Users logged in both days:", bothDaysCount)
}

4.2 HyperLogLog基数统计

HyperLogLog是一种概率数据结构,用于高效估算集合的基数(元素个数),适合统计页面UV等场景。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "page:uv:homepage"

    // 添加访问用户ID
    added, err := rdb.PFAdd(ctx, key, "user1", "user2", "user3").Result()
    if err != nil {
        fmt.Println("PFAdd error:", err)
        return
    }
    fmt.Println("Added to HyperLogLog:", added)

    // 再添加一些用户,包括重复的
    rdb.PFAdd(ctx, key, "user3", "user4", "user5")

    // 估算基数(独立用户数)
    count, err := rdb.PFCount(ctx, key).Result()
    if err != nil {
        fmt.Println("PFCount error:", err)
        return
    }
    fmt.Println("Estimated unique users:", count)

    // 创建另一个HyperLogLog
    key2 := "page:uv:productpage"
    rdb.PFAdd(ctx, key2, "user3", "user5", "user6", "user7")

    // 合并两个HyperLogLog
    err = rdb.PFMerge(ctx, "page:uv:combined", key, key2).Err()
    if err != nil {
        fmt.Println("PFMerge error:", err)
        return
    }

    // 估算合并后的基数
    combinedCount, err := rdb.PFCount(ctx, "page:uv:combined").Result()
    if err != nil {
        fmt.Println("PFCount error:", err)
        return
    }
    fmt.Println("Estimated unique users (combined):", combinedCount)
}

4.3 GEO地理位置功能

GEO功能用于存储和操作地理位置信息,适合实现附近的人、商家等功能。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()
    key := "restaurants"

    // 添加地理位置信息(经度,纬度,名称)
    added, err := rdb.GeoAdd(ctx, key, &redis.GeoLocation{
        Longitude: 116.403874,
        Latitude:  39.914885,
        Name:      "Restaurant A",
    }, &redis.GeoLocation{
        Longitude: 116.410088,
        Latitude:  39.907404,
        Name:      "Restaurant B",
    }, &redis.GeoLocation{
        Longitude: 116.395645,
        Latitude:  39.929986,
        Name:      "Restaurant C",
    }).Result()
    if err != nil {
        fmt.Println("GeoAdd error:", err)
        return
    }
    fmt.Println("Added locations:", added)

    // 获取地理位置的经纬度
    pos, err := rdb.GeoPos(ctx, key, "Restaurant A").Result()
    if err != nil {
        fmt.Println("GeoPos error:", err)
        return
    }
    fmt.Println("Restaurant A position:", pos[0])

    // 计算两个位置之间的距离(单位:米)
    distance, err := rdb.GeoDist(ctx, key, "Restaurant A", "Restaurant B", "m").Result()
    if err != nil {
        fmt.Println("GeoDist error:", err)
        return
    }
    fmt.Println("Distance between A and B (meters):", distance)

    // 查找指定位置附近的元素
    // 参数:经度,纬度,半径,单位
    nearby, err := rdb.GeoRadius(ctx, key, 116.403874, 39.914885, &redis.GeoRadiusQuery{
        Radius: 2000,
        Unit:   "m",
        Count:  10,
        Sort:   "ASC",
    }).Result()
    if err != nil {
        fmt.Println("GeoRadius error:", err)
        return
    }

    fmt.Println("Restaurants within 2km:")
    for _, place := range nearby {
        fmt.Printf("- %s: %.2f meters\n", place.Name, place.Distance)
    }
}

5. Redis连接池优化

连接池是高效使用Redis连接的关键,合理配置连接池可以显著提升性能。

5.1 连接池参数配置

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

func main() {
    // 优化的连接池配置
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,

        // 连接池配置
        PoolSize:        100,          // 连接池最大连接数,根据并发量调整
        MinIdleConns:    20,           // 最小空闲连接数,保持一定数量的热连接
        MaxConnAge:      30 * time.Minute, // 连接的最大生存期,避免长时间连接
        IdleTimeout:     5 * time.Minute,  // 连接的最大空闲时间
        ReadTimeout:     3 * time.Second,  // 读超时
        WriteTimeout:    3 * time.Second,  // 写超时
        PoolTimeout:     5 * time.Second,  // 获取连接的超时时间

        // 重试策略
        MaxRetries:      3,            // 最大重试次数
        MinRetryBackoff: 100 * time.Millisecond, // 最小重试间隔
        MaxRetryBackoff: 1 * time.Second,        // 最大重试间隔
    })

    ctx := context.Background()
    _, err := rdb.Ping(ctx).Result()
    if err != nil {
        fmt.Println("Connection error:", err)
        return
    }
    fmt.Println("Connected with optimized pool settings")

    // 打印连接池状态
    printPoolStats(rdb.PoolStats())
}

// 打印连接池状态
func printPoolStats(stats *redis.PoolStats) {
    fmt.Println("\nConnection Pool Stats:")
    fmt.Printf("Total connections: %d\n", stats.TotalConns)
    fmt.Printf("Idle connections: %d\n", stats.IdleConns)
    fmt.Printf("Stale connections: %d\n", stats.StaleConns)
}

5.2 连接复用策略

连接复用是提升Redis性能的关键,以下是一些最佳实践:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "sync"
)

// 全局Redis客户端实例,确保单例
var (
    rdb     *redis.Client
    once    sync.Once
)

// 获取Redis客户端实例(单例模式)
func getRedisClient() *redis.Client {
    once.Do(func() {
        rdb = redis.NewClient(&redis.Options{
            Addr:         "localhost:6379",
            PoolSize:     50,
            MinIdleConns: 10,
            IdleTimeout:  5 * 60 * time.Second,
        })
    })
    return rdb
}

// 封装Redis操作,确保连接正确使用
func redisOperation(ctx context.Context, fn func(*redis.Client) error) error {
    client := getRedisClient()
    return fn(client)
}

func main() {
    ctx := context.Background()

    // 使用封装的方法进行Redis操作
    err := redisOperation(ctx, func(client *redis.Client) error {
        return client.Set(ctx, "example", "connection_pool", 0).Err()
    })
    if err != nil {
        fmt.Println("Redis operation error:", err)
        return
    }

    val, err := redisOperationWithResult(ctx, func(client *redis.Client) (interface{}, error) {
        return client.Get(ctx, "example").Result()
    })
    if err != nil {
        fmt.Println("Redis get error:", err)
        return
    }

    fmt.Println("Value:", val)
}

// 带返回值的Redis操作封装
func redisOperationWithResult(ctx context.Context, fn func(*redis.Client) (interface{}, error)) (interface{}, error) {
    client := getRedisClient()
    return fn(client)
}

5.3 性能监控与调优

监控Redis连接池性能并进行调优:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        PoolSize:     50,
        MinIdleConns: 10,
    })

    ctx := context.Background()

    // 定期打印连接池状态
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                printPoolStats(rdb.PoolStats())
            }
        }
    }()

    // 模拟负载测试
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                rdb.Incr(ctx, "counter").Err()
                time.Sleep(time.Microsecond * 100)
            }
        }()
    }

    wg.Wait()
    fmt.Println("Load test completed")
}

func printPoolStats(stats *redis.PoolStats) {
    fmt.Printf("\nPool stats at %s:\n", time.Now().Format(time.RFC3339))
    fmt.Printf("Total connections: %d\n", stats.TotalConns)
    fmt.Printf("Idle connections: %d\n", stats.IdleConns)
    fmt.Printf("Stale connections: %d\n", stats.StaleConns)
}

调优建议: 1. 根据并发量调整PoolSize,通常设置为CPU核心数的5-10倍 2. MinIdleConns设置为预期的稳定并发量,避免频繁创建新连接 3. 监控StaleConns数量,如果持续增长,可能是网络问题或Redis服务器不稳定 4. 观察IdleConns,如果长期为0,说明连接池太小 5. 如果获取连接经常超时,需要增大PoolSizePoolTimeout

6. 管道与事务

6.1 Pipeline批量操作

Pipeline用于批量执行多个命令,减少网络往返次数,提高性能。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()

    // 普通方式执行多个命令
    start := time.Now()
    for i := 0; i < 1000; i++ {
        key := fmt.Sprintf("key%d", i)
        rdb.Set(ctx, key, i, 0)
    }
    fmt.Printf("Normal execution time: %v\n", time.Since(start))

    // 使用Pipeline批量执行
    start = time.Now()
    pipe := rdb.Pipeline()

    for i := 1000; i < 2000; i++ {
        key := fmt.Sprintf("key%d", i)
        pipe.Set(ctx, key, i, 0)
    }

    // 执行Pipeline中的所有命令
    _, err := pipe.Exec(ctx)
    if err != nil {
        fmt.Println("Pipeline error:", err)
        return
    }

    fmt.Printf("Pipeline execution time: %v\n", time.Since(start))

    // 读取Pipeline执行结果
    pipe = rdb.Pipeline()
    pipe.Get(ctx, "key500")
    pipe.Get(ctx, "key1500")

    results, err := pipe.Exec(ctx)
    if err != nil {
        fmt.Println("Pipeline get error:", err)
        return
    }

    for _, result := range results {
        val, _ := result.(*redis.StringCmd).Result()
        fmt.Println("Value:", val)
    }
}

6.2 事务操作实现

Redis事务可以确保一组命令原子性执行,要么全部执行,要么全部不执行。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()

    // 清除测试数据
    rdb.Del(ctx, "balance", "debt")

    // 开启事务
    tx := rdb.TxPipeline()

    // 事务中的命令
    tx.IncrBy(ctx, "balance", 100)
    tx.IncrBy(ctx, "debt", 50)

    // 执行事务
    _, err := tx.Exec(ctx)
    if err != nil {
        fmt.Println("Transaction error:", err)
        return
    }

    // 查看结果
    balance, _ := rdb.Get(ctx, "balance").Int64()
    debt, _ := rdb.Get(ctx, "debt").Int64()

    fmt.Println("Balance:", balance)
    fmt.Println("Debt:", debt)

    // 使用Watch实现乐观锁
    // 监控balance键,如果在事务执行前被修改,事务会失败
    err = rdb.Watch(ctx, func(tx *redis.Tx) error {
        // 获取当前值
        val, err := tx.Get(ctx, "balance").Int64()
        if err != nil && err != redis.Nil {
            return err
        }

        // 准备事务
        pipe := tx.Pipeline()
        pipe.Set(ctx, "balance", val+100, 0)

        // 执行事务
        _, err = pipe.Exec(ctx)
        return err
    }, "balance") // 监控的键

    if err != nil {
        fmt.Println("Watch transaction error:", err)
        return
    }

    newBalance, _ := rdb.Get(ctx, "balance").Int64()
    fmt.Println("New balance after watch transaction:", newBalance)
}

6.3 Lua脚本应用

Lua脚本可以在Redis服务器端原子性地执行多个命令,适合实现复杂的业务逻辑。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

// 定义Lua脚本:转账操作
const transferScript = `
-- 检查余额是否充足
local balance = tonumber(redis.call('get', KEYS[1]) or "0")
local amount = tonumber(ARGV[1])

if balance >= amount then
    -- 扣减余额
    redis.call('decrby', KEYS[1], amount)
    -- 增加目标账户余额
    redis.call('incrby', KEYS[2], amount)
    return 1  -- 成功
else
    return 0  -- 失败
end
`

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()

    // 初始化账户余额
    rdb.Set(ctx, "account:1001", 1000, 0)
    rdb.Set(ctx, "account:1002", 500, 0)

    // 加载Lua脚本并获取SHA1哈希
    script := redis.NewScript(transferScript)
    sha, err := script.Load(ctx, rdb).Result()
    if err != nil {
        fmt.Println("Load script error:", err)
        return
    }
    fmt.Println("Script SHA:", sha)

    // 执行Lua脚本
    // KEYS: 源账户,目标账户
    // ARGV: 转账金额
    result, err := rdb.EvalSha(ctx, sha, []string{"account:1001", "account:1002"}, 300).Int64()
    if err != nil {
        fmt.Println("Eval script error:", err)
        return
    }

    if result == 1 {
        fmt.Println("Transfer successful")
    } else {
        fmt.Println("Transfer failed: insufficient balance")
    }

    // 查看转账后余额
    acc1, _ := rdb.Get(ctx, "account:1001").Int64()
    acc2, _ := rdb.Get(ctx, "account:1002").Int64()

    fmt.Println("Account 1001 balance:", acc1)
    fmt.Println("Account 1002 balance:", acc2)

    // 尝试转账超出余额的金额
    result, err = rdb.EvalSha(ctx, sha, []string{"account:1001", "account:1002"}, 2000).Int64()
    if err != nil {
        fmt.Println("Eval script error:", err)
        return
    }

    if result == 1 {
        fmt.Println("Second transfer successful")
    } else {
        fmt.Println("Second transfer failed: insufficient balance")
    }
}

实战练习

练习1:用户会话管理系统

实现一个基于Redis的用户会话管理系统,支持会话创建、验证、更新和销毁。

package main

import (
    "context"
    "crypto/rand"
    "encoding/base64"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

// SessionManager 会话管理器
type SessionManager struct {
    rdb        *redis.Client
    prefix     string        // 键前缀
    expiration time.Duration // 会话过期时间
}

// NewSessionManager 创建新的会话管理器
func NewSessionManager(addr string, prefix string, expiration time.Duration) *SessionManager {
    rdb := redis.NewClient(&redis.Options{
        Addr: addr,
    })
    return &SessionManager{
        rdb:        rdb,
        prefix:     prefix,
        expiration: expiration,
    }
}

// generateSessionID 生成随机会话ID
func (sm *SessionManager) generateSessionID() string {
    b := make([]byte, 32)
    rand.Read(b)
    return base64.URLEncoding.EncodeToString(b)
}

// CreateSession 创建新会话
func (sm *SessionManager) CreateSession(ctx context.Context, userID string, data map[string]interface{}) (string, error) {
    sessionID := sm.generateSessionID()
    key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)

    // 使用事务设置会话数据和过期时间
    tx := sm.rdb.TxPipeline()

    // 设置用户ID
    tx.HSet(ctx, key, "user_id", userID)

    // 设置额外数据
    for k, v := range data {
        tx.HSet(ctx, key, k, v)
    }

    // 设置过期时间
    tx.Expire(ctx, key, sm.expiration)

    // 执行事务
    _, err := tx.Exec(ctx)
    if err != nil {
        return "", err
    }

    return sessionID, nil
}

// GetSession 获取会话数据
func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (map[string]string, error) {
    key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)

    // 获取所有会话数据
    data, err := sm.rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return nil, err
    }

    // 如果会话不存在
    if len(data) == 0 {
        return nil, fmt.Errorf("session not found")
    }

    // 延长会话有效期
    sm.rdb.Expire(ctx, key, sm.expiration)

    return data, nil
}

// UpdateSession 更新会话数据
func (sm *SessionManager) UpdateSession(ctx context.Context, sessionID string, data map[string]interface{}) error {
    key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)

    // 检查会话是否存在
    exists, err := sm.rdb.Exists(ctx, key).Result()
    if err != nil {
        return err
    }
    if exists == 0 {
        return fmt.Errorf("session not found")
    }

    // 更新会话数据
    tx := sm.rdb.TxPipeline()
    for k, v := range data {
        tx.HSet(ctx, key, k, v)
    }
    tx.Expire(ctx, key, sm.expiration)

    _, err = tx.Exec(ctx)
    return err
}

// DestroySession 销毁会话
func (sm *SessionManager) DestroySession(ctx context.Context, sessionID string) error {
    key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)
    return sm.rdb.Del(ctx, key).Err()
}

// GetUserSessions 获取用户的所有会话
func (sm *SessionManager) GetUserSessions(ctx context.Context, userID string) ([]string, error) {
    // 注意:在实际应用中,可能需要维护用户与会话的映射关系
    // 这里简化处理,实际应用中应考虑使用Set存储用户的所有会话ID
    return []string{}, nil
}

func main() {
    // 创建会话管理器,会话有效期30分钟
    sm := NewSessionManager("localhost:6379", "session", 30*time.Minute)
    ctx := context.Background()

    // 创建新会话
    sessionID, err := sm.CreateSession(ctx, "user123", map[string]interface{}{
        "username": "john_doe",
        "email":    "john@example.com",
        "role":     "user",
    })
    if err != nil {
        fmt.Println("Create session error:", err)
        return
    }
    fmt.Println("Created session ID:", sessionID)

    // 获取会话数据
    sessionData, err := sm.GetSession(ctx, sessionID)
    if err != nil {
        fmt.Println("Get session error:", err)
        return
    }
    fmt.Println("Session data:", sessionData)

    // 更新会话数据
    err = sm.UpdateSession(ctx, sessionID, map[string]interface{}{
        "role": "admin",
    })
    if err != nil {
        fmt.Println("Update session error:", err)
        return
    }

    // 再次获取会话数据,验证更新
    updatedData, err := sm.GetSession(ctx, sessionID)
    if err != nil {
        fmt.Println("Get updated session error:", err)
        return
    }
    fmt.Println("Updated session data:", updatedData)

    // 销毁会话
    err = sm.DestroySession(ctx, sessionID)
    if err != nil {
        fmt.Println("Destroy session error:", err)
        return
    }
    fmt.Println("Session destroyed")

    // 验证会话已销毁
    _, err = sm.GetSession(ctx, sessionID)
    if err != nil {
        fmt.Println("Expected error after destruction:", err)
    }
}

练习2:实时排行榜实现

使用Redis的ZSet实现一个实时排行榜系统,支持添加分数、查询排名和获取排行榜。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

// Leaderboard 排行榜
type Leaderboard struct {
    rdb  *redis.Client
    key  string // 存储排行榜的键名
}

// NewLeaderboard 创建新的排行榜
func NewLeaderboard(addr string, key string) *Leaderboard {
    rdb := redis.NewClient(&redis.Options{
        Addr: addr,
    })
    return &Leaderboard{
        rdb: rdb,
        key: key,
    }
}

// AddScore 添加或更新用户分数
func (lb *Leaderboard) AddScore(ctx context.Context, userID string, score float64) error {
    return lb.rdb.ZAdd(ctx, lb.key, &redis.Z{
        Score:  score,
        Member: userID,
    }).Err()
}

// IncrScore 增加用户分数
func (lb *Leaderboard) IncrScore(ctx context.Context, userID string, increment float64) (float64, error) {
    return lb.rdb.ZIncrBy(ctx, lb.key, increment, userID).Result()
}

// GetRank 获取用户排名(从1开始,分数高的排名靠前)
func (lb *Leaderboard) GetRank(ctx context.Context, userID string) (int64, error) {
    return lb.rdb.ZRevRank(ctx, lb.key, userID).Result() + 1
}

// GetScore 获取用户分数
func (lb *Leaderboard) GetScore(ctx context.Context, userID string) (float64, error) {
    return lb.rdb.ZScore(ctx, lb.key, userID).Result()
}

// GetTopN 获取排名前N的用户
func (lb *Leaderboard) GetTopN(ctx context.Context, n int64) ([]*redis.Z, error) {
    return lb.rdb.ZRevRangeWithScores(ctx, lb.key, 0, n-1).Result()
}

// GetUserAround 获取用户周围的用户(包括自己)
func (lb *Leaderboard) GetUserAround(ctx context.Context, userID string, count int64) ([]*redis.Z, error) {
    rank, err := lb.rdb.ZRevRank(ctx, lb.key, userID).Result()
    if err != nil {
        return nil, err
    }

    start := rank - count
    if start < 0 {
        start = 0
    }

    end := rank + count
    return lb.rdb.ZRevRangeWithScores(ctx, lb.key, start, end).Result()
}

// GetTotalUsers 获取总用户数
func (lb *Leaderboard) GetTotalUsers(ctx context.Context) (int64, error) {
    return lb.rdb.ZCard(ctx, lb.key).Result()
}

func main() {
    // 创建排行榜实例
    lb := NewLeaderboard("localhost:6379", "game:leaderboard")
    ctx := context.Background()

    // 清除测试数据
    lb.rdb.Del(ctx, lb.key)

    // 添加一些分数
    users := []struct {
        id    string
        score float64
    }{
        {"user1", 850},
        {"user2", 920},
        {"user3", 780},
        {"user4", 950},
        {"user5", 880},
    }

    for _, u := range users {
        err := lb.AddScore(ctx, u.id, u.score)
        if err != nil {
            fmt.Printf("Error adding score for %s: %v\n", u.id, err)
        }
    }

    // 增加某个用户的分数
    newScore, err := lb.IncrScore(ctx, "user3", 100)
    if err != nil {
        fmt.Println("Error incrementing score:", err)
    } else {
        fmt.Printf("User3's new score: %.0f\n", newScore)
    }

    // 获取用户排名
    rank, err := lb.GetRank(ctx, "user2")
    if err != nil {
        fmt.Println("Error getting rank:", err)
    } else {
        fmt.Printf("User2's rank: %d\n", rank)
    }

    // 获取前三名
    top3, err := lb.GetTopN(ctx, 3)
    if err != nil {
        fmt.Println("Error getting top 3:", err)
    } else {
        fmt.Println("Top 3 users:")
        for i, z := range top3 {
            fmt.Printf("%d. %s - %.0f\n", i+1, z.Member, z.Score)
        }
    }

    // 获取user2周围的用户
    around, err := lb.GetUserAround(ctx, "user2", 1)
    if err != nil {
        fmt.Println("Error getting around users:", err)
    } else {
        fmt.Println("\nUsers around user2:")
        for _, z := range around {
            rank, _ := lb.GetRank(ctx, z.Member.(string))
            fmt.Printf("Rank %d: %s - %.0f\n", rank, z.Member, z.Score)
        }
    }

    // 获取总用户数
    total, err := lb.GetTotalUsers(ctx)
    if err != nil {
        fmt.Println("Error getting total users:", err)
    } else {
        fmt.Printf("\nTotal users in leaderboard: %d\n", total)
    }
}

练习3:分布式锁的Redis实现

实现一个基于Redis的分布式锁,用于在分布式系统中保证资源的互斥访问。

package main

import (
    "context"
    "crypto/rand"
    "encoding/base64"
    "fmt"
    "github.com/go-redis/redis/v8"
    "sync"
    "time"
)

// RedisLock 基于Redis的分布式锁
type RedisLock struct {
    rdb        *redis.Client
    lockKey    string        // 锁的键名
    lockValue  string        // 锁的值,用于唯一标识锁的持有者
    expiration time.Duration // 锁的过期时间,防止死锁
}

// NewRedisLock 创建新的分布式锁实例
func NewRedisLock(addr string, lockKey string, expiration time.Duration) *RedisLock {
    rdb := redis.NewClient(&redis.Options{
        Addr: addr,
    })
    return &RedisLock{
        rdb:        rdb,
        lockKey:    lockKey,
        lockValue:  generateRandomValue(),
        expiration: expiration,
    }
}

// generateRandomValue 生成随机值,用于唯一标识锁的持有者
func generateRandomValue() string {
    b := make([]byte, 16)
    _, err := rand.Read(b)
    if err != nil {
        panic(err)
    }
    return base64.URLEncoding.EncodeToString(b)
}

// Lock 获取锁
// blocking表示是否阻塞等待,timeout为等待超时时间
func (rl *RedisLock) Lock(ctx context.Context, blocking bool, timeout time.Duration) (bool, error) {
    startTime := time.Now()

    for {
        // 尝试获取锁
        // 使用SET NX (Not Exist)命令,只有当键不存在时才设置成功
        // 同时设置过期时间,防止死锁
        success, err := rl.rdb.SetNX(ctx, rl.lockKey, rl.lockValue, rl.expiration).Result()
        if err != nil {
            return false, err
        }

        if success {
            // 获取锁成功
            return true, nil
        }

        // 如果不阻塞,直接返回失败
        if !blocking {
            return false, nil
        }

        // 检查是否超时
        if time.Since(startTime) >= timeout {
            return false, fmt.Errorf("timeout waiting for lock")
        }

        // 短暂休眠后重试
        time.Sleep(100 * time.Millisecond)
    }
}

// Unlock 释放锁
// 使用Lua脚本确保释放锁的原子性,只有持有锁的客户端才能释放锁
func (rl *RedisLock) Unlock(ctx context.Context) (bool, error) {
    // Lua脚本:检查锁的值是否匹配,如果匹配则删除锁
    script := redis.NewScript(`
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
    `)

    result, err := script.Run(ctx, rl.rdb, []string{rl.lockKey}, rl.lockValue).Int64()
    if err != nil {
        return false, err
    }

    return result == 1, nil
}

// Renew 延长锁的有效期
func (rl *RedisLock) Renew(ctx context.Context) (bool, error) {
    // Lua脚本:检查锁的值是否匹配,如果匹配则延长过期时间
    script := redis.NewScript(`
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('expire', KEYS[1], ARGV[2])
        else
            return 0
        end
    `)

    result, err := script.Run(ctx, rl.rdb, []string{rl.lockKey}, 
        rl.lockValue, int(rl.expiration.Seconds())).Int64()
    if err != nil {
        return false, err
    }

    return result == 1, nil
}

// 模拟需要加锁的资源操作
func accessResource(lock *RedisLock, clientID int) {
    ctx := context.Background()
    fmt.Printf("Client %d trying to acquire lock...\n", clientID)

    // 尝试获取锁,最多等待5秒
    acquired, err := lock.Lock(ctx, true, 5*time.Second)
    if err != nil {
        fmt.Printf("Client %d error acquiring lock: %v\n", clientID, err)
        return
    }

    if !acquired {
        fmt.Printf("Client %d failed to acquire lock\n", clientID)
        return
    }

    defer func() {
        // 释放锁
        released, err := lock.Unlock(ctx)
        if err != nil {
            fmt.Printf("Client %d error releasing lock: %v\n", clientID, err)
        } else if released {
            fmt.Printf("Client %d released lock\n", clientID)
        } else {
            fmt.Printf("Client %d failed to release lock (not owner)\n", clientID)
        }
    }()

    fmt.Printf("Client %d acquired lock, accessing resource...\n", clientID)

    // 模拟资源操作
    time.Sleep(2 * time.Second)

    fmt.Printf("Client %d finished accessing resource\n", clientID)
}

func main() {
    // 创建分布式锁实例,锁过期时间为5秒
    lock := NewRedisLock("localhost:6379", "resource:lock", 5*time.Second)

    // 模拟多个客户端同时竞争锁
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(clientID int) {
            defer wg.Done()
            accessResource(lock, clientID)
        }(i)
    }

    wg.Wait()
    fmt.Println("All clients finished")
}

以上就是Redis集成与基础操作的完整教程,涵盖了客户端选择、连接管理、数据类型操作、高级数据结构、连接池优化以及管道与事务等内容,并通过三个实战练习展示了Redis在实际项目中的应用。掌握这些知识,可以帮助你在Go语言开发中高效地使用Redis。