7.8 Redis集成与基础操作¶
Redis是一个高性能的键值对存储数据库,广泛应用于缓存、会话存储、实时分析等场景。本章节将详细介绍在Go语言中如何集成Redis,以及各种常用操作的实现方法。
学习目标¶
- 掌握Redis客户端的选择与配置
- 熟练操作Redis的各种数据类型
- 理解Redis连接池的管理机制
- 建立Redis操作的最佳实践
核心内容¶
1. Redis客户端选择¶
在Go语言中,有多个优秀的Redis客户端库可供选择,其中最流行的是go-redis和redigo。
1.1 go-redis库详解¶
go-redis(现在通常指github.com/go-redis/redis/v8)是一个功能全面、类型安全的Redis客户端,支持Redis的所有特性。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis地址
Password: "", // 密码
DB: 0, // 使用默认数据库
})
// 测试连接
ctx := context.Background()
pong, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("连接Redis失败:", err)
return
}
fmt.Println("连接成功:", pong)
// 设置键值对
err = rdb.Set(ctx, "name", "go-redis", 0).Err()
if err != nil {
fmt.Println("设置键值失败:", err)
return
}
// 获取值
val, err := rdb.Get(ctx, "name").Result()
if err != nil {
fmt.Println("获取值失败:", err)
return
}
fmt.Println("name:", val)
}
go-redis的主要特点: - 完全支持Redis 6+的新特性 - 内置连接池管理 - 支持发布/订阅、事务、管道等高级特性 - 类型安全,每个命令都有对应的方法 - 支持上下文(Context)用于超时控制和取消操作
1.2 redigo库特性对比¶
redigo(github.com/gomodule/redigo/redis)是另一个广泛使用的Redis客户端,它更接近Redis的原始命令风格。
package main
import (
"fmt"
"github.com/gomodule/redigo/redis"
"time"
)
func main() {
// 建立连接
conn, err := redis.Dial("tcp", "localhost:6379")
if err != nil {
fmt.Println("连接Redis失败:", err)
return
}
defer conn.Close()
// 测试连接
pong, err := redis.String(conn.Do("PING"))
if err != nil {
fmt.Println("PING失败:", err)
return
}
fmt.Println("连接成功:", pong)
// 设置键值对
_, err = conn.Do("SET", "name", "redigo")
if err != nil {
fmt.Println("设置键值失败:", err)
return
}
// 获取值
val, err := redis.String(conn.Do("GET", "name"))
if err != nil {
fmt.Println("获取值失败:", err)
return
}
fmt.Println("name:", val)
}
redigo的主要特点: - 轻量级,API简洁 - 采用命令字符串方式调用,更接近Redis CLI - 需要手动处理类型转换 - 连接池实现简单高效 - 适合对Redis命令非常熟悉的开发者
1.3 客户端性能与功能评估¶
选择客户端时可以从以下几个维度进行评估:
- 性能:在高并发场景下的吞吐量和延迟
- 功能完整性:对Redis所有命令和特性的支持程度
- 易用性:API设计是否直观,是否类型安全
- 社区活跃度:更新频率,issue处理速度
- 文档质量:是否有完善的文档和示例
在大多数情况下,go-redis由于其类型安全和全面的功能支持,是更好的选择,尤其是对于大型项目。而redigo则适合追求简洁和对Redis命令非常熟悉的开发者。
2. Redis连接管理¶
2.1 单实例连接配置¶
单实例连接适用于开发环境或Redis单节点部署的场景。
使用go-redis配置单实例连接:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
// 配置Redis客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis服务器地址
Password: "", // 密码,如果没有则为空
DB: 0, // 数据库编号
MaxRetries: 3, // 最大重试次数
MinRetryBackoff: 100 * time.Millisecond, // 最小重试间隔
MaxRetryBackoff: 1 * time.Second, // 最大重试间隔
// 连接池配置
PoolSize: 10, // 连接池大小
MinIdleConns: 5, // 最小空闲连接数
MaxConnAge: 30 * time.Minute, // 连接的最大生存期
IdleTimeout: 5 * time.Minute, // 连接的最大空闲时间
})
// 测试连接
ctx := context.Background()
_, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("连接失败:", err)
return
}
fmt.Println("Redis单实例连接成功")
}
2.2 集群模式连接¶
Redis集群提供了高可用性和水平扩展能力,go-redis对集群模式有良好的支持。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
// 配置Redis集群客户端
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"localhost:7000",
"localhost:7001",
"localhost:7002",
"localhost:7003",
"localhost:7004",
"localhost:7005",
},
Password: "", // 密码
MaxRetries: 3, // 最大重试次数
// 连接池配置
PoolSize: 10, // 每个节点的连接池大小
MinIdleConns: 5, // 每个节点的最小空闲连接数
})
// 测试连接
ctx := context.Background()
_, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("集群连接失败:", err)
return
}
fmt.Println("Redis集群连接成功")
}
2.3 哨兵模式配置¶
哨兵模式用于实现Redis的高可用,go-redis同样支持通过哨兵连接Redis主从集群。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
// 配置哨兵模式客户端
rdb := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster", // 主节点名称
SentinelAddrs: []string{ // 哨兵地址列表
"localhost:26379",
"localhost:26380",
"localhost:26381",
},
Password: "", // 密码
DB: 0, // 数据库编号
// 连接池配置
PoolSize: 10, // 连接池大小
MinIdleConns: 5, // 最小空闲连接数
})
// 测试连接
ctx := context.Background()
_, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("哨兵模式连接失败:", err)
return
}
fmt.Println("Redis哨兵模式连接成功")
}
3. 基本数据类型操作¶
3.1 String类型操作¶
String是Redis最基本的数据类型,一个键对应一个值。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
// 创建客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
// 设置字符串
err := rdb.Set(ctx, "username", "john_doe", 10*time.Minute).Err()
if err != nil {
fmt.Println("Set error:", err)
return
}
// 获取字符串
val, err := rdb.Get(ctx, "username").Result()
if err != nil {
fmt.Println("Get error:", err)
return
}
fmt.Println("username:", val)
// 自增
err = rdb.Incr(ctx, "counter").Err()
if err != nil {
fmt.Println("Incr error:", err)
return
}
// 获取自增后的值
counter, err := rdb.Get(ctx, "counter").Int64()
if err != nil {
fmt.Println("Get counter error:", err)
return
}
fmt.Println("counter:", counter)
// 批量设置
err = rdb.MSet(ctx, "name", "John", "age", "30").Err()
if err != nil {
fmt.Println("MSet error:", err)
return
}
// 批量获取
vals, err := rdb.MGet(ctx, "name", "age").Result()
if err != nil {
fmt.Println("MGet error:", err)
return
}
fmt.Println("name and age:", vals)
}
3.2 Hash类型应用¶
Hash类型适合存储对象,可以将一个对象的多个字段存储在一个键中。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "user:1001"
// 设置哈希字段
err := rdb.HSet(ctx, key, map[string]interface{}{
"name": "John Doe",
"email": "john@example.com",
"age": 30,
}).Err()
if err != nil {
fmt.Println("HSet error:", err)
return
}
// 获取单个字段
name, err := rdb.HGet(ctx, key, "name").Result()
if err != nil {
fmt.Println("HGet error:", err)
return
}
fmt.Println("Name:", name)
// 获取多个字段
fields, err := rdb.HMGet(ctx, key, "name", "age").Result()
if err != nil {
fmt.Println("HMGet error:", err)
return
}
fmt.Println("Name and Age:", fields)
// 获取所有字段和值
all, err := rdb.HGetAll(ctx, key).Result()
if err != nil {
fmt.Println("HGetAll error:", err)
return
}
fmt.Println("All fields:", all)
// 检查字段是否存在
exists, err := rdb.HExists(ctx, key, "email").Result()
if err != nil {
fmt.Println("HExists error:", err)
return
}
fmt.Println("Email exists:", exists)
// 删除字段
deleted, err := rdb.HDel(ctx, key, "age").Result()
if err != nil {
fmt.Println("HDel error:", err)
return
}
fmt.Println("Deleted fields:", deleted)
}
3.3 List队列操作¶
List是有序的字符串列表,适合实现队列、栈等数据结构。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "tasks"
// 向列表左侧添加元素
err := rdb.LPush(ctx, key, "task1", "task2").Err()
if err != nil {
fmt.Println("LPush error:", err)
return
}
// 向列表右侧添加元素
err = rdb.RPush(ctx, key, "task3").Err()
if err != nil {
fmt.Println("RPush error:", err)
return
}
// 获取列表长度
length, err := rdb.LLen(ctx, key).Result()
if err != nil {
fmt.Println("LLen error:", err)
return
}
fmt.Println("List length:", length)
// 获取列表元素(0表示第一个,-1表示最后一个)
tasks, err := rdb.LRange(ctx, key, 0, -1).Result()
if err != nil {
fmt.Println("LRange error:", err)
return
}
fmt.Println("All tasks:", tasks)
// 从右侧弹出元素(队列操作)
task, err := rdb.RPop(ctx, key).Result()
if err != nil {
fmt.Println("RPop error:", err)
return
}
fmt.Println("Processed task:", task)
// 从左侧弹出元素(栈操作)
task, err = rdb.LPop(ctx, key).Result()
if err != nil {
fmt.Println("LPop error:", err)
return
}
fmt.Println("Processed task:", task)
}
3.4 Set集合操作¶
Set是无序的字符串集合,不允许重复元素,适合存储唯一值的集合。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "tags"
// 向集合添加元素
added, err := rdb.SAdd(ctx, key, "go", "redis", "database").Result()
if err != nil {
fmt.Println("SAdd error:", err)
return
}
fmt.Println("Added elements:", added)
// 检查元素是否在集合中
isMember, err := rdb.SIsMember(ctx, key, "go").Result()
if err != nil {
fmt.Println("SIsMember error:", err)
return
}
fmt.Println("Is 'go' a member:", isMember)
// 获取集合所有元素
members, err := rdb.SMembers(ctx, key).Result()
if err != nil {
fmt.Println("SMembers error:", err)
return
}
fmt.Println("All members:", members)
// 获取集合大小
size, err := rdb.SCard(ctx, key).Result()
if err != nil {
fmt.Println("SCard error:", err)
return
}
fmt.Println("Set size:", size)
// 从集合中移除元素
removed, err := rdb.SRem(ctx, key, "database").Result()
if err != nil {
fmt.Println("SRem error:", err)
return
}
fmt.Println("Removed elements:", removed)
// 创建另一个集合用于集合操作
rdb.SAdd(ctx, "tags2", "redis", "nosql", "cache")
// 计算两个集合的交集
intersection, err := rdb.SInter(ctx, key, "tags2").Result()
if err != nil {
fmt.Println("SInter error:", err)
return
}
fmt.Println("Intersection:", intersection)
// 计算两个集合的并集
union, err := rdb.SUnion(ctx, key, "tags2").Result()
if err != nil {
fmt.Println("SUnion error:", err)
return
}
fmt.Println("Union:", union)
}
3.5 ZSet有序集合¶
ZSet是有序的集合,每个元素都关联一个分数,用于排序。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "leaderboard"
// 添加有序集合元素
added, err := rdb.ZAdd(ctx, key, &redis.Z{Score: 90, Member: "Alice"},
&redis.Z{Score: 85, Member: "Bob"},
&redis.Z{Score: 95, Member: "Charlie"}).Result()
if err != nil {
fmt.Println("ZAdd error:", err)
return
}
fmt.Println("Added elements:", added)
// 获取元素的分数
score, err := rdb.ZScore(ctx, key, "Bob").Result()
if err != nil {
fmt.Println("ZScore error:", err)
return
}
fmt.Println("Bob's score:", score)
// 增加元素的分数
newScore, err := rdb.ZIncrBy(ctx, key, 5, "Bob").Result()
if err != nil {
fmt.Println("ZIncrBy error:", err)
return
}
fmt.Println("Bob's new score:", newScore)
// 获取有序集合的长度
size, err := rdb.ZCard(ctx, key).Result()
if err != nil {
fmt.Println("ZCard error:", err)
return
}
fmt.Println("ZSet size:", size)
// 获取排名(从低到高)
rank, err := rdb.ZRank(ctx, key, "Bob").Result()
if err != nil {
fmt.Println("ZRank error:", err)
return
}
fmt.Println("Bob's rank (asc):", rank)
// 获取排名(从高到低)
revRank, err := rdb.ZRevRank(ctx, key, "Bob").Result()
if err != nil {
fmt.Println("ZRevRank error:", err)
return
}
fmt.Println("Bob's rank (desc):", revRank)
// 获取分数范围内的元素(升序)
rangeByScore, err := rdb.ZRangeByScore(ctx, key, &redis.ZRangeBy{
Min: "80",
Max: "95",
}).Result()
if err != nil {
fmt.Println("ZRangeByScore error:", err)
return
}
fmt.Println("Elements between 80-95:", rangeByScore)
// 获取前两名(降序)
topTwo, err := rdb.ZRevRange(ctx, key, 0, 1).Result()
if err != nil {
fmt.Println("ZRevRange error:", err)
return
}
fmt.Println("Top two:", topTwo)
}
4. 高级数据结构¶
4.1 Bitmap位图操作¶
Bitmap是一种特殊的字符串,用于高效存储和操作位数据,适合实现布隆过滤器、用户在线状态等功能。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "user:login:20231001"
// 设置位(用户ID为10086的用户登录)
err := rdb.SetBit(ctx, key, 10086, 1).Err()
if err != nil {
fmt.Println("SetBit error:", err)
return
}
// 设置另一个位(用户ID为10010的用户登录)
err = rdb.SetBit(ctx, key, 10010, 1).Err()
if err != nil {
fmt.Println("SetBit error:", err)
return
}
// 检查位是否设置(用户是否登录)
isLogin, err := rdb.GetBit(ctx, key, 10086).Result()
if err != nil {
fmt.Println("GetBit error:", err)
return
}
fmt.Println("User 10086 logged in:", isLogin == 1)
// 计算当天登录用户总数
count, err := rdb.BitCount(ctx, key, &redis.BitCount{Start: 0, End: -1}).Result()
if err != nil {
fmt.Println("BitCount error:", err)
return
}
fmt.Println("Total logged in users:", count)
// 另一天的登录数据
key2 := "user:login:20231002"
rdb.SetBit(ctx, key2, 10086, 1)
rdb.SetBit(ctx, key2, 10000, 1)
// 计算两天都登录的用户数(交集)
destKey := "user:login:both_days"
err = rdb.BitOpAnd(ctx, destKey, key, key2).Err()
if err != nil {
fmt.Println("BitOpAnd error:", err)
return
}
bothDaysCount, err := rdb.BitCount(ctx, destKey, nil).Result()
if err != nil {
fmt.Println("BitCount error:", err)
return
}
fmt.Println("Users logged in both days:", bothDaysCount)
}
4.2 HyperLogLog基数统计¶
HyperLogLog是一种概率数据结构,用于高效估算集合的基数(元素个数),适合统计页面UV等场景。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "page:uv:homepage"
// 添加访问用户ID
added, err := rdb.PFAdd(ctx, key, "user1", "user2", "user3").Result()
if err != nil {
fmt.Println("PFAdd error:", err)
return
}
fmt.Println("Added to HyperLogLog:", added)
// 再添加一些用户,包括重复的
rdb.PFAdd(ctx, key, "user3", "user4", "user5")
// 估算基数(独立用户数)
count, err := rdb.PFCount(ctx, key).Result()
if err != nil {
fmt.Println("PFCount error:", err)
return
}
fmt.Println("Estimated unique users:", count)
// 创建另一个HyperLogLog
key2 := "page:uv:productpage"
rdb.PFAdd(ctx, key2, "user3", "user5", "user6", "user7")
// 合并两个HyperLogLog
err = rdb.PFMerge(ctx, "page:uv:combined", key, key2).Err()
if err != nil {
fmt.Println("PFMerge error:", err)
return
}
// 估算合并后的基数
combinedCount, err := rdb.PFCount(ctx, "page:uv:combined").Result()
if err != nil {
fmt.Println("PFCount error:", err)
return
}
fmt.Println("Estimated unique users (combined):", combinedCount)
}
4.3 GEO地理位置功能¶
GEO功能用于存储和操作地理位置信息,适合实现附近的人、商家等功能。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
key := "restaurants"
// 添加地理位置信息(经度,纬度,名称)
added, err := rdb.GeoAdd(ctx, key, &redis.GeoLocation{
Longitude: 116.403874,
Latitude: 39.914885,
Name: "Restaurant A",
}, &redis.GeoLocation{
Longitude: 116.410088,
Latitude: 39.907404,
Name: "Restaurant B",
}, &redis.GeoLocation{
Longitude: 116.395645,
Latitude: 39.929986,
Name: "Restaurant C",
}).Result()
if err != nil {
fmt.Println("GeoAdd error:", err)
return
}
fmt.Println("Added locations:", added)
// 获取地理位置的经纬度
pos, err := rdb.GeoPos(ctx, key, "Restaurant A").Result()
if err != nil {
fmt.Println("GeoPos error:", err)
return
}
fmt.Println("Restaurant A position:", pos[0])
// 计算两个位置之间的距离(单位:米)
distance, err := rdb.GeoDist(ctx, key, "Restaurant A", "Restaurant B", "m").Result()
if err != nil {
fmt.Println("GeoDist error:", err)
return
}
fmt.Println("Distance between A and B (meters):", distance)
// 查找指定位置附近的元素
// 参数:经度,纬度,半径,单位
nearby, err := rdb.GeoRadius(ctx, key, 116.403874, 39.914885, &redis.GeoRadiusQuery{
Radius: 2000,
Unit: "m",
Count: 10,
Sort: "ASC",
}).Result()
if err != nil {
fmt.Println("GeoRadius error:", err)
return
}
fmt.Println("Restaurants within 2km:")
for _, place := range nearby {
fmt.Printf("- %s: %.2f meters\n", place.Name, place.Distance)
}
}
5. Redis连接池优化¶
连接池是高效使用Redis连接的关键,合理配置连接池可以显著提升性能。
5.1 连接池参数配置¶
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
// 优化的连接池配置
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
// 连接池配置
PoolSize: 100, // 连接池最大连接数,根据并发量调整
MinIdleConns: 20, // 最小空闲连接数,保持一定数量的热连接
MaxConnAge: 30 * time.Minute, // 连接的最大生存期,避免长时间连接
IdleTimeout: 5 * time.Minute, // 连接的最大空闲时间
ReadTimeout: 3 * time.Second, // 读超时
WriteTimeout: 3 * time.Second, // 写超时
PoolTimeout: 5 * time.Second, // 获取连接的超时时间
// 重试策略
MaxRetries: 3, // 最大重试次数
MinRetryBackoff: 100 * time.Millisecond, // 最小重试间隔
MaxRetryBackoff: 1 * time.Second, // 最大重试间隔
})
ctx := context.Background()
_, err := rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("Connection error:", err)
return
}
fmt.Println("Connected with optimized pool settings")
// 打印连接池状态
printPoolStats(rdb.PoolStats())
}
// 打印连接池状态
func printPoolStats(stats *redis.PoolStats) {
fmt.Println("\nConnection Pool Stats:")
fmt.Printf("Total connections: %d\n", stats.TotalConns)
fmt.Printf("Idle connections: %d\n", stats.IdleConns)
fmt.Printf("Stale connections: %d\n", stats.StaleConns)
}
5.2 连接复用策略¶
连接复用是提升Redis性能的关键,以下是一些最佳实践:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"sync"
)
// 全局Redis客户端实例,确保单例
var (
rdb *redis.Client
once sync.Once
)
// 获取Redis客户端实例(单例模式)
func getRedisClient() *redis.Client {
once.Do(func() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 50,
MinIdleConns: 10,
IdleTimeout: 5 * 60 * time.Second,
})
})
return rdb
}
// 封装Redis操作,确保连接正确使用
func redisOperation(ctx context.Context, fn func(*redis.Client) error) error {
client := getRedisClient()
return fn(client)
}
func main() {
ctx := context.Background()
// 使用封装的方法进行Redis操作
err := redisOperation(ctx, func(client *redis.Client) error {
return client.Set(ctx, "example", "connection_pool", 0).Err()
})
if err != nil {
fmt.Println("Redis operation error:", err)
return
}
val, err := redisOperationWithResult(ctx, func(client *redis.Client) (interface{}, error) {
return client.Get(ctx, "example").Result()
})
if err != nil {
fmt.Println("Redis get error:", err)
return
}
fmt.Println("Value:", val)
}
// 带返回值的Redis操作封装
func redisOperationWithResult(ctx context.Context, fn func(*redis.Client) (interface{}, error)) (interface{}, error) {
client := getRedisClient()
return fn(client)
}
5.3 性能监控与调优¶
监控Redis连接池性能并进行调优:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 50,
MinIdleConns: 10,
})
ctx := context.Background()
// 定期打印连接池状态
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
printPoolStats(rdb.PoolStats())
}
}
}()
// 模拟负载测试
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
rdb.Incr(ctx, "counter").Err()
time.Sleep(time.Microsecond * 100)
}
}()
}
wg.Wait()
fmt.Println("Load test completed")
}
func printPoolStats(stats *redis.PoolStats) {
fmt.Printf("\nPool stats at %s:\n", time.Now().Format(time.RFC3339))
fmt.Printf("Total connections: %d\n", stats.TotalConns)
fmt.Printf("Idle connections: %d\n", stats.IdleConns)
fmt.Printf("Stale connections: %d\n", stats.StaleConns)
}
调优建议: 1. 根据并发量调整PoolSize,通常设置为CPU核心数的5-10倍 2. MinIdleConns设置为预期的稳定并发量,避免频繁创建新连接 3. 监控StaleConns数量,如果持续增长,可能是网络问题或Redis服务器不稳定 4. 观察IdleConns,如果长期为0,说明连接池太小 5. 如果获取连接经常超时,需要增大PoolSize或PoolTimeout
6. 管道与事务¶
6.1 Pipeline批量操作¶
Pipeline用于批量执行多个命令,减少网络往返次数,提高性能。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
// 普通方式执行多个命令
start := time.Now()
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("key%d", i)
rdb.Set(ctx, key, i, 0)
}
fmt.Printf("Normal execution time: %v\n", time.Since(start))
// 使用Pipeline批量执行
start = time.Now()
pipe := rdb.Pipeline()
for i := 1000; i < 2000; i++ {
key := fmt.Sprintf("key%d", i)
pipe.Set(ctx, key, i, 0)
}
// 执行Pipeline中的所有命令
_, err := pipe.Exec(ctx)
if err != nil {
fmt.Println("Pipeline error:", err)
return
}
fmt.Printf("Pipeline execution time: %v\n", time.Since(start))
// 读取Pipeline执行结果
pipe = rdb.Pipeline()
pipe.Get(ctx, "key500")
pipe.Get(ctx, "key1500")
results, err := pipe.Exec(ctx)
if err != nil {
fmt.Println("Pipeline get error:", err)
return
}
for _, result := range results {
val, _ := result.(*redis.StringCmd).Result()
fmt.Println("Value:", val)
}
}
6.2 事务操作实现¶
Redis事务可以确保一组命令原子性执行,要么全部执行,要么全部不执行。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
// 清除测试数据
rdb.Del(ctx, "balance", "debt")
// 开启事务
tx := rdb.TxPipeline()
// 事务中的命令
tx.IncrBy(ctx, "balance", 100)
tx.IncrBy(ctx, "debt", 50)
// 执行事务
_, err := tx.Exec(ctx)
if err != nil {
fmt.Println("Transaction error:", err)
return
}
// 查看结果
balance, _ := rdb.Get(ctx, "balance").Int64()
debt, _ := rdb.Get(ctx, "debt").Int64()
fmt.Println("Balance:", balance)
fmt.Println("Debt:", debt)
// 使用Watch实现乐观锁
// 监控balance键,如果在事务执行前被修改,事务会失败
err = rdb.Watch(ctx, func(tx *redis.Tx) error {
// 获取当前值
val, err := tx.Get(ctx, "balance").Int64()
if err != nil && err != redis.Nil {
return err
}
// 准备事务
pipe := tx.Pipeline()
pipe.Set(ctx, "balance", val+100, 0)
// 执行事务
_, err = pipe.Exec(ctx)
return err
}, "balance") // 监控的键
if err != nil {
fmt.Println("Watch transaction error:", err)
return
}
newBalance, _ := rdb.Get(ctx, "balance").Int64()
fmt.Println("New balance after watch transaction:", newBalance)
}
6.3 Lua脚本应用¶
Lua脚本可以在Redis服务器端原子性地执行多个命令,适合实现复杂的业务逻辑。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
// 定义Lua脚本:转账操作
const transferScript = `
-- 检查余额是否充足
local balance = tonumber(redis.call('get', KEYS[1]) or "0")
local amount = tonumber(ARGV[1])
if balance >= amount then
-- 扣减余额
redis.call('decrby', KEYS[1], amount)
-- 增加目标账户余额
redis.call('incrby', KEYS[2], amount)
return 1 -- 成功
else
return 0 -- 失败
end
`
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
// 初始化账户余额
rdb.Set(ctx, "account:1001", 1000, 0)
rdb.Set(ctx, "account:1002", 500, 0)
// 加载Lua脚本并获取SHA1哈希
script := redis.NewScript(transferScript)
sha, err := script.Load(ctx, rdb).Result()
if err != nil {
fmt.Println("Load script error:", err)
return
}
fmt.Println("Script SHA:", sha)
// 执行Lua脚本
// KEYS: 源账户,目标账户
// ARGV: 转账金额
result, err := rdb.EvalSha(ctx, sha, []string{"account:1001", "account:1002"}, 300).Int64()
if err != nil {
fmt.Println("Eval script error:", err)
return
}
if result == 1 {
fmt.Println("Transfer successful")
} else {
fmt.Println("Transfer failed: insufficient balance")
}
// 查看转账后余额
acc1, _ := rdb.Get(ctx, "account:1001").Int64()
acc2, _ := rdb.Get(ctx, "account:1002").Int64()
fmt.Println("Account 1001 balance:", acc1)
fmt.Println("Account 1002 balance:", acc2)
// 尝试转账超出余额的金额
result, err = rdb.EvalSha(ctx, sha, []string{"account:1001", "account:1002"}, 2000).Int64()
if err != nil {
fmt.Println("Eval script error:", err)
return
}
if result == 1 {
fmt.Println("Second transfer successful")
} else {
fmt.Println("Second transfer failed: insufficient balance")
}
}
实战练习¶
练习1:用户会话管理系统¶
实现一个基于Redis的用户会话管理系统,支持会话创建、验证、更新和销毁。
package main
import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
// SessionManager 会话管理器
type SessionManager struct {
rdb *redis.Client
prefix string // 键前缀
expiration time.Duration // 会话过期时间
}
// NewSessionManager 创建新的会话管理器
func NewSessionManager(addr string, prefix string, expiration time.Duration) *SessionManager {
rdb := redis.NewClient(&redis.Options{
Addr: addr,
})
return &SessionManager{
rdb: rdb,
prefix: prefix,
expiration: expiration,
}
}
// generateSessionID 生成随机会话ID
func (sm *SessionManager) generateSessionID() string {
b := make([]byte, 32)
rand.Read(b)
return base64.URLEncoding.EncodeToString(b)
}
// CreateSession 创建新会话
func (sm *SessionManager) CreateSession(ctx context.Context, userID string, data map[string]interface{}) (string, error) {
sessionID := sm.generateSessionID()
key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)
// 使用事务设置会话数据和过期时间
tx := sm.rdb.TxPipeline()
// 设置用户ID
tx.HSet(ctx, key, "user_id", userID)
// 设置额外数据
for k, v := range data {
tx.HSet(ctx, key, k, v)
}
// 设置过期时间
tx.Expire(ctx, key, sm.expiration)
// 执行事务
_, err := tx.Exec(ctx)
if err != nil {
return "", err
}
return sessionID, nil
}
// GetSession 获取会话数据
func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (map[string]string, error) {
key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)
// 获取所有会话数据
data, err := sm.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, err
}
// 如果会话不存在
if len(data) == 0 {
return nil, fmt.Errorf("session not found")
}
// 延长会话有效期
sm.rdb.Expire(ctx, key, sm.expiration)
return data, nil
}
// UpdateSession 更新会话数据
func (sm *SessionManager) UpdateSession(ctx context.Context, sessionID string, data map[string]interface{}) error {
key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)
// 检查会话是否存在
exists, err := sm.rdb.Exists(ctx, key).Result()
if err != nil {
return err
}
if exists == 0 {
return fmt.Errorf("session not found")
}
// 更新会话数据
tx := sm.rdb.TxPipeline()
for k, v := range data {
tx.HSet(ctx, key, k, v)
}
tx.Expire(ctx, key, sm.expiration)
_, err = tx.Exec(ctx)
return err
}
// DestroySession 销毁会话
func (sm *SessionManager) DestroySession(ctx context.Context, sessionID string) error {
key := fmt.Sprintf("%s:%s", sm.prefix, sessionID)
return sm.rdb.Del(ctx, key).Err()
}
// GetUserSessions 获取用户的所有会话
func (sm *SessionManager) GetUserSessions(ctx context.Context, userID string) ([]string, error) {
// 注意:在实际应用中,可能需要维护用户与会话的映射关系
// 这里简化处理,实际应用中应考虑使用Set存储用户的所有会话ID
return []string{}, nil
}
func main() {
// 创建会话管理器,会话有效期30分钟
sm := NewSessionManager("localhost:6379", "session", 30*time.Minute)
ctx := context.Background()
// 创建新会话
sessionID, err := sm.CreateSession(ctx, "user123", map[string]interface{}{
"username": "john_doe",
"email": "john@example.com",
"role": "user",
})
if err != nil {
fmt.Println("Create session error:", err)
return
}
fmt.Println("Created session ID:", sessionID)
// 获取会话数据
sessionData, err := sm.GetSession(ctx, sessionID)
if err != nil {
fmt.Println("Get session error:", err)
return
}
fmt.Println("Session data:", sessionData)
// 更新会话数据
err = sm.UpdateSession(ctx, sessionID, map[string]interface{}{
"role": "admin",
})
if err != nil {
fmt.Println("Update session error:", err)
return
}
// 再次获取会话数据,验证更新
updatedData, err := sm.GetSession(ctx, sessionID)
if err != nil {
fmt.Println("Get updated session error:", err)
return
}
fmt.Println("Updated session data:", updatedData)
// 销毁会话
err = sm.DestroySession(ctx, sessionID)
if err != nil {
fmt.Println("Destroy session error:", err)
return
}
fmt.Println("Session destroyed")
// 验证会话已销毁
_, err = sm.GetSession(ctx, sessionID)
if err != nil {
fmt.Println("Expected error after destruction:", err)
}
}
练习2:实时排行榜实现¶
使用Redis的ZSet实现一个实时排行榜系统,支持添加分数、查询排名和获取排行榜。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
// Leaderboard 排行榜
type Leaderboard struct {
rdb *redis.Client
key string // 存储排行榜的键名
}
// NewLeaderboard 创建新的排行榜
func NewLeaderboard(addr string, key string) *Leaderboard {
rdb := redis.NewClient(&redis.Options{
Addr: addr,
})
return &Leaderboard{
rdb: rdb,
key: key,
}
}
// AddScore 添加或更新用户分数
func (lb *Leaderboard) AddScore(ctx context.Context, userID string, score float64) error {
return lb.rdb.ZAdd(ctx, lb.key, &redis.Z{
Score: score,
Member: userID,
}).Err()
}
// IncrScore 增加用户分数
func (lb *Leaderboard) IncrScore(ctx context.Context, userID string, increment float64) (float64, error) {
return lb.rdb.ZIncrBy(ctx, lb.key, increment, userID).Result()
}
// GetRank 获取用户排名(从1开始,分数高的排名靠前)
func (lb *Leaderboard) GetRank(ctx context.Context, userID string) (int64, error) {
return lb.rdb.ZRevRank(ctx, lb.key, userID).Result() + 1
}
// GetScore 获取用户分数
func (lb *Leaderboard) GetScore(ctx context.Context, userID string) (float64, error) {
return lb.rdb.ZScore(ctx, lb.key, userID).Result()
}
// GetTopN 获取排名前N的用户
func (lb *Leaderboard) GetTopN(ctx context.Context, n int64) ([]*redis.Z, error) {
return lb.rdb.ZRevRangeWithScores(ctx, lb.key, 0, n-1).Result()
}
// GetUserAround 获取用户周围的用户(包括自己)
func (lb *Leaderboard) GetUserAround(ctx context.Context, userID string, count int64) ([]*redis.Z, error) {
rank, err := lb.rdb.ZRevRank(ctx, lb.key, userID).Result()
if err != nil {
return nil, err
}
start := rank - count
if start < 0 {
start = 0
}
end := rank + count
return lb.rdb.ZRevRangeWithScores(ctx, lb.key, start, end).Result()
}
// GetTotalUsers 获取总用户数
func (lb *Leaderboard) GetTotalUsers(ctx context.Context) (int64, error) {
return lb.rdb.ZCard(ctx, lb.key).Result()
}
func main() {
// 创建排行榜实例
lb := NewLeaderboard("localhost:6379", "game:leaderboard")
ctx := context.Background()
// 清除测试数据
lb.rdb.Del(ctx, lb.key)
// 添加一些分数
users := []struct {
id string
score float64
}{
{"user1", 850},
{"user2", 920},
{"user3", 780},
{"user4", 950},
{"user5", 880},
}
for _, u := range users {
err := lb.AddScore(ctx, u.id, u.score)
if err != nil {
fmt.Printf("Error adding score for %s: %v\n", u.id, err)
}
}
// 增加某个用户的分数
newScore, err := lb.IncrScore(ctx, "user3", 100)
if err != nil {
fmt.Println("Error incrementing score:", err)
} else {
fmt.Printf("User3's new score: %.0f\n", newScore)
}
// 获取用户排名
rank, err := lb.GetRank(ctx, "user2")
if err != nil {
fmt.Println("Error getting rank:", err)
} else {
fmt.Printf("User2's rank: %d\n", rank)
}
// 获取前三名
top3, err := lb.GetTopN(ctx, 3)
if err != nil {
fmt.Println("Error getting top 3:", err)
} else {
fmt.Println("Top 3 users:")
for i, z := range top3 {
fmt.Printf("%d. %s - %.0f\n", i+1, z.Member, z.Score)
}
}
// 获取user2周围的用户
around, err := lb.GetUserAround(ctx, "user2", 1)
if err != nil {
fmt.Println("Error getting around users:", err)
} else {
fmt.Println("\nUsers around user2:")
for _, z := range around {
rank, _ := lb.GetRank(ctx, z.Member.(string))
fmt.Printf("Rank %d: %s - %.0f\n", rank, z.Member, z.Score)
}
}
// 获取总用户数
total, err := lb.GetTotalUsers(ctx)
if err != nil {
fmt.Println("Error getting total users:", err)
} else {
fmt.Printf("\nTotal users in leaderboard: %d\n", total)
}
}
练习3:分布式锁的Redis实现¶
实现一个基于Redis的分布式锁,用于在分布式系统中保证资源的互斥访问。
package main
import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"github.com/go-redis/redis/v8"
"sync"
"time"
)
// RedisLock 基于Redis的分布式锁
type RedisLock struct {
rdb *redis.Client
lockKey string // 锁的键名
lockValue string // 锁的值,用于唯一标识锁的持有者
expiration time.Duration // 锁的过期时间,防止死锁
}
// NewRedisLock 创建新的分布式锁实例
func NewRedisLock(addr string, lockKey string, expiration time.Duration) *RedisLock {
rdb := redis.NewClient(&redis.Options{
Addr: addr,
})
return &RedisLock{
rdb: rdb,
lockKey: lockKey,
lockValue: generateRandomValue(),
expiration: expiration,
}
}
// generateRandomValue 生成随机值,用于唯一标识锁的持有者
func generateRandomValue() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return base64.URLEncoding.EncodeToString(b)
}
// Lock 获取锁
// blocking表示是否阻塞等待,timeout为等待超时时间
func (rl *RedisLock) Lock(ctx context.Context, blocking bool, timeout time.Duration) (bool, error) {
startTime := time.Now()
for {
// 尝试获取锁
// 使用SET NX (Not Exist)命令,只有当键不存在时才设置成功
// 同时设置过期时间,防止死锁
success, err := rl.rdb.SetNX(ctx, rl.lockKey, rl.lockValue, rl.expiration).Result()
if err != nil {
return false, err
}
if success {
// 获取锁成功
return true, nil
}
// 如果不阻塞,直接返回失败
if !blocking {
return false, nil
}
// 检查是否超时
if time.Since(startTime) >= timeout {
return false, fmt.Errorf("timeout waiting for lock")
}
// 短暂休眠后重试
time.Sleep(100 * time.Millisecond)
}
}
// Unlock 释放锁
// 使用Lua脚本确保释放锁的原子性,只有持有锁的客户端才能释放锁
func (rl *RedisLock) Unlock(ctx context.Context) (bool, error) {
// Lua脚本:检查锁的值是否匹配,如果匹配则删除锁
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`)
result, err := script.Run(ctx, rl.rdb, []string{rl.lockKey}, rl.lockValue).Int64()
if err != nil {
return false, err
}
return result == 1, nil
}
// Renew 延长锁的有效期
func (rl *RedisLock) Renew(ctx context.Context) (bool, error) {
// Lua脚本:检查锁的值是否匹配,如果匹配则延长过期时间
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
`)
result, err := script.Run(ctx, rl.rdb, []string{rl.lockKey},
rl.lockValue, int(rl.expiration.Seconds())).Int64()
if err != nil {
return false, err
}
return result == 1, nil
}
// 模拟需要加锁的资源操作
func accessResource(lock *RedisLock, clientID int) {
ctx := context.Background()
fmt.Printf("Client %d trying to acquire lock...\n", clientID)
// 尝试获取锁,最多等待5秒
acquired, err := lock.Lock(ctx, true, 5*time.Second)
if err != nil {
fmt.Printf("Client %d error acquiring lock: %v\n", clientID, err)
return
}
if !acquired {
fmt.Printf("Client %d failed to acquire lock\n", clientID)
return
}
defer func() {
// 释放锁
released, err := lock.Unlock(ctx)
if err != nil {
fmt.Printf("Client %d error releasing lock: %v\n", clientID, err)
} else if released {
fmt.Printf("Client %d released lock\n", clientID)
} else {
fmt.Printf("Client %d failed to release lock (not owner)\n", clientID)
}
}()
fmt.Printf("Client %d acquired lock, accessing resource...\n", clientID)
// 模拟资源操作
time.Sleep(2 * time.Second)
fmt.Printf("Client %d finished accessing resource\n", clientID)
}
func main() {
// 创建分布式锁实例,锁过期时间为5秒
lock := NewRedisLock("localhost:6379", "resource:lock", 5*time.Second)
// 模拟多个客户端同时竞争锁
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(clientID int) {
defer wg.Done()
accessResource(lock, clientID)
}(i)
}
wg.Wait()
fmt.Println("All clients finished")
}
以上就是Redis集成与基础操作的完整教程,涵盖了客户端选择、连接管理、数据类型操作、高级数据结构、连接池优化以及管道与事务等内容,并通过三个实战练习展示了Redis在实际项目中的应用。掌握这些知识,可以帮助你在Go语言开发中高效地使用Redis。