8.7 Service Governance and Fault Tolerance
1. Service Governance Overview
1.1 Challenges of Microservice Architectures
As a business grows, a monolithic application is gradually split into multiple microservices, which brings new challenges:
- Inter-service dependencies become complex, increasing the risk of fault propagation
- Service instances scale up and down dynamically, making network communication less predictable
- Traffic fluctuates sharply, putting system stability to the test
- Troubleshooting problems that span multiple services becomes harder
These challenges create the need for service governance: a set of technical measures intended to keep a microservice architecture running reliably.
1.2 Definition and Goals of Service Governance
Service governance is the set of mechanisms and policies used to manage and control the services in a microservice architecture. Its core goals include:
- Keep the system highly available and limit the blast radius of failures
- Optimize resource usage and increase system throughput
- Provide observability to simplify troubleshooting and tuning
- Enable elastic scaling to absorb traffic changes
- Ensure reliable and secure inter-service communication
1.3 Core Components of Service Governance
A complete service governance system contains the following core components:
- Service registration and discovery: maintains service address information and enables dynamic discovery
- Configuration center: centrally manages service configuration and supports dynamic updates
- Circuit breaker: prevents cascading failures and fails fast
- Rate limiter: controls traffic and shields the system from traffic spikes
- Load balancer: spreads load across service instances and improves resource utilization
- Monitoring and alerting: watches system state in real time and surfaces problems early
- Tracing: follows distributed request chains to help locate problems
1.4 Categories of Governance Strategies
Service governance strategies fall into the following categories:
- Preventive: rate limiting and timeout control, which keep the system from being overwhelmed
- Fault-tolerant: circuit breakers and retries, which improve the system's ability to absorb failures
- Recovery-oriented: service degradation and failover, which help the system recover quickly
- Optimizing: load balancing and dynamic scaling, which improve system performance
1.5 The Relationship Between Governance and Monitoring
Monitoring is the foundation of service governance, and governance is the applied extension of monitoring:
- Monitoring collects runtime data that informs governance decisions
- Governance policies adjust system behavior automatically based on monitoring data
- The effect of governance is validated and tuned through monitoring data
- Monitoring alerts trigger manual governance intervention
2. Service Registration and Discovery
Before covering the specific fault-tolerance mechanisms, let's look at service registration and discovery, the foundational component of service governance.
2.1 How Service Registration and Discovery Works
Service registration and discovery solves the problem of dynamically changing service addresses in a microservice architecture. The core flow:
- On startup, a service registers its own address with the registry
- On shutdown, the service deregisters from the registry
- Clients fetch the list of service addresses from the registry
- The registry tracks service health and evicts unhealthy instances
2.2 A Simple Service Registry in Go
Below is a simple HTTP-based service registry:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// Service 表示一个服务实例
type Service struct {
Name string `json:"name"`
Address string `json:"address"`
Status string `json:"status"` // healthy/unhealthy
LastTTL time.Time `json:"last_ttl"`
}
// Registry 服务注册表
type Registry struct {
services map[string][]Service // key: 服务名, value: 服务实例列表
mu sync.RWMutex
}
func NewRegistry() *Registry {
return &Registry{
services: make(map[string][]Service),
}
}
// Register registers a service instance; if the address is already registered, its record is refreshed instead of duplicated
func (r *Registry) Register(service Service) {
r.mu.Lock()
defer r.mu.Unlock()
service.Status = "healthy"
service.LastTTL = time.Now().Add(30 * time.Second) // 30-second TTL
for i, instance := range r.services[service.Name] {
if instance.Address == service.Address {
r.services[service.Name][i] = service
return
}
}
r.services[service.Name] = append(r.services[service.Name], service)
}
// Unregister 注销服务
func (r *Registry) Unregister(name, address string) {
r.mu.Lock()
defer r.mu.Unlock()
if instances, ok := r.services[name]; ok {
newInstances := []Service{}
for _, instance := range instances {
if instance.Address != address {
newInstances = append(newInstances, instance)
}
}
r.services[name] = newInstances
}
}
// GetServices 获取服务实例列表
func (r *Registry) GetServices(name string) []Service {
r.mu.RLock()
defer r.mu.RUnlock()
return r.services[name]
}
// Renew 续约服务
func (r *Registry) Renew(name, address string) bool {
r.mu.Lock()
defer r.mu.Unlock()
if instances, ok := r.services[name]; ok {
for i, instance := range instances {
if instance.Address == address {
r.services[name][i].LastTTL = time.Now().Add(30 * time.Second)
r.services[name][i].Status = "healthy"
return true
}
}
}
return false
}
// 清理过期服务
func (r *Registry) cleanupExpiredServices() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.mu.Lock()
for name, instances := range r.services {
healthyInstances := []Service{}
for _, instance := range instances {
if time.Now().Before(instance.LastTTL) {
healthyInstances = append(healthyInstances, instance)
}
}
r.services[name] = healthyInstances
}
r.mu.Unlock()
}
}
}
func main() {
registry := NewRegistry()
// 启动清理过期服务的协程
go registry.cleanupExpiredServices()
// 注册服务
http.HandleFunc("/register", func(w http.ResponseWriter, r *http.Request) {
var service Service
if err := json.NewDecoder(r.Body).Decode(&service); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
registry.Register(service)
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Service %s registered", service.Name)
})
// 注销服务
http.HandleFunc("/unregister", func(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
address := r.URL.Query().Get("address")
registry.Unregister(name, address)
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Service %s unregistered", name)
})
// 获取服务
http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
services := registry.GetServices(name)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(services)
})
// 服务续约
http.HandleFunc("/renew", func(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
address := r.URL.Query().Get("address")
success := registry.Renew(name, address)
if success {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Service %s renewed", name)
} else {
http.Error(w, "Service not found", http.StatusNotFound)
}
})
log.Println("Registry server starting on :8500")
log.Fatal(http.ListenAndServe(":8500", nil))
}
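To complete the picture, the sketch below shows how a service process might register itself with the registry above and keep renewing its lease within the 30-second TTL. It assumes the /register and /renew endpoints defined earlier; the service name order-service and the address localhost:9001 are made-up values used only for illustration.
package main
import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)
// registerAndRenew registers one service instance and then renews it
// periodically so the registry does not evict it when the TTL expires.
func registerAndRenew(registryAddr, name, address string) error {
    payload, err := json.Marshal(map[string]string{"name": name, "address": address})
    if err != nil {
        return err
    }
    resp, err := http.Post(registryAddr+"/register", "application/json", bytes.NewReader(payload))
    if err != nil {
        return err
    }
    resp.Body.Close()
    // Renew well inside the registry's 30-second TTL
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            url := fmt.Sprintf("%s/renew?name=%s&address=%s", registryAddr, name, address)
            if r, err := http.Get(url); err != nil {
                log.Printf("renew failed: %v", err)
            } else {
                r.Body.Close()
            }
        }
    }()
    return nil
}
func main() {
    if err := registerAndRenew("http://localhost:8500", "order-service", "localhost:9001"); err != nil {
        log.Fatal(err)
    }
    select {} // keep the process alive so renewals continue
}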
3. The Circuit Breaker Pattern
3.1 How a Circuit Breaker Works
The circuit breaker pattern prevents cascading failures. The core idea: once the failure rate of calls to a service reaches a threshold, the breaker opens and blocks further calls to the failing service, protecting system resources and giving the failing service time to recover.
A circuit breaker works like a fuse in an electrical circuit: when it detects an anomaly it "trips" automatically, preventing the failure from spreading.
3.2 Circuit Breaker State Transitions
A circuit breaker usually has three states:
- Closed: the normal state; requests pass through and failures are counted
- Open: the tripped state; all requests are rejected, and after a cooldown period the breaker moves to half-open
- Half-open: the probing state; a limited number of requests are let through to check whether the service has recovered
State transition rules:
- Closed → Open: the failure rate reaches the configured threshold
- Open → Half-open: the configured recovery period elapses
- Half-open → Closed: the success rate reaches the threshold
- Half-open → Open: the failure rate is still above the threshold
3.3 Circuit Breaking Configuration
The core configuration parameters of a circuit breaker are:
- Failure threshold: the failure rate or failure count that trips the breaker
- Recovery period: how long the breaker stays open
- Sample size: the number of requests used to compute the failure rate
- Requests allowed while half-open: used to verify whether the service has recovered
Tuning these parameters depends on the characteristics of the service and the needs of the business.
3.4 A Circuit Breaker in Go
Below is a simple circuit breaker implementation:
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// State 熔断器状态
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
// CircuitBreaker 熔断器结构
type CircuitBreaker struct {
state State
failureThreshold int // 失败阈值
successThreshold int // 成功阈值
timeout time.Duration // 熔断超时时间
resetTimeout time.Duration // 重置超时时间
lastFailureTime time.Time // 最后一次失败时间
failureCount int // 失败计数
consecutiveSuccesses int // 连续成功计数
mutex sync.Mutex
}
// NewCircuitBreaker 创建熔断器
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: failureThreshold,
successThreshold: successThreshold,
timeout: timeout,
resetTimeout: resetTimeout,
}
}
// State 获取当前状态
func (cb *CircuitBreaker) State() State {
cb.mutex.Lock()
defer cb.mutex.Unlock()
return cb.state
}
// Execute 执行函数,应用熔断策略
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mutex.Lock()
// 检查状态并可能转换状态
switch cb.state {
case StateOpen:
// 如果已过重置时间,进入半打开状态
if time.Since(cb.lastFailureTime) > cb.resetTimeout {
cb.state = StateHalfOpen
} else {
cb.mutex.Unlock()
return errors.New("circuit breaker is open")
}
case StateClosed, StateHalfOpen:
// No state change is needed before executing the call
}
cb.mutex.Unlock()
// 执行函数并设置超时
done := make(chan error, 1)
go func() {
done <- fn()
}()
select {
case err := <-done:
cb.handleResult(err)
return err
case <-time.After(cb.timeout):
cb.handleResult(errors.New("timeout"))
return errors.New("execution timeout")
}
}
// 处理执行结果,更新熔断器状态
func (cb *CircuitBreaker) handleResult(err error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
if err != nil {
// 处理失败情况
cb.failureCount++
cb.lastFailureTime = time.Now()
cb.consecutiveSuccesses = 0
switch cb.state {
case StateClosed:
// 如果失败次数达到阈值,进入打开状态
if cb.failureCount >= cb.failureThreshold {
cb.state = StateOpen
}
case StateHalfOpen:
// 半打开状态下任何失败都回到打开状态
cb.state = StateOpen
}
} else {
// 处理成功情况
cb.failureCount = 0
cb.consecutiveSuccesses++
switch cb.state {
case StateHalfOpen:
// 如果连续成功次数达到阈值,进入关闭状态
if cb.consecutiveSuccesses >= cb.successThreshold {
cb.state = StateClosed
cb.consecutiveSuccesses = 0
}
}
}
}
// 示例使用
func main() {
// 创建熔断器:失败3次后熔断,5秒后尝试恢复,成功2次后关闭熔断,超时1秒
cb := NewCircuitBreaker(3, 2, 1*time.Second, 5*time.Second)
// 模拟一个可能失败的函数
failureRate := 0.7 // 70%的失败率
attempts := 20
for i := 0; i < attempts; i++ {
err := cb.Execute(func() error {
// 模拟随机失败
if rand.Float64() < failureRate {
return errors.New("operation failed")
}
return nil
})
state := ""
switch cb.State() {
case StateClosed:
state = "Closed"
case StateOpen:
state = "Open"
case StateHalfOpen:
state = "Half-Open"
}
result := "Success"
if err != nil {
result = "Failed: " + err.Error()
}
fmt.Printf("Attempt %d: %-10s State: %s\n", i+1, result, state)
time.Sleep(500 * time.Millisecond)
}
}
3.5 Using hystrix-go in Practice
hystrix-go is a popular circuit breaker library for Go, modeled on Netflix's Hystrix.
First, install hystrix-go:
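go get github.com/afex/hystrix-go/hystrix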
Usage example:
package main
import (
"errors"
"fmt"
"net/http"
"time"
"github.com/afex/hystrix-go/hystrix"
)
func main() {
// 配置熔断器
hystrix.ConfigureCommand("example_command", hystrix.CommandConfig{
Timeout: 1000, // 超时时间(毫秒)
MaxConcurrentRequests: 10, // 最大并发请求数
ErrorPercentThreshold: 50, // 错误百分比阈值
SleepWindow: 5000, // 熔断后多久尝试半打开(毫秒)
RequestVolumeThreshold: 5, // 统计错误率的请求数阈值
})
// 注册一个简单的HTTP处理函数来演示熔断器
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// Run the potentially failing operation through the circuit breaker
fallbackUsed := false
err := hystrix.Do("example_command", func() error {
// Simulate a service call that may fail
return callExternalService()
}, func(err error) error {
// Fallback: called when the circuit is open or the call fails.
// It writes the degraded response, so the handler must not write again.
fallbackUsed = true
fmt.Println("Fallback triggered:", err)
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, "Service temporarily unavailable, please try again later")
return nil
})
if fallbackUsed {
return
}
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Error: %v", err)
return
}
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Successfully processed request")
})
fmt.Println("Server starting on :8080")
http.ListenAndServe(":8080", nil)
}
// 模拟调用外部服务,有50%的失败概率
func callExternalService() error {
// 模拟网络延迟
time.Sleep(200 * time.Millisecond)
// 50%的概率返回错误
if time.Now().UnixNano()%2 == 0 {
return errors.New("external service error")
}
return nil
}
4. Rate Limiting and Traffic Control
4.1 Rate Limiting Algorithms
Rate limiting controls how much traffic enters the system, preventing it from collapsing under excessive load. Common algorithms:
- Fixed window: divide time into fixed-size windows and cap the number of requests per window
- Sliding window: split the fixed window into smaller intervals to smooth the limiting effect
- Token bucket: tokens are generated at a steady rate, and each request must acquire a token before it is processed
- Leaky bucket: requests first enter a buffer and are then processed at a steady rate, smoothing bursts
Each algorithm suits different scenarios; choose based on the system's characteristics and traffic patterns. The following subsections implement the token bucket, leaky bucket, and sliding window; for completeness, a minimal fixed-window sketch follows.
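The sketch below is a minimal fixed-window counter, shown for comparison with the implementations that follow. It is illustrative rather than production code, and it exhibits the boundary problem discussed in 4.4: a burst straddling two windows can briefly let through up to twice the limit, which is exactly what the sliding window mitigates.
package main
import (
    "fmt"
    "sync"
    "time"
)
// FixedWindowLimiter caps the number of requests in each fixed-size time window.
type FixedWindowLimiter struct {
    windowSize  time.Duration
    maxRequests int
    windowStart time.Time
    count       int
    mu          sync.Mutex
}
func NewFixedWindowLimiter(windowSize time.Duration, maxRequests int) *FixedWindowLimiter {
    return &FixedWindowLimiter{windowSize: windowSize, maxRequests: maxRequests, windowStart: time.Now()}
}
// Allow reports whether the current request fits into the current window.
func (f *FixedWindowLimiter) Allow() bool {
    f.mu.Lock()
    defer f.mu.Unlock()
    // Start a new window once the current one has expired
    if time.Since(f.windowStart) >= f.windowSize {
        f.windowStart = time.Now()
        f.count = 0
    }
    if f.count < f.maxRequests {
        f.count++
        return true
    }
    return false
}
func main() {
    limiter := NewFixedWindowLimiter(time.Second, 5) // at most 5 requests per second
    for i := 0; i < 12; i++ {
        fmt.Printf("Request %d allowed: %v\n", i+1, limiter.Allow())
        time.Sleep(100 * time.Millisecond)
    }
}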
4.2 Token Bucket Implementation
The token bucket is a flexible rate-limiting algorithm that can absorb bursts of traffic. Its core idea:
- The system adds tokens to the bucket at a fixed rate
- Each request must take a token from the bucket before it is processed
- The bucket has a maximum capacity; once full, no more tokens are added
- When no tokens are available, the request is rejected
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// TokenBucket 令牌桶
type TokenBucket struct {
capacity int // 桶容量
rate int // 令牌生成速率(个/秒)
tokens int // 当前令牌数
lastRefill time.Time // 最后一次填充令牌的时间
mutex sync.Mutex
}
// NewTokenBucket 创建令牌桶
func NewTokenBucket(capacity, rate int) *TokenBucket {
return &TokenBucket{
capacity: capacity,
rate: rate,
tokens: capacity, // 初始填满令牌
lastRefill: time.Now(),
}
}
// refill 填充令牌
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
// 计算这段时间应该生成的令牌数
newTokens := int(elapsed.Seconds()) * tb.rate
if newTokens > 0 {
tb.tokens = tb.tokens + newTokens
// 不能超过桶容量
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastRefill = now
}
}
// Take 获取令牌
func (tb *TokenBucket) Take() error {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
if tb.tokens > 0 {
tb.tokens--
return nil
}
return errors.New("no tokens available")
}
// TakeN 获取多个令牌
func (tb *TokenBucket) TakeN(n int) error {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
if tb.tokens >= n {
tb.tokens -= n
return nil
}
return errors.New("not enough tokens available")
}
// Available 获取当前可用令牌数
func (tb *TokenBucket) Available() int {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
return tb.tokens
}
// 示例使用
func main() {
// 创建一个容量为10,速率为5个/秒的令牌桶
tb := NewTokenBucket(10, 5)
// 模拟请求
for i := 0; i < 20; i++ {
err := tb.Take()
if err != nil {
fmt.Printf("Request %d: Rejected - %v\n", i+1, err)
} else {
fmt.Printf("Request %d: Accepted - Available tokens: %d\n", i+1, tb.Available())
}
// 前10个请求快速发送,后面的请求间隔300ms
if i < 10 {
time.Sleep(100 * time.Millisecond)
} else {
time.Sleep(300 * time.Millisecond)
}
}
}
4.3 Leaky Bucket Implementation
The leaky bucket smooths bursts by processing requests at a constant rate. Its core idea:
- Requests first enter the bucket and wait to be processed
- The system drains requests from the bucket at a fixed rate
- Once the bucket is full, new requests are dropped
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// LeakyBucket 漏桶
type LeakyBucket struct {
capacity int // 桶容量
rate int // 漏出速率(个/秒)
requests int // 当前请求数
lastLeak time.Time // 最后一次漏出时间
mutex sync.Mutex
}
// NewLeakyBucket 创建漏桶
func NewLeakyBucket(capacity, rate int) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
rate: rate,
requests: 0,
lastLeak: time.Now(),
}
}
// leak 漏出请求
func (lb *LeakyBucket) leak() {
now := time.Now()
elapsed := now.Sub(lb.lastLeak)
// 计算这段时间可以漏出的请求数
leaked := int(elapsed.Seconds()) * lb.rate
if leaked > 0 {
if lb.requests > leaked {
lb.requests -= leaked
} else {
lb.requests = 0
}
lb.lastLeak = now
}
}
// Add 添加请求
func (lb *LeakyBucket) Add() error {
lb.mutex.Lock()
defer lb.mutex.Unlock()
lb.leak()
if lb.requests < lb.capacity {
lb.requests++
return nil
}
return errors.New("bucket is full")
}
// Pending 获取当前等待的请求数
func (lb *LeakyBucket) Pending() int {
lb.mutex.Lock()
defer lb.mutex.Unlock()
lb.leak()
return lb.requests
}
// 示例使用
func main() {
// 创建一个容量为5,速率为2个/秒的漏桶
lb := NewLeakyBucket(5, 2)
// 模拟突发请求
for i := 0; i < 10; i++ {
err := lb.Add()
if err != nil {
fmt.Printf("Request %d: Rejected - %v\n", i+1, err)
} else {
fmt.Printf("Request %d: Accepted - Pending requests: %d\n", i+1, lb.Pending())
}
// 前5个请求快速发送,后面的请求间隔500ms
if i < 5 {
time.Sleep(100 * time.Millisecond)
} else {
time.Sleep(500 * time.Millisecond)
}
}
}
4.4 Sliding Window Rate Limiting
The sliding window improves on the fixed window: it divides the window into smaller intervals and moves forward over time, avoiding the traffic spike a fixed window can allow at its boundaries.
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// SlidingWindow 滑动窗口
type SlidingWindow struct {
windowSize time.Duration // 窗口大小
intervalCount int // 区间数量
maxRequests int // 最大请求数
intervals []int // 每个区间的请求数
currentIndex int // 当前区间索引
lastUpdate time.Time // 最后更新时间
mutex sync.Mutex
}
// NewSlidingWindow 创建滑动窗口
func NewSlidingWindow(windowSize time.Duration, intervalCount, maxRequests int) *SlidingWindow {
return &SlidingWindow{
windowSize: windowSize,
intervalCount: intervalCount,
maxRequests: maxRequests,
intervals: make([]int, intervalCount),
currentIndex: 0,
lastUpdate: time.Now(),
}
}
// 计算当前区间索引并清理过期区间
func (sw *SlidingWindow) updateCurrentInterval() {
now := time.Now()
elapsed := now.Sub(sw.lastUpdate)
// 每个区间的时长
intervalDuration := sw.windowSize / time.Duration(sw.intervalCount)
// 计算需要移动的区间数
intervalsToMove := int(elapsed / intervalDuration)
if intervalsToMove > 0 {
// 移动窗口,清理过期区间
for i := 0; i < intervalsToMove && i < sw.intervalCount; i++ {
sw.currentIndex = (sw.currentIndex + 1) % sw.intervalCount
sw.intervals[sw.currentIndex] = 0
}
sw.lastUpdate = now
}
}
// Allow 检查是否允许请求
func (sw *SlidingWindow) Allow() bool {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.updateCurrentInterval()
// 计算总请求数
total := 0
for _, count := range sw.intervals {
total += count
}
if total < sw.maxRequests {
sw.intervals[sw.currentIndex]++
return true
}
return false
}
// Count 获取当前窗口内的请求数
func (sw *SlidingWindow) Count() int {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.updateCurrentInterval()
total := 0
for _, count := range sw.intervals {
total += count
}
return total
}
// 示例使用
func main() {
// 创建一个10秒窗口,分为10个区间,最大15个请求
sw := NewSlidingWindow(10*time.Second, 10, 15)
// 模拟请求
for i := 0; i < 30; i++ {
allowed := sw.Allow()
if allowed {
fmt.Printf("Request %d: Allowed - Current count: %d\n", i+1, sw.Count())
} else {
fmt.Printf("Request %d: Rejected - Current count: %d\n", i+1, sw.Count())
}
// 前10个请求快速发送,后面的请求间隔1秒
if i < 10 {
time.Sleep(200 * time.Millisecond)
} else {
time.Sleep(1 * time.Second)
}
}
}
4.5 Distributed Rate Limiting
In a distributed system, per-node limiting cannot control global traffic, so a distributed strategy is needed:
- Centralized limiting: use middleware such as Redis to keep a shared rate-limit counter
- Distributed token bucket: a central node issues tokens that individual nodes acquire
- Consistent-hashing-based limiting: route each request to a fixed node and enforce the limit there
- Gateway-level limiting: enforce traffic control centrally at the API gateway
Below is a Redis-based distributed rate limiter:
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// RedisRateLimiter 基于Redis的分布式限流器
type RedisRateLimiter struct {
client *redis.Client
key string // Redis键名
rate int // 速率(个/秒)
burst int // 突发容量
windowSize time.Duration // 窗口大小
}
// NewRedisRateLimiter 创建分布式限流器
func NewRedisRateLimiter(client *redis.Client, key string, rate int, burst int, windowSize time.Duration) *RedisRateLimiter {
return &RedisRateLimiter{
client: client,
key: key,
rate: rate,
burst: burst,
windowSize: windowSize,
}
}
// Allow 检查是否允许请求
func (r *RedisRateLimiter) Allow(ctx context.Context) (bool, error) {
// 使用Redis的INCR命令实现计数器
count, err := r.client.Incr(ctx, r.key).Result()
if err != nil {
return false, err
}
// 第一次设置过期时间
if count == 1 {
_, err = r.client.Expire(ctx, r.key, r.windowSize).Result()
if err != nil {
return false, err
}
}
// 检查是否超过限制
return count <= int64(r.rate+r.burst), nil
}
// TokenBucketAllow 使用令牌桶算法检查是否允许请求
func (r *RedisRateLimiter) TokenBucketAllow(ctx context.Context) (bool, error) {
// 使用Redis的令牌桶脚本
script := `
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
local rate = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
-- Refill rate: 'rate' tokens per second; timestamps are in milliseconds
local current_tokens = tonumber(redis.call('get', tokens_key) or burst)
local last_refreshed = tonumber(redis.call('get', timestamp_key) or now)
-- Time elapsed since the last refill, in milliseconds
local delta = math.max(0, now - last_refreshed)
-- Refill the bucket, capped at the burst capacity
current_tokens = math.min(burst, current_tokens + rate * delta / 1000)
-- Try to consume one token
local allowed = 0
if current_tokens >= 1 then
allowed = 1
current_tokens = current_tokens - 1
end
-- Persist the token count and timestamp with a TTL of at least one window
local ttl = math.max(1, math.ceil(window_size / 1000))
redis.call('setex', tokens_key, ttl, tostring(current_tokens))
redis.call('setex', timestamp_key, ttl, now)
-- Return 1 when allowed, 0 otherwise (a bare Lua boolean would not survive the conversion to a Redis reply)
return allowed
`
now := time.Now().UnixMilli()
keys := []string{r.key + ":tokens", r.key + ":timestamp"}
args := []interface{}{r.rate, r.burst, now, r.windowSize.Milliseconds()}
result, err := r.client.Eval(ctx, script, keys, args...).Bool()
if err != nil {
return false, err
}
return result, nil
}
// 示例使用
func main() {
// 连接Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 创建限流器:10个/秒,突发5个,窗口1000ms
limiter := NewRedisRateLimiter(client, "api:rate_limit", 10, 5, time.Second)
ctx := context.Background()
// 模拟请求
for i := 0; i < 20; i++ {
allowed, err := limiter.TokenBucketAllow(ctx)
if err != nil {
fmt.Printf("Request %d: Error - %v\n", i+1, err)
} else if allowed {
fmt.Printf("Request %d: Allowed\n", i+1)
} else {
fmt.Printf("Request %d: Rejected\n", i+1)
}
// 控制请求频率
time.Sleep(100 * time.Millisecond)
}
}
5. Designing Retry Mechanisms
5.1 Types of Retry Strategies
Retries are an important way to make service calls more reliable. Common retry strategies:
- Fixed interval: wait a fixed amount of time between retries
- Exponential backoff: the interval grows exponentially with each retry
- Random delay: wait a random amount of time between retries
- Bounded retries: cap the maximum number of attempts
- Combined strategies: mix several of the above
Choosing the right strategy depends on the service's characteristics, network conditions, and business requirements.
5.2 Exponential Backoff
Exponential backoff is a common retry strategy: the retry interval grows exponentially with the number of attempts, reducing sustained pressure on the failing service.
package main
import (
"errors"
"fmt"
"math/rand"
"time"
)
// ExponentialBackoff 指数退避重试
func ExponentialBackoff(attempts int, baseDelay time.Duration, maxDelay time.Duration, jitter float64) time.Duration {
if attempts <= 0 {
return 0
}
// Exponential delay: baseDelay * 2^(attempts-1)
delay := baseDelay * time.Duration(1<<uint(attempts-1))
// Add random jitter to avoid retry storms
if jitter > 0 {
jitterAmount := time.Duration(rand.Float64() * jitter * float64(delay))
delay += jitterAmount
}
// 不超过最大延迟
if delay > maxDelay {
delay = maxDelay
}
return delay
}
// RetryWithBackoff 使用指数退避策略重试函数
func RetryWithBackoff(fn func() error, maxAttempts int, baseDelay time.Duration, maxDelay time.Duration, jitter float64) error {
var lastErr error
for attempt := 1; attempt <= maxAttempts; attempt++ {
err := fn()
if err == nil {
return nil // 成功
}
lastErr = err
// 如果是最后一次尝试,不再重试
if attempt == maxAttempts {
break
}
// 计算延迟
delay := ExponentialBackoff(attempt, baseDelay, maxDelay, jitter)
fmt.Printf("Attempt %d failed: %v. Retrying in %v...\n", attempt, err, delay)
time.Sleep(delay)
}
return fmt.Errorf("after %d attempts, last error: %w", maxAttempts, lastErr)
}
// 示例使用
func main() {
// 模拟一个可能失败的操作,第3次尝试成功
attempts := 0
fn := func() error {
attempts++
fmt.Printf("Performing operation (attempt %d)...\n", attempts)
// 前2次失败,第3次成功
if attempts < 3 {
return errors.New("temporary error")
}
return nil
}
// 最多重试5次,初始延迟1秒,最大延迟10秒,抖动0.2
err := RetryWithBackoff(fn, 5, time.Second, 10*time.Second, 0.2)
if err != nil {
fmt.Printf("Final error: %v\n", err)
} else {
fmt.Println("Operation succeeded!")
}
}
5.3 Deciding Whether to Retry
Not every error should be retried; whether a retry makes sense depends on the type of error:
package main
import (
"errors"
"fmt"
"net/http"
"time"
)
// 定义可重试的错误类型
var (
ErrTemporary = errors.New("temporary error")
ErrTimeout = errors.New("timeout error")
)
// IsRetryable 判断错误是否可以重试
func IsRetryable(err error) bool {
if err == nil {
return false
}
// 判断特定错误类型
switch err {
case ErrTemporary, ErrTimeout:
return true
}
// 检查HTTP错误状态码
var httpErr interface{ StatusCode() int }
if errors.As(err, &httpErr) {
statusCode := httpErr.StatusCode()
// 5xx服务器错误通常可以重试
// 429请求过多也可以重试
return statusCode >= 500 || statusCode == http.StatusTooManyRequests
}
return false
}
// RetryWithCondition 带条件判断的重试
func RetryWithCondition(fn func() error, maxAttempts int, delay time.Duration) error {
var lastErr error
for attempt := 1; attempt <= maxAttempts; attempt++ {
err := fn()
if err == nil {
return nil // 成功
}
lastErr = err
// 检查是否可以重试
if !IsRetryable(err) {
return fmt.Errorf("non-retryable error: %w", err)
}
// 如果是最后一次尝试,不再重试
if attempt == maxAttempts {
break
}
fmt.Printf("Attempt %d failed: %v. Retrying in %v...\n", attempt, err, delay)
time.Sleep(delay)
}
return fmt.Errorf("after %d attempts, last error: %w", maxAttempts, lastErr)
}
// 示例使用
func main() {
// 模拟一个可能返回不同错误的操作
attempts := 0
fn := func() error {
attempts++
fmt.Printf("Performing operation (attempt %d)...\n", attempts)
// 第一次返回临时错误,第二次返回超时错误,第三次成功
switch attempts {
case 1:
return ErrTemporary
case 2:
return ErrTimeout
default:
return nil
}
}
// 最多重试3次,每次延迟1秒
err := RetryWithCondition(fn, 3, time.Second)
if err != nil {
fmt.Printf("Final error: %v\n", err)
} else {
fmt.Println("Operation succeeded!")
}
}
5.4 Retries and Idempotency
Retries can cause an operation to run more than once, so any retried operation must be idempotent: executing it several times has the same effect as executing it once.
Common ways to guarantee idempotency:
- Tag each request with a unique identifier (UUID) and deduplicate on the server side
- Use optimistic or pessimistic locking to control concurrent modifications
- Design stateless service interfaces
package main
import (
"errors"
"fmt"
"sync"
)
// OrderService 订单服务
type OrderService struct {
orders map[string]bool // 记录已处理的订单ID
mutex sync.Mutex
}
func NewOrderService() *OrderService {
return &OrderService{
orders: make(map[string]bool),
}
}
// CreateOrder 创建订单(幂等操作)
func (s *OrderService) CreateOrder(orderID string, amount float64) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查订单是否已处理
if s.orders[orderID] {
fmt.Printf("Order %s already processed\n", orderID)
return nil // 已处理,直接返回成功
}
// 模拟创建订单的业务逻辑
fmt.Printf("Creating order %s with amount %.2f\n", orderID, amount)
// 模拟可能的错误
if amount < 0 {
return errors.New("invalid amount")
}
// 标记订单为已处理
s.orders[orderID] = true
return nil
}
// 示例使用
func main() {
service := NewOrderService()
orderID := "ORDER_12345"
// 模拟重试场景
for i := 0; i < 3; i++ {
fmt.Printf("Attempt %d to create order...\n", i+1)
err := service.CreateOrder(orderID, 99.99)
if err != nil {
fmt.Printf("Error: %v\n", err)
break
}
}
}
5.5 Monitoring and Alerting on Retries
Retries are themselves a sign that something is wrong, so they should be monitored and alerted on:
package main
import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"
)
// RetryMetrics 重试监控指标
type RetryMetrics struct {
TotalAttempts uint64
TotalRetries uint64
TotalFailures uint64
MaxRetries uint64
LastFailureTime time.Time
}
// RetryWithMetrics 带监控的重试
func RetryWithMetrics(fn func() error, maxAttempts int, delay time.Duration, metrics *RetryMetrics) error {
var lastErr error
attempts := 0
for attempts < maxAttempts {
attempts++
atomic.AddUint64(&metrics.TotalAttempts, 1)
err := fn()
if err == nil {
return nil // 成功
}
lastErr = err
// 如果是最后一次尝试,不再重试
if attempts == maxAttempts {
atomic.AddUint64(&metrics.TotalFailures, 1)
metrics.LastFailureTime = time.Now()
break
}
// 记录重试
atomic.AddUint64(&metrics.TotalRetries, 1)
if uint64(attempts) > atomic.LoadUint64(&metrics.MaxRetries) {
atomic.StoreUint64(&metrics.MaxRetries, uint64(attempts))
}
fmt.Printf("Attempt %d failed: %v. Retrying in %v...\n", attempts, err, delay)
time.Sleep(delay)
}
return fmt.Errorf("after %d attempts, last error: %w", attempts, lastErr)
}
// 定期打印监控指标
func MonitorMetrics(metrics *RetryMetrics, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("\nRetry Metrics:")
fmt.Printf("Total Attempts: %d\n", atomic.LoadUint64(&metrics.TotalAttempts))
fmt.Printf("Total Retries: %d\n", atomic.LoadUint64(&metrics.TotalRetries))
fmt.Printf("Total Failures: %d\n", atomic.LoadUint64(&metrics.TotalFailures))
fmt.Printf("Max Retries for a Single Operation: %d\n", atomic.LoadUint64(&metrics.MaxRetries))
fmt.Printf("Last Failure Time: %v\n", metrics.LastFailureTime)
// 检查是否需要告警(例如重试率过高)
attempts := atomic.LoadUint64(&metrics.TotalAttempts)
if attempts > 0 {
retryRate := float64(atomic.LoadUint64(&metrics.TotalRetries)) / float64(attempts)
if retryRate > 0.3 { // 重试率超过30%告警
fmt.Println("ALERT: High retry rate detected!")
}
}
}
}
}
// 示例使用
func main() {
metrics := &RetryMetrics{}
// 启动监控
go MonitorMetrics(metrics, 5*time.Second)
// Simulate an operation that fails roughly half the time
failureRate := 0.5
fn := func() error {
if rand.Float64() < failureRate {
return errors.New("temporary error")
}
return nil
}
// 执行多次操作,每次最多重试3次
for i := 0; i < 10; i++ {
fmt.Printf("\nOperation %d:\n", i+1)
RetryWithMetrics(fn, 3, 1*time.Second, metrics)
time.Sleep(1 * time.Second)
}
// 等待最后一次监控输出
time.Sleep(5 * time.Second)
}
6. Timeout Control
6.1 Principles for Setting Timeouts
Sensible timeouts are the key to preventing a service from blocking indefinitely. Follow these principles (a small sketch of configurable, per-operation timeouts follows the list):
- Your timeout should be shorter than the timeout of the upstream service that calls you
- Set timeouts that fit the service: neither too short nor too long
- Different kinds of operations deserve different timeouts
- Account for network latency and changes in service load
- Timeouts should be configurable so they can be adjusted dynamically
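The sketch below makes the last two points concrete: timeouts are kept per operation and can be changed at runtime. The operation names and durations are made-up defaults; in practice they would come from a configuration center so they can be adjusted without a redeploy.
package main
import (
    "context"
    "fmt"
    "sync"
    "time"
)
// TimeoutConfig holds per-operation timeouts that can be updated at runtime.
type TimeoutConfig struct {
    mu       sync.RWMutex
    timeouts map[string]time.Duration
}
func NewTimeoutConfig(defaults map[string]time.Duration) *TimeoutConfig {
    return &TimeoutConfig{timeouts: defaults}
}
// Set dynamically adjusts the timeout for one operation.
func (c *TimeoutConfig) Set(op string, d time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.timeouts[op] = d
}
// Context returns a context bounded by the configured timeout for the operation.
func (c *TimeoutConfig) Context(parent context.Context, op string) (context.Context, context.CancelFunc) {
    c.mu.RLock()
    d, ok := c.timeouts[op]
    c.mu.RUnlock()
    if !ok {
        d = 2 * time.Second // fallback default
    }
    return context.WithTimeout(parent, d)
}
func main() {
    cfg := NewTimeoutConfig(map[string]time.Duration{
        "db_query":   500 * time.Millisecond, // fast local dependency
        "rpc_call":   1 * time.Second,        // remote service call
        "report_gen": 10 * time.Second,       // slow batch operation
    })
    ctx, cancel := cfg.Context(context.Background(), "rpc_call")
    defer cancel()
    deadline, _ := ctx.Deadline()
    fmt.Println("rpc_call must finish by:", deadline.Format(time.RFC3339Nano))
    // Tighten the RPC timeout at runtime, e.g. after a configuration push
    cfg.Set("rpc_call", 300*time.Millisecond)
}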
6.2 Timeout Control with Context
In Go, timeout control is implemented with context.Context:
package main
import (
"context"
"fmt"
"time"
)
// 模拟一个可能耗时的操作
func longRunningOperation(ctx context.Context, duration time.Duration) error {
select {
case <-time.After(duration):
// 操作完成
return nil
case <-ctx.Done():
// 上下文被取消(超时或手动取消)
return ctx.Err()
}
}
// 带超时控制的函数调用
func callWithTimeout(timeout time.Duration, opDuration time.Duration) error {
// 创建带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() // 确保资源被释放
// 启动操作
errChan := make(chan error, 1)
go func() {
errChan <- longRunningOperation(ctx, opDuration)
}()
// 等待操作完成或超时
select {
case err := <-errChan:
return err
case <-ctx.Done():
return fmt.Errorf("operation timed out after %v: %w", timeout, ctx.Err())
}
}
// 示例使用
func main() {
// 操作需要2秒完成
opDuration := 2 * time.Second
// 测试1:超时时间3秒(足够完成操作)
fmt.Println("Test 1: Timeout 3s")
err := callWithTimeout(3*time.Second, opDuration)
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Println("Operation completed successfully")
}
// 测试2:超时时间1秒(不足完成操作)
fmt.Println("\nTest 2: Timeout 1s")
err = callWithTimeout(1*time.Second, opDuration)
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Println("Operation completed successfully")
}
}
6.3 Handling Cascading Timeouts
In a distributed system a request may pass through several services, so timeouts have to cascade correctly:
package main
import (
"context"
"fmt"
"time"
)
// 模拟下游服务调用
func downstreamService(ctx context.Context, serviceName string, delay time.Duration) (string, error) {
select {
case <-time.After(delay):
return fmt.Sprintf("Response from %s", serviceName), nil
case <-ctx.Done():
return "", fmt.Errorf("%s: %w", serviceName, ctx.Err())
}
}
// remainingTime derives the time budget left on the context from its deadline
func remainingTime(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if !ok {
return 0
}
return time.Until(deadline)
}
// Intermediate service: calls the downstream service
func intermediateService(ctx context.Context) (string, error) {
// The downstream timeout must be smaller than the current context's budget;
// keep some headroom for this service's own work
remaining := remainingTime(ctx)
if remaining < 500*time.Millisecond {
return "", fmt.Errorf("not enough time for intermediate service")
}
// Give 80% of the remaining budget to the downstream call
downstreamCtx, cancel := context.WithTimeout(ctx, time.Duration(0.8*float64(remaining)))
defer cancel()
return downstreamService(downstreamCtx, "downstream", 300*time.Millisecond)
}
// Upstream service: calls the intermediate service
func upstreamService(ctx context.Context) (string, error) {
remaining := remainingTime(ctx)
if remaining < 1*time.Second {
return "", fmt.Errorf("not enough time for upstream service")
}
// Give 80% of the remaining budget to the intermediate call
intermediateCtx, cancel := context.WithTimeout(ctx, time.Duration(0.8*float64(remaining)))
defer cancel()
return intermediateService(intermediateCtx)
}
// 示例使用
func main() {
// 总超时设置为2秒
totalTimeout := 2 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), totalTimeout)
defer cancel()
// 调用上游服务
result, err := upstreamService(ctx)
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Result: %s\n", result)
}
// 测试超时情况:总超时1秒,但操作需要更长时间
shortTimeout := 1 * time.Second
ctx2, cancel2 := context.WithTimeout(context.Background(), shortTimeout)
defer cancel2()
result2, err2 := upstreamService(ctx2)
if err2 != nil {
fmt.Printf("Short timeout test error: %v\n", err2)
} else {
fmt.Printf("Short timeout test result: %s\n", result2)
}
}
6.4 Timeout Optimization Strategies
Tuning timeouts improves responsiveness and resource utilization:
- Dynamic timeouts: adjust timeouts based on service load and network conditions
- Warm-up timeouts: give newly started instances a longer initial timeout
- Percentile-based timeouts: derive timeouts from percentiles of historical response times
- Adaptive timeouts: adjust based on the most recent response times
package main
import (
"context"
"fmt"
"math"
"math/rand"
"sync/atomic"
"time"
)
// AdaptiveTimeout 自适应超时控制器
type AdaptiveTimeout struct {
history []time.Duration // 历史响应时间
maxHistory int // 最大历史记录数
p95Timeout time.Duration // 基于P95的超时时间
factor float64 // 安全系数
initialized uint32 // 是否初始化
}
// NewAdaptiveTimeout 创建自适应超时控制器
func NewAdaptiveTimeout(maxHistory int, factor float64) *AdaptiveTimeout {
return &AdaptiveTimeout{
history: make([]time.Duration, 0, maxHistory),
maxHistory: maxHistory,
factor: factor,
}
}
// Record 记录响应时间
func (a *AdaptiveTimeout) Record(duration time.Duration) {
a.history = append(a.history, duration)
// 保持历史记录不超过最大数量
if len(a.history) > a.maxHistory {
a.history = a.history[len(a.history)-a.maxHistory:]
}
// 计算P95值
if len(a.history) >= 20 { // 至少需要20个样本
a.calculateP95()
atomic.StoreUint32(&a.initialized, 1)
}
}
// calculateP95 计算P95分位数
func (a *AdaptiveTimeout) calculateP95() {
// 复制并排序历史记录
sorted := make([]time.Duration, len(a.history))
copy(sorted, a.history)
for i := 0; i < len(sorted); i++ {
for j := i + 1; j < len(sorted); j++ {
if sorted[j] < sorted[i] {
sorted[i], sorted[j] = sorted[j], sorted[i]
}
}
}
// 计算P95位置
index := int(math.Ceil(0.95 * float64(len(sorted)))) - 1
if index < 0 {
index = 0
}
a.p95Timeout = sorted[index]
}
// GetTimeout 获取当前超时时间
func (a *AdaptiveTimeout) GetTimeout(defaultTimeout time.Duration) time.Duration {
if atomic.LoadUint32(&a.initialized) == 0 {
return defaultTimeout
}
// 应用安全系数
return time.Duration(float64(a.p95Timeout) * a.factor)
}
// 示例使用
func main() {
// 创建自适应超时控制器,最多记录100个历史,安全系数1.5
timeoutController := NewAdaptiveTimeout(100, 1.5)
defaultTimeout := 2 * time.Second
// 模拟多次服务调用,记录响应时间
for i := 0; i < 50; i++ {
// 模拟响应时间:大部分在100-300ms,偶尔有慢响应
baseDelay := time.Duration(100 + rand.Intn(200)) * time.Millisecond
if rand.Float64() < 0.05 { // 5%的概率有慢响应
baseDelay = time.Duration(500 + rand.Intn(500)) * time.Millisecond
}
// 记录响应时间
timeoutController.Record(baseDelay)
// 获取当前超时时间
currentTimeout := timeoutController.GetTimeout(defaultTimeout)
// 使用当前超时时间调用服务
ctx, cancel := context.WithTimeout(context.Background(), currentTimeout)
start := time.Now()
// 模拟服务调用
err := func(ctx context.Context, delay time.Duration) error {
select {
case <-time.After(delay):
return nil
case <-ctx.Done():
return ctx.Err()
}
}(ctx, baseDelay)
duration := time.Since(start)
if err != nil {
fmt.Printf("Call %d: Timeout after %v (configured timeout: %v, actual duration: %v)\n",
i+1, duration, currentTimeout, baseDelay)
} else {
if i%10 == 0 && i > 0 {
fmt.Printf("Call %d: Completed in %v (configured timeout: %v)\n",
i+1, duration, currentTimeout)
}
}
cancel()
time.Sleep(10 * time.Millisecond)
}
}
7. Load Balancing Strategies
7.1 Comparison of Load Balancing Algorithms
Load balancing algorithms distribute requests across multiple service instances. A comparison of common algorithms (a small round-robin sketch follows the table):
| Algorithm | Pros | Cons | Typical use |
|---|---|---|---|
| Round robin | Simple, stateless | Ignores differences in instance load | Instances with similar capacity |
| Weighted round robin | Weights can reflect instance capacity | Weights are static and inflexible | Fixed capacity differences between instances |
| Random | Simple, avoids request clustering | Load may be uneven | Loose balancing requirements |
| Weighted random | Weights can reflect capacity | Short-term imbalance is possible | Instances with differing capacity |
| Least connections | Considers current instance load | Must track connection state | Long-lived connections |
| IP hash | Preserves session affinity | Adding/removing instances is disruptive | Session-sticky workloads |
| Consistent hashing | Minimal disruption when instances change | More complex to implement | Distributed caches, service discovery |
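The later subsections focus on consistent hashing, health-checked weighted random selection, and dynamic weights, so here is a minimal sketch of the two simplest entries in the table: plain round robin, and a naive weighted round robin that repeats each instance in the rotation according to its weight. The instance addresses are placeholders.
package main
import (
    "fmt"
    "sync/atomic"
)
// RoundRobin cycles through instances using an atomic counter.
type RoundRobin struct {
    instances []string
    next      uint64
}
func (r *RoundRobin) Pick() string {
    if len(r.instances) == 0 {
        return ""
    }
    n := atomic.AddUint64(&r.next, 1)
    return r.instances[(n-1)%uint64(len(r.instances))]
}
// NewWeightedRoundRobin expands each instance into the rotation as many times
// as its weight; simple, though picks of the same instance cluster together.
func NewWeightedRoundRobin(weights map[string]int) *RoundRobin {
    rr := &RoundRobin{}
    for addr, w := range weights {
        for i := 0; i < w; i++ {
            rr.instances = append(rr.instances, addr)
        }
    }
    return rr
}
func main() {
    rr := &RoundRobin{instances: []string{"node1:8080", "node2:8080", "node3:8080"}}
    for i := 0; i < 6; i++ {
        fmt.Println("round robin ->", rr.Pick())
    }
    wrr := NewWeightedRoundRobin(map[string]int{"node1:8080": 3, "node2:8080": 1})
    counts := map[string]int{}
    for i := 0; i < 40; i++ {
        counts[wrr.Pick()]++
    }
    fmt.Println("weighted distribution:", counts) // roughly 3:1
}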
7.2 Consistent Hashing
Consistent hashing minimizes changes in request routing when service instances are added or removed:
package main
import (
"crypto/sha1"
"fmt"
"sort"
)
// HashRing 一致性哈希环
type HashRing struct {
replicas int // 每个节点的虚拟节点数
ring map[uint32]string // 哈希环,key是哈希值,value是节点
sortedHashes []uint32 // 排序的哈希值
}
// NewHashRing 创建一致性哈希环
func NewHashRing(replicas int) *HashRing {
return &HashRing{
replicas: replicas,
ring: make(map[uint32]string),
}
}
// 计算字符串的哈希值
func (hr *HashRing) hash(key string) uint32 {
hash := sha1.Sum([]byte(key))
// 取前4个字节作为哈希值
return uint32(hash[3])<<24 | uint32(hash[2])<<16 | uint32(hash[1])<<8 | uint32(hash[0])
}
// Add 添加节点
func (hr *HashRing) Add(nodes ...string) {
for _, node := range nodes {
// 为每个节点创建多个虚拟节点
for i := 0; i < hr.replicas; i++ {
hash := hr.hash(fmt.Sprintf("%s%d", node, i))
hr.ring[hash] = node
hr.sortedHashes = append(hr.sortedHashes, hash)
}
}
// 排序哈希值
sort.Slice(hr.sortedHashes, func(i, j int) bool {
return hr.sortedHashes[i] < hr.sortedHashes[j]
})
}
// Remove 移除节点
func (hr *HashRing) Remove(node string) {
for i := 0; i < hr.replicas; i++ {
hash := hr.hash(fmt.Sprintf("%s%d", node, i))
delete(hr.ring, hash)
// Remove the virtual node's hash from the sorted slice
index := sort.Search(len(hr.sortedHashes), func(i int) bool {
return hr.sortedHashes[i] >= hash
})
if index < len(hr.sortedHashes) && hr.sortedHashes[index] == hash {
hr.sortedHashes = append(hr.sortedHashes[:index], hr.sortedHashes[index+1:]...)
}
}
}
// Get 获取key对应的节点
func (hr *HashRing) Get(key string) string {
if len(hr.ring) == 0 {
return ""
}
hash := hr.hash(key)
// 查找第一个大于等于当前哈希值的节点
index := sort.Search(len(hr.sortedHashes), func(i int) bool {
return hr.sortedHashes[i] >= hash
})
// 如果索引等于长度,说明应该循环到第一个节点
if index == len(hr.sortedHashes) {
index = 0
}
return hr.ring[hr.sortedHashes[index]]
}
// 示例使用
func main() {
// 创建哈希环,每个节点有3个虚拟节点
ring := NewHashRing(3)
// 添加节点
ring.Add("node1:8080", "node2:8080", "node3:8080")
fmt.Println("Added nodes: node1:8080, node2:8080, node3:8080")
// 测试一些key的映射
keys := []string{"user1", "user2", "user3", "user4", "user5", "order1", "order2", "product1"}
for _, key := range keys {
node := ring.Get(key)
fmt.Printf("Key: %-6s -> Node: %s\n", key, node)
}
// 移除一个节点
ring.Remove("node2:8080")
fmt.Println("\nRemoved node: node2:8080")
// 再次测试key的映射,观察变化
for _, key := range keys {
node := ring.Get(key)
fmt.Printf("Key: %-6s -> Node: %s\n", key, node)
}
}
7.3 Integrating Health Checks
Load balancing should be combined with health checks so that requests are not routed to unhealthy instances:
package main
import (
"fmt"
"math/rand"
"net/http"
"sync"
"time"
)
// ServiceInstance 服务实例
type ServiceInstance struct {
Address string
Healthy bool
Weight int
LastCheck time.Time
}
// HealthChecker 健康检查器
type HealthChecker struct {
instances []*ServiceInstance
interval time.Duration
mutex sync.Mutex
}
// NewHealthChecker 创建健康检查器
func NewHealthChecker(interval time.Duration) *HealthChecker {
return &HealthChecker{
instances: make([]*ServiceInstance, 0),
interval: interval,
}
}
// AddInstance 添加服务实例
func (hc *HealthChecker) AddInstance(address string, weight int) {
hc.mutex.Lock()
defer hc.mutex.Unlock()
hc.instances = append(hc.instances, &ServiceInstance{
Address: address,
Healthy: false, // 初始标记为不健康,等待健康检查
Weight: weight,
})
}
// GetHealthyInstances 获取健康的服务实例
func (hc *HealthChecker) GetHealthyInstances() []*ServiceInstance {
hc.mutex.Lock()
defer hc.mutex.Unlock()
healthy := make([]*ServiceInstance, 0)
for _, instance := range hc.instances {
if instance.Healthy {
healthy = append(healthy, instance)
}
}
return healthy
}
// Start runs the health checks: once immediately, then on every tick
func (hc *HealthChecker) Start() {
hc.checkAllInstances()
ticker := time.NewTicker(hc.interval)
defer ticker.Stop()
for range ticker.C {
hc.checkAllInstances()
}
}
// 检查所有实例的健康状态
func (hc *HealthChecker) checkAllInstances() {
hc.mutex.Lock()
defer hc.mutex.Unlock()
for _, instance := range hc.instances {
go hc.checkInstance(instance)
}
}
// 检查单个实例的健康状态
func (hc *HealthChecker) checkInstance(instance *ServiceInstance) {
// 简单的HTTP健康检查
url := fmt.Sprintf("http://%s/health", instance.Address)
client := &http.Client{
Timeout: 2 * time.Second,
}
resp, err := client.Get(url)
healthy := err == nil && resp.StatusCode == http.StatusOK
if resp != nil {
resp.Body.Close()
}
hc.mutex.Lock()
instance.Healthy = healthy
instance.LastCheck = time.Now()
hc.mutex.Unlock()
if healthy {
fmt.Printf("Instance %s is healthy\n", instance.Address)
} else {
fmt.Printf("Instance %s is unhealthy: %v\n", instance.Address, err)
}
}
// WeightedLoadBalancer 加权负载均衡器
type WeightedLoadBalancer struct {
healthChecker *HealthChecker
}
// NewWeightedLoadBalancer 创建加权负载均衡器
func NewWeightedLoadBalancer(hc *HealthChecker) *WeightedLoadBalancer {
return &WeightedLoadBalancer{
healthChecker: hc,
}
}
// ChooseInstance 选择服务实例
func (lb *WeightedLoadBalancer) ChooseInstance() *ServiceInstance {
instances := lb.healthChecker.GetHealthyInstances()
if len(instances) == 0 {
return nil
}
// 计算总权重
totalWeight := 0
for _, instance := range instances {
totalWeight += instance.Weight
}
// 随机选择一个基于权重的索引
rand.Seed(time.Now().UnixNano())
random := rand.Intn(totalWeight)
// 根据权重选择实例
current := 0
for _, instance := range instances {
current += instance.Weight
if current > random {
return instance
}
}
// 应该不会到达这里
return instances[0]
}
// 启动一个简单的健康检查服务
// startHealthyService starts a test service on its own mux; registering
// "/health" on the default mux for every address would panic on the second call
func startHealthyService(address string, healthy bool) {
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if healthy {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "OK")
} else {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, "Error")
}
})
go func() {
fmt.Printf("Starting service at %s\n", address)
http.ListenAndServe(address, mux)
}()
}
// 示例使用
func main() {
// 启动几个测试服务
startHealthyService(":8081", true) // 健康服务
startHealthyService(":8082", true) // 健康服务
startHealthyService(":8083", false) // 不健康服务
// 创建健康检查器,每5秒检查一次
healthChecker := NewHealthChecker(5 * time.Second)
healthChecker.AddInstance("localhost:8081", 3) // 权重3
healthChecker.AddInstance("localhost:8082", 1) // 权重1
healthChecker.AddInstance("localhost:8083", 2) // 权重2,但不健康
// 启动健康检查
go healthChecker.Start()
// 等待第一次健康检查完成
time.Sleep(1 * time.Second)
// 创建负载均衡器
lb := NewWeightedLoadBalancer(healthChecker)
// 测试负载均衡
fmt.Println("\nTesting load balancing:")
counts := make(map[string]int)
for i := 0; i < 100; i++ {
instance := lb.ChooseInstance()
if instance != nil {
counts[instance.Address]++
}
time.Sleep(10 * time.Millisecond)
}
// 输出统计结果
fmt.Println("\nLoad balancing results:")
for address, count := range counts {
fmt.Printf("%s: %d times (%.0f%%)\n", address, count, float64(count)/100*100)
}
}
7.4 Dynamic Weight Adjustment
Adjusting weights from the real-time load of each instance makes better use of resources:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// InstanceMetrics 实例指标
type InstanceMetrics struct {
CPUUsage float64 // CPU使用率(0-1)
MemoryUsage float64 // 内存使用率(0-1)
RequestRate float64 // 请求率(请求/秒)
ErrorRate float64 // 错误率(0-1)
LastUpdated time.Time
}
// DynamicWeightInstance 支持动态权重的服务实例
type DynamicWeightInstance struct {
Address string
BaseWeight int // 基础权重
CurrentWeight int // 当前权重
Metrics InstanceMetrics
Healthy bool
mutex sync.Mutex
}
// UpdateMetrics 更新实例指标
func (i *DynamicWeightInstance) UpdateMetrics(metrics InstanceMetrics) {
i.mutex.Lock()
defer i.mutex.Unlock()
i.Metrics = metrics
i.Metrics.LastUpdated = time.Now()
}
// CalculateDynamicWeight 计算动态权重
func (i *DynamicWeightInstance) CalculateDynamicWeight() int {
i.mutex.Lock()
defer i.mutex.Unlock()
if !i.Healthy {
i.CurrentWeight = 0
return 0
}
// 基于各项指标计算权重调整因子
// 指标越好(值越低),调整因子越高
cpuFactor := 1.0 - i.Metrics.CPUUsage
memFactor := 1.0 - i.Metrics.MemoryUsage
errorFactor := 1.0 - i.Metrics.ErrorRate
// 综合因子,CPU和错误率权重更高
combinedFactor := 0.4*cpuFactor + 0.3*memFactor + 0.3*errorFactor
// 确保因子不为负
if combinedFactor < 0 {
combinedFactor = 0
}
// 计算当前权重,不超过基础权重的2倍,不低于0
i.CurrentWeight = int(float64(i.BaseWeight) * combinedFactor)
if i.CurrentWeight > i.BaseWeight*2 {
i.CurrentWeight = i.BaseWeight * 2
}
if i.CurrentWeight < 0 {
i.CurrentWeight = 0
}
return i.CurrentWeight
}
// DynamicLoadBalancer 动态权重负载均衡器
type DynamicLoadBalancer struct {
instances []*DynamicWeightInstance
mutex sync.Mutex
}
// NewDynamicLoadBalancer 创建动态负载均衡器
func NewDynamicLoadBalancer() *DynamicLoadBalancer {
return &DynamicLoadBalancer{
instances: make([]*DynamicWeightInstance, 0),
}
}
// AddInstance 添加实例
func (lb *DynamicLoadBalancer) AddInstance(instance *DynamicWeightInstance) {
lb.mutex.Lock()
defer lb.mutex.Unlock()
lb.instances = append(lb.instances, instance)
}
// UpdateWeights 更新所有实例的权重
func (lb *DynamicLoadBalancer) UpdateWeights() {
lb.mutex.Lock()
defer lb.mutex.Unlock()
for _, instance := range lb.instances {
instance.CalculateDynamicWeight()
}
}
// ChooseInstance 选择实例
func (lb *DynamicLoadBalancer) ChooseInstance() *DynamicWeightInstance {
lb.mutex.Lock()
defer lb.mutex.Unlock()
// 计算总权重
totalWeight := 0
for _, instance := range lb.instances {
totalWeight += instance.CurrentWeight
}
if totalWeight <= 0 {
return nil // 没有可用实例
}
// 随机选择
rand.Seed(time.Now().UnixNano())
random := rand.Intn(totalWeight)
// 根据权重选择实例
current := 0
for _, instance := range lb.instances {
current += instance.CurrentWeight
if current > random {
return instance
}
}
// 应该不会到达这里
return nil
}
// 模拟指标收集
func simulateMetricsCollection(instances []*DynamicWeightInstance, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for _, instance := range instances {
if !instance.Healthy {
continue
}
// 模拟指标变化,负载会波动
cpu := 0.3 + rand.Float64()*0.5 // 30%-80%
mem := 0.2 + rand.Float64()*0.6 // 20%-80%
reqRate := 10 + rand.Float64()*90 // 10-100请求/秒
errorRate := 0 + rand.Float64()*0.1 // 0-10%错误率
// 偶尔模拟一个实例负载升高
if rand.Float64() < 0.2 {
cpu = 0.8 + rand.Float64()*0.2 // 80%-100%
errorRate = 0.1 + rand.Float64()*0.2 // 10%-30%错误率
}
instance.UpdateMetrics(InstanceMetrics{
CPUUsage: cpu,
MemoryUsage: mem,
RequestRate: reqRate,
ErrorRate: errorRate,
})
}
}
}
}
// 示例使用
func main() {
// 创建实例
instances := []*DynamicWeightInstance{
{
Address: "node1:8080",
BaseWeight: 10,
Healthy: true,
},
{
Address: "node2:8080",
BaseWeight: 10,
Healthy: true,
},
{
Address: "node3:8080",
BaseWeight: 10,
Healthy: true,
},
}
// 创建负载均衡器
lb := NewDynamicLoadBalancer()
for _, instance := range instances {
lb.AddInstance(instance)
}
// 启动指标收集
go simulateMetricsCollection(instances, 5*time.Second)
// 定期更新权重
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lb.UpdateWeights()
fmt.Println("\nUpdated weights:")
for _, instance := range instances {
fmt.Printf("%s: Base=%d, Current=%d, CPU=%.1f%%, Errors=%.1f%%\n",
instance.Address,
instance.BaseWeight,
instance.CurrentWeight,
instance.Metrics.CPUUsage*100,
instance.Metrics.ErrorRate*100)
}
}
}
}()
// 初始更新一次权重
lb.UpdateWeights()
// 模拟请求分发
fmt.Println("\nStarting request distribution simulation...")
counts := make(map[string]int)
for i := 0; i < 1000; i++ {
instance := lb.ChooseInstance()
if instance != nil {
counts[instance.Address]++
}
time.Sleep(10 * time.Millisecond)
}
// 输出结果
fmt.Println("\nFinal distribution:")
total := 0
for _, count := range counts {
total += count
}
for address, count := range counts {
fmt.Printf("%s: %d requests (%.1f%%)\n", address, count, float64(count)/float64(total)*100)
}
}
7.5 Failover
When a service instance fails, requests are automatically shifted to other healthy instances:
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// FailoverInstance 支持故障转移的服务实例
type FailoverInstance struct {
Address string
Healthy bool
FailureCount int
lastFailure time.Time
mutex sync.Mutex
}
// MarkSuccess 标记成功
func (i *FailoverInstance) MarkSuccess() {
i.mutex.Lock()
defer i.mutex.Unlock()
i.FailureCount = 0
i.Healthy = true
}
// MarkFailure 标记失败
func (i *FailoverInstance) MarkFailure() {
i.mutex.Lock()
defer i.mutex.Unlock()
i.FailureCount++
i.lastFailure = time.Now()
// 如果连续失败3次,标记为不健康
if i.FailureCount >= 3 {
i.Healthy = false
}
}
// AttemptRecovery 尝试恢复实例
func (i *FailoverInstance) AttemptRecovery() bool {
i.mutex.Lock()
defer i.mutex.Unlock()
// 如果实例不健康且距离上次失败已超过30秒,尝试恢复
if !i.Healthy && time.Since(i.lastFailure) > 30*time.Second {
i.Healthy = true
i.FailureCount = 0
return true
}
return false
}
// FailoverLoadBalancer is a load balancer with failover support
type FailoverLoadBalancer struct {
instances []*FailoverInstance
next int // round-robin cursor
mutex sync.Mutex
}
// NewFailoverLoadBalancer 创建故障转移负载均衡器
func NewFailoverLoadBalancer() *FailoverLoadBalancer {
return &FailoverLoadBalancer{
instances: make([]*FailoverInstance, 0),
}
}
// AddInstance 添加实例
func (lb *FailoverLoadBalancer) AddInstance(instance *FailoverInstance) {
lb.mutex.Lock()
defer lb.mutex.Unlock()
lb.instances = append(lb.instances, instance)
}
// GetHealthyInstances 获取健康实例
func (lb *FailoverLoadBalancer) GetHealthyInstances() []*FailoverInstance {
lb.mutex.Lock()
defer lb.mutex.Unlock()
healthy := make([]*FailoverInstance, 0)
for _, instance := range lb.instances {
// 尝试恢复可能已恢复的实例
instance.AttemptRecovery()
if instance.Healthy {
healthy = append(healthy, instance)
}
}
return healthy
}
// ChooseInstance picks an instance with simple round robin
func (lb *FailoverLoadBalancer) ChooseInstance() *FailoverInstance {
healthy := lb.GetHealthyInstances()
if len(healthy) == 0 {
return nil
}
// Advance the round-robin cursor under the lock
lb.mutex.Lock()
instance := healthy[lb.next%len(healthy)]
lb.next++
lb.mutex.Unlock()
return instance
}
// InvokeWithFailover 带故障转移的调用
func (lb *FailoverLoadBalancer) InvokeWithFailover(op func(instance *FailoverInstance) error, maxAttempts int) error {
attempts := 0
var lastErr error
for attempts < maxAttempts {
attempts++
instance := lb.ChooseInstance()
if instance == nil {
return errors.New("no healthy instances available")
}
// 调用操作
err := op(instance)
if err == nil {
// 成功,标记实例成功
instance.MarkSuccess()
return nil
}
// 失败,标记实例失败
instance.MarkFailure()
lastErr = err
fmt.Printf("Attempt %d failed on instance %s: %v\n", attempts, instance.Address, err)
// 如果是最后一次尝试,不再重试
if attempts == maxAttempts {
break
}
// 短暂延迟后重试
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("after %d attempts, last error: %w", attempts, lastErr)
}
// 模拟服务调用
func simulateServiceCall(instance *FailoverInstance, failureRate float64) error {
// 模拟服务调用延迟
time.Sleep(time.Duration(50 + rand.Intn(150)) * time.Millisecond)
// 随机失败
if rand.Float64() < failureRate {
return errors.New("service error")
}
return nil
}
// 示例使用
func main() {
// 创建实例
instances := []*FailoverInstance{
{Address: "node1:8080", Healthy: true},
{Address: "node2:8080", Healthy: true},
{Address: "node3:8080", Healthy: true},
}
// 创建负载均衡器
lb := NewFailoverLoadBalancer()
for _, instance := range instances {
lb.AddInstance(instance)
}
// 模拟不同实例有不同的失败率
// node1: 10%失败率,node2: 60%失败率,node3: 30%失败率
failureRates := map[string]float64{
"node1:8080": 0.1,
"node2:8080": 0.6,
"node3:8080": 0.3,
}
// 执行多次调用
successCount := 0
failureCount := 0
for i := 0; i < 50; i++ {
fmt.Printf("\nRequest %d:\n", i+1)
err := lb.InvokeWithFailover(func(instance *FailoverInstance) error {
return simulateServiceCall(instance, failureRates[instance.Address])
}, 3) // 最多重试3次
if err != nil {
fmt.Printf("Request %d failed: %v\n", i+1, err)
failureCount++
} else {
fmt.Printf("Request %d succeeded\n", i+1)
successCount++
}
}
// 输出结果
fmt.Println("\nFinal statistics:")
fmt.Printf("Total requests: %d\n", successCount + failureCount)
fmt.Printf("Successful: %d (%.1f%%)\n", successCount, float64(successCount)/float64(successCount + failureCount)*100)
fmt.Printf("Failed: %d (%.1f%%)\n", failureCount, float64(failureCount)/float64(successCount + failureCount)*100)
// 输出实例状态
fmt.Println("\nInstance status:")
for _, instance := range instances {
fmt.Printf("%s: Healthy=%v, Failures=%d\n", instance.Address, instance.Healthy, instance.FailureCount)
}
}
8. Service Degradation
8.1 Designing Degradation Strategies
Service degradation sacrifices non-core functionality to keep core functionality available when the system is under excessive pressure or behaving abnormally:
- By feature: switch off non-core features to protect core ones
- By data: return partial or cached data to reduce computation
- By quality: lower data precision or return simplified results
- By traffic: restrict access for some users to protect the rest
Degradation strategies should be designed in advance, with clear trigger conditions and clearly defined degraded behavior; the sketch below illustrates the "by data" case.
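As a concrete illustration of degrading by data, the sketch below falls back to a local cache when the primary data source fails and marks the response as degraded so callers can tell. loadFromDatabase and the product values are made-up stand-ins for a real data source.
package main
import (
    "errors"
    "fmt"
    "sync"
)
// ProductInfo is the payload returned to callers; Degraded tells them whether
// they received fresh or cached data.
type ProductInfo struct {
    ID       string
    Price    float64
    Degraded bool
}
var (
    cacheMu sync.RWMutex
    cache   = map[string]ProductInfo{} // last known good values
)
// loadFromDatabase stands in for the primary data source; here it always fails
// to simulate an outage.
func loadFromDatabase(id string) (ProductInfo, error) {
    return ProductInfo{}, errors.New("database unavailable")
}
// GetProduct returns fresh data when possible and degrades to cached data otherwise.
func GetProduct(id string) (ProductInfo, error) {
    info, err := loadFromDatabase(id)
    if err == nil {
        cacheMu.Lock()
        cache[id] = info // refresh the cache on success
        cacheMu.Unlock()
        return info, nil
    }
    // Degrade by data: serve the last known good value if we have one
    cacheMu.RLock()
    cached, ok := cache[id]
    cacheMu.RUnlock()
    if ok {
        cached.Degraded = true
        return cached, nil
    }
    return ProductInfo{}, fmt.Errorf("no fresh or cached data for %s: %w", id, err)
}
func main() {
    // Pre-populate the cache to simulate an earlier successful read
    cache["P100"] = ProductInfo{ID: "P100", Price: 19.99}
    info, err := GetProduct("P100")
    fmt.Printf("info=%+v err=%v\n", info, err)
}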
8.2 Triggers for Automatic Degradation
Automatic degradation is usually triggered by conditions such as:
- Excessive system load (high CPU or memory usage)
- Service response times that are too long
- Error rate above a threshold
- A dependent service becoming unavailable
- Traffic exceeding a preset threshold
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// SystemMetrics 系统指标
type SystemMetrics struct {
CPUUsage float64 // CPU使用率(0-1)
MemoryUsage float64 // 内存使用率(0-1)
RequestRate float64 // 请求率(请求/秒)
ErrorRate float64 // 错误率(0-1)
Latency time.Duration // 平均延迟
}
// DegradationPolicy 降级策略
type DegradationPolicy struct {
CPULimit float64 // CPU阈值
MemoryLimit float64 // 内存阈值
ErrorLimit float64 // 错误率阈值
LatencyLimit time.Duration // 延迟阈值
RequestLimit float64 // 请求率阈值
}
// DegradationLevel 降级级别
type DegradationLevel int
const (
DegradationNone DegradationLevel = iota
DegradationLight
DegradationMedium
DegradationHeavy
)
// DegradationManager 降级管理器
type DegradationManager struct {
policy DegradationPolicy
currentLevel DegradationLevel
metrics SystemMetrics
enabled uint32 // 原子布尔值,1表示启用
mutex sync.Mutex
}
// NewDegradationManager 创建降级管理器
func NewDegradationManager(policy DegradationPolicy) *DegradationManager {
return &DegradationManager{
policy: policy,
currentLevel: DegradationNone,
enabled: 1, // 默认启用
}
}
// SetEnabled 设置是否启用降级
func (dm *DegradationManager) SetEnabled(enabled bool) {
if enabled {
atomic.StoreUint32(&dm.enabled, 1)
} else {
atomic.StoreUint32(&dm.enabled, 0)
dm.mutex.Lock()
dm.currentLevel = DegradationNone
dm.mutex.Unlock()
}
}
// UpdateMetrics 更新系统指标
func (dm *DegradationManager) UpdateMetrics(metrics SystemMetrics) {
dm.mutex.Lock()
dm.metrics = metrics
dm.mutex.Unlock()
// 检查是否需要调整降级级别
if atomic.LoadUint32(&dm.enabled) == 1 {
dm.evaluateDegradationLevel()
}
}
// evaluateDegradationLevel 评估降级级别
func (dm *DegradationManager) evaluateDegradationLevel() {
dm.mutex.Lock()
defer dm.mutex.Unlock()
// 计算触发的条件数量
triggered := 0
if dm.metrics.CPUUsage > dm.policy.CPULimit {
triggered++
}
if dm.metrics.MemoryUsage > dm.policy.MemoryLimit {
triggered++
}
if dm.metrics.ErrorRate > dm.policy.ErrorLimit {
triggered++
}
if dm.metrics.Latency > dm.policy.LatencyLimit {
triggered++
}
if dm.metrics.RequestRate > dm.policy.RequestLimit {
triggered++
}
// 根据触发的条件数量确定降级级别
var newLevel DegradationLevel
switch {
case triggered == 0:
newLevel = DegradationNone
case triggered == 1 || triggered == 2:
newLevel = DegradationLight
case triggered == 3 || triggered == 4:
newLevel = DegradationMedium
case triggered >= 5:
newLevel = DegradationHeavy
}
// 如果级别变化,输出日志
if newLevel != dm.currentLevel {
dm.currentLevel = newLevel
fmt.Printf("Degradation level changed to: %v\n", newLevel)
}
}
// GetCurrentLevel 获取当前降级级别
func (dm *DegradationManager) GetCurrentLevel() DegradationLevel {
dm.mutex.Lock()
defer dm.mutex.Unlock()
return dm.currentLevel
}
// String 降级级别字符串表示
func (l DegradationLevel) String() string {
switch l {
case DegradationNone:
return "None"
case DegradationLight:
return "Light"
case DegradationMedium:
return "Medium"
case DegradationHeavy:
return "Heavy"
default:
return "Unknown"
}
}
// 模拟服务处理,根据降级级别调整行为
func handleRequest(dm *DegradationManager, requestID string) {
level := dm.GetCurrentLevel()
fmt.Printf("Handling request %s with degradation level: %s\n", requestID, level)
switch level {
case DegradationNone:
// 正常处理,提供完整功能
time.Sleep(100 * time.Millisecond)
fmt.Printf("Request %s processed with full functionality\n", requestID)
case DegradationLight:
// 轻度降级,关闭部分非核心功能
time.Sleep(70 * time.Millisecond)
fmt.Printf("Request %s processed with light degradation\n", requestID)
case DegradationMedium:
// 中度降级,使用缓存数据,关闭更多功能
time.Sleep(40 * time.Millisecond)
fmt.Printf("Request %s processed with medium degradation\n", requestID)
case DegradationHeavy:
// 重度降级,只提供核心功能,返回简化结果
time.Sleep(20 * time.Millisecond)
fmt.Printf("Request %s processed with heavy degradation\n", requestID)
}
}
// 模拟系统指标变化
func simulateSystemMetrics(dm *DegradationManager) {
// 初始指标正常
metrics := SystemMetrics{
CPUUsage: 0.3,
MemoryUsage: 0.4,
RequestRate: 50,
ErrorRate: 0.05,
Latency: 80 * time.Millisecond,
}
// 前10秒指标逐渐变差
for i := 0; i < 10; i++ {
metrics.CPUUsage += 0.05
metrics.MemoryUsage += 0.04
metrics.RequestRate += 10
metrics.ErrorRate += 0.02
metrics.Latency += 5 * time.Millisecond
dm.UpdateMetrics(metrics)
time.Sleep(1 * time.Second)
}
// 接下来10秒指标逐渐恢复
for i := 0; i < 10; i++ {
metrics.CPUUsage -= 0.07
if metrics.CPUUsage < 0.3 {
metrics.CPUUsage = 0.3
}
metrics.MemoryUsage -= 0.05
if metrics.MemoryUsage < 0.4 {
metrics.MemoryUsage = 0.4
}
metrics.RequestRate -= 12
if metrics.RequestRate < 50 {
metrics.RequestRate = 50
}
metrics.ErrorRate -= 0.025
if metrics.ErrorRate < 0.05 {
metrics.ErrorRate = 0.05
}
metrics.Latency -= 7 * time.Millisecond
if metrics.Latency < 80*time.Millisecond {
metrics.Latency = 80 * time.Millisecond
}
dm.UpdateMetrics(metrics)
time.Sleep(1 * time.Second)
}
}
// 示例使用
func main() {
// 创建降级策略
policy := DegradationPolicy{
CPULimit: 0.7,
MemoryLimit: 0.8,
ErrorLimit: 0.2,
LatencyLimit: 150 * time.Millisecond,
RequestLimit: 100,
}
// 创建降级管理器
dm := NewDegradationManager(policy)
// 启动指标模拟
go simulateSystemMetrics(dm)
// 模拟请求处理
for i := 0; i < 50; i++ {
go handleRequest(dm, fmt.Sprintf("REQ-%03d", i+1))
time.Sleep(200 * time.Millisecond)
}
// 等待所有请求处理完成
time.Sleep(25 * time.Second)
}
8.3 Manual Degradation Control
Besides automatic degradation, manual degradation is needed so operators can intervene in exceptional situations:
package main
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// Feature 系统功能
type Feature string
const (
FeatureRecommendations Feature = "recommendations"
FeatureAnalytics Feature = "analytics"
FeaturePersonalization Feature = "personalization"
FeatureSearchFilters Feature = "search_filters"
FeatureFullTextSearch Feature = "full_text_search"
)
// ManualDegradationManager 手动降级管理器
type ManualDegradationManager struct {
disabledFeatures map[Feature]bool
mutex sync.Mutex
}
// NewManualDegradationManager 创建手动降级管理器
func NewManualDegradationManager() *ManualDegradationManager {
return &ManualDegradationManager{
disabledFeatures: make(map[Feature]bool),
}
}
// DisableFeature 禁用功能
func (m *ManualDegradationManager) DisableFeature(feature Feature) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.disabledFeatures[feature] = true
fmt.Printf("Feature %s disabled\n", feature)
}
// EnableFeature 启用功能
func (m *ManualDegradationManager) EnableFeature(feature Feature) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.disabledFeatures, feature)
fmt.Printf("Feature %s enabled\n", feature)
}
// IsFeatureEnabled 检查功能是否启用
func (m *ManualDegradationManager) IsFeatureEnabled(feature Feature) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return !m.disabledFeatures[feature]
}
// GetDisabledFeatures 获取所有禁用的功能
// GetDisabledFeatures returns all currently disabled features
func (m *ManualDegradationManager) GetDisabledFeatures() []Feature {
m.mutex.Lock()
defer m.mutex.Unlock()
features := make([]Feature, 0, len(m.disabledFeatures))
for feature, isDisabled := range m.disabledFeatures {
if isDisabled {
features = append(features, feature)
}
}
return features
}
// 模拟搜索服务,使用降级管理器
func searchService(query string, dm *ManualDegradationManager) map[string]interface{} {
result := map[string]interface{}{
"query": query,
"results": []string{
fmt.Sprintf("Result 1 for '%s'", query),
fmt.Sprintf("Result 2 for '%s'", query),
fmt.Sprintf("Result 3 for '%s'", query),
},
}
// 如果全文搜索功能启用,添加更多结果
if dm.IsFeatureEnabled(FeatureFullTextSearch) {
result["results"] = append(result["results"].([]string),
fmt.Sprintf("Full text result 1 for '%s'", query),
fmt.Sprintf("Full text result 2 for '%s'", query),
)
}
// 如果搜索过滤功能启用,添加过滤选项
if dm.IsFeatureEnabled(FeatureSearchFilters) {
result["filters"] = []string{"price: low to high", "rating: 4+ stars"}
}
// 如果推荐功能启用,添加推荐内容
if dm.IsFeatureEnabled(FeatureRecommendations) {
result["recommendations"] = []string{
fmt.Sprintf("Recommended for '%s'", query),
}
}
// 如果个性化功能启用,添加个性化结果
if dm.IsFeatureEnabled(FeaturePersonalization) {
result["personalized"] = true
}
return result
}
// 启动HTTP API用于手动控制降级
func startControlAPI(dm *ManualDegradationManager) {
// 获取当前降级状态
http.HandleFunc("/degradation/status", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"disabled_features": dm.GetDisabledFeatures(),
})
})
// 禁用功能
http.HandleFunc("/degradation/disable", func(w http.ResponseWriter, r *http.Request) {
feature := Feature(r.URL.Query().Get("feature"))
dm.DisableFeature(feature)
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Feature %s disabled", feature)
})
// 启用功能
http.HandleFunc("/degradation/enable", func(w http.ResponseWriter, r *http.Request) {
feature := Feature(r.URL.Query().Get("feature"))
dm.EnableFeature(feature)
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Feature %s enabled", feature)
})
// 搜索API
http.HandleFunc("/search", func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query().Get("q")
if query == "" {
http.Error(w, "Missing query parameter 'q'", http.StatusBadRequest)
return
}
result := searchService(query, dm)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
})
go func() {
fmt.Println("Starting control API on :8080")
http.ListenAndServe(":8080", nil)
}()
}
// 示例使用
func main() {
// 创建手动降级管理器
dm := NewManualDegradationManager()
// 启动控制API
startControlAPI(dm)
// 模拟一些搜索请求
queries := []string{"golang", "microservices", "distributed systems", "service governance"}
for i, query := range queries {
fmt.Printf("\nSimulating search %d: %s\n", i+1, query)
result := searchService(query, dm)
fmt.Printf("Search result: %+v\n", result)
time.Sleep(2 * time.Second)
}
fmt.Println("\nAPI server running. You can use the following endpoints:")
fmt.Println("GET /search?q=query - Perform a search")
fmt.Println("GET /degradation/status - Check degradation status")
fmt.Println("GET /degradation/disable?feature=feature_name - Disable a feature")
fmt.Println("GET /degradation/enable?feature=feature_name - Enable a feature")
fmt.Println("Features: recommendations, analytics, personalization, search_filters, full_text_search")
fmt.Println("Press Ctrl+C to exit")
// 保持程序运行
select {}
}
8.4 User Experience Under Degradation
When a service is degraded, the impact on user experience should be kept as small as possible (a small example of a degraded API response follows the list):
- Tell users clearly that some functionality is unavailable
- Offer an alternative or a simplified version of the feature
- Keep core functionality available and responsive
- Record user actions during degradation so they can be compensated for after recovery
- Avoid flapping between degraded and normal states
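To make the first two points concrete, here is a minimal sketch of an API response that tells the client explicitly that it is degraded and what it is getting instead. The field names and the use of the Warning header are illustrative choices, not a fixed standard.
package main
import (
    "encoding/json"
    "fmt"
    "net/http"
)
// APIResponse carries the payload plus an explicit degradation notice so the
// client (and ultimately the UI) can explain the situation to the user.
type APIResponse struct {
    Data     interface{} `json:"data"`
    Degraded bool        `json:"degraded"`
    Notice   string      `json:"notice,omitempty"`
}
func recommendationsHandler(w http.ResponseWriter, r *http.Request) {
    degraded := true // in a real service this flag would come from the degradation manager
    resp := APIResponse{Data: []string{"popular item 1", "popular item 2"}}
    if degraded {
        // Serve a simplified alternative and say so explicitly
        resp.Degraded = true
        resp.Notice = "Personalized recommendations are temporarily unavailable; showing popular items instead."
        w.Header().Set("Warning", `199 - "degraded mode"`)
    }
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}
func main() {
    http.HandleFunc("/recommendations", recommendationsHandler)
    fmt.Println("listening on :8080")
    http.ListenAndServe(":8080", nil)
}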