V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
xiaoxuz
V2EX  ›  分享发现

熔断-架构细碎设计系列(二)

  •  1
     
  •   xiaoxuz · 2021-06-19 02:09:34 +08:00 · 1592 次点击
    这是一个创建于 1043 天前的主题,其中的信息可能已经有所发展或是发生改变。

    What is 熔断 ?

    很多人问:熔断机制是什么?

    百科解释:

    熔断机制( Circuit Breaker ),也叫自动停盘机制,是指当[股指]( https://baike.baidu.com/item/股指 /3342555)[波幅]( https://baike.baidu.com/item/波幅 /6961924)达到规定的熔断点时,[交易所]( https://baike.baidu.com/item/交易所 /6148547)为控制风险采取的暂停交易措施。

    白话翻译:

    你拿 1000 块去打麻将,分分钟输的精光,这个时候要休战 10 分钟,给你时间去 ATM 又取了 1000 块,取完之后分 2 次每次只玩 500 块,如果连续两次都赢钱那么就可以继续玩耍,否则还需要休战、取钱、分批玩耍,如此循环。

    • 休战是为了防范恐慌情绪进一步扩散,影响你打牌的判断
    • 分批玩耍是为了循序渐进,避免 all in 输光

    在这里插入图片描述

    服务治理中的熔断机制:

    指的是在发起服务调用的时候,如果被调用方返回的错误率超过一定的阈值或触发某些特定策略,那么后续的请求将不会真正发起请求,而是在调用方直接返回错误。

    如果看过细碎设计系列上一篇文章:

    文章卡片地址

    有的同学应该会发现,熔断和限流貌似很像,但其实两者最大差别就是:

    • 限流是服务端根据其自身能力设置的一个过载保护对外

    • 熔断是调用端对自身的一个降级保护对内

    注意:能熔断的服务肯定不是核心链路上的必选服务。如果是的话,则服务如果超时或宕机,前台服务就无法使用了,这就不是熔断。所以,熔断其实也是一种降级方式。

    Why use 熔断?

    在微服务或普通系统架构间,服务和服务依赖很常见,如果服务端异常,调用端还是不断的请求或者重试,这样首先服务端很容易彻底打挂掉,并且调用端因为堆积了大量请求操作也可能导致宕机无法提供服务。

    在这里插入图片描述

    如下图:

    在这里插入图片描述

    • 服务初始1min 节点:服务 C 异常无响应,服务 B 继续不断重试
    • 服务初始2min 节点:因为服务 C 持续无响应,服务 B 不断重试,导致服务 B 线程池占用打满,服务 A 开始不断重试。
    • 服务初始3min 节点:服务 B 持续无响应,导致服务 A 不可用

    这就是未启动熔断策略导致的滚雪球服务雪崩


    熔断器如何设计?

    熔断器即为调用端服务端发起通信时对下游服务的服务质量进行监测策略熔断中间件

    如下图:

    在这里插入图片描述

    上游服务 A 向下游服务 B 发起通信时首先经过了 Breaker中间件的处理。

    如果按照上下游分层的话,由此可见:Breaker 属于上游服务 A,即说明了上文熔断是对调用端自身的一种保护。

    Breaker 熔断器主流程分为三步骤,BeforeCallAfter。下文讲诉熔断器构造时会详细描述。

    熔断器结构

    • 状态机
    • 滑动计数器
    • 运行三步骤

    状态机

    熔断器内部状态机有三种状态

    • Close 熔断器关闭

      调用方正常访问服务方

    • Open 熔断器开启

      熔断器阻断调用方对服务方的访问

    • Half Open熔断器半开

      释放调用方小流量访问服务方,检查服务方是否健康

    如下图:

    在这里插入图片描述

    Init -> Close 熔断器初始化为 Close 状态

    Close -> Open 服务方提供服务异常,熔断器由 Close 变为 Open

    服务异常的定位由上游服务自己定义,比如:

    • 服务方请求 Timeout
    • 服务方请求 Http Code 非 2xx
    • 业务自定义范围 errNo > 0

    熔断策略也是自定义,比如:

    • 请求错误数>N
    • 请求错误占比>N%
    • 连续请求错误数>N

    Open -> Half Open 熔断器度过冷却期,准备尝试恢复服务,状态变为 Half Open 。

    冷却期: 指当熔断器打开后, 在一段自定义的时间内拒绝任何服务。

    Half Open -> Open 在熔断器半开状态内,发现服务方异常,则熔断器再次 Open 。

    Half Open -> Close 当熔断器半开时间内,满足恢复条件,则熔断器变为 Close 。

    恢复条件为调用方自定义,比如:

    • 连续成功数>N
    • 连续成功请求占比 > N%

    滑动计数器

    熔断器的熔断和恢复策略都是基于请求计数,并且每一个滑动时间窗口都会存在一个计数器

    所以说:熔断策略是通过在某一个时间窗口内,计数器达到某一个阈值而触发。

    如下图:

    在这里插入图片描述

    TimeLine 的每一个节点为一个时间窗口,每一个时间窗口对应了一组计数器。

    注意

    窗口的滑动操作不仅有正向时间推移,状态机状态流转也会主动滑动窗口。

    运行三步骤

    上文有讲,熔断器运行机制主要分位三步骤:

    • Before

      状态机状态检查和流量拦截

    • Call

      代理请求目标服务方

    • After

      基于 Call 返回的 Response 进行计数器指标统计和状态更新


    源码 Demo 分析

    文章配源码,安排!

    在这里插入图片描述

    Demo 地址 : https://github.com/xiaoxuz/breaker

    Breaker 结构
    type Breaker struct {
    	m            sync.Mutex
    	state        State
    	Metrics      *Metrics
    	Strategy     *BreakStrategyConfig
    	HalfMaxCalls int64
    	OpenTime     time.Time
    	CoolingTime  time.Duration
    }
    
    • m 读写锁
    • state Breaker 状态
    • Metrics 计数器
    • Strategy 熔断策略
    • HalfMaxCalls 半开状态下最大请求次数,也是恢复服务的阈值
    • OpenTime 熔断器打开时间
    • CoolingTime 熔断器打开冷却时间
    Metrics 结构
    type Metrics struct {
    	MetricsID int64			// 计数器 ID
    	Win       *Window		// 滑动时间窗口
    	Norm      *Norm			// 指标统计
    }
    type Window struct {
    	Size      time.Duration	// 窗口大小
    	StartTime time.Time			// 窗口开启时间
    }
    type Norm struct {
    	AllCnt            int64	// 总请求数
    	SuccCnt           int64	// 成功数
    	FailCnt           int64 // 失败数
    	ContinuousSuccCnt int64 // 连续成功数
    	ContinuousFailCnt int64	// 连续失败数
    }
    

    计数器是由两部分组成:

    • *Window滑动时间窗口
    • *Norm指标统计
    Breaker 主流程
    // main
    func (b *Breaker) Call(f func() (interface{}, error)) (interface{}, error) {
    	// lock
    	b.m.Lock()
    	defer b.m.Unlock()
    
    	// 前置检查
    	if err := b.Before(); err != nil {
    		return nil, err
    	}
    
    	// call
    	b.Metrics.Call()
    	response, err := f()
    
    	// 后置处理
    	b.After(err == nil)
    
    	return response, nil
    }
    

    Sync.Mutex 读写锁控制并发,依次执行 Before -> Call.f() -> After

    Before 前置逻辑

    前置状态机状态检查和流量拦截

    具体如何进行检查和拦截的呢?先看代码:

    func (b *Breaker) Before() error {
    	now := time.Now()
    
    	switch b.state {
    	case STATE_OPEN:
    		// 如果超过冷却期,则调整为半开状态
    		if b.OpenTime.Add(b.CoolingTime).Before(now) {
    			b.Change(STATE_HALFOPEN, now)
    			return nil
    		}
    		// 如果未过冷却期则拒绝服务
    		return ERR_SERVICE_BREAK
    		break
    	case STATE_HALFOPEN:
    		// 如果请求数超过半开上限,则拒绝服务
    		if b.Metrics.Norm.AllCnt >= b.HalfMaxCalls {
    			return ERR_SERVICE_BREAK_HALFOPEN
    		}
    		break
    	//case STATE_CLOSED:
    	default:
    		// 如果时间窗口开始时间小于当前时间,则属于执行滑动窗口
    		if b.Metrics.Win.StartTime.Before(now) {
    			b.Metrics.Restart(now.Add(b.Metrics.Win.Size))
    		}
    		return nil
    	}
    	return nil
    }
    

    判断当前状态:

    • 打开状态

      判断是否度过冷却期,如果为 true,则调整为半开模式。否则拒绝服务,返回errors.New("service break")

    • 半开状态

      如果请求数超过半开上限,则拒绝服务

    • 关闭状态

      判断是否需要滑动窗口

    Call 目标服务

    只有在 Before前置检查通过后,才能代理执行服务请求。

    b.Metrics.Call()当前计数器执行Norm.AllCnt++

    After 后置逻辑
    func (b *Breaker) After(response bool) error {
    	// 请求成功
    	if true == response {
    		// Succ 计数+1
    		b.Metrics.Succ()
    
    		// 如果当前熔断器为半开状态,并且连续成功数达到阈值,那么状态机需要流转到关闭状态
    		if b.state == STATE_HALFOPEN && b.Metrics.Norm.ContinuousSuccCnt >= b.HalfMaxCalls {
    			b.Change(STATE_CLOSED, time.Now())
    		}
    	} else {
    		// Fail 计数+1
    		b.Metrics.Fail()
    
    		// 如果当前熔断器为半开状态,那么状态机需要流转到开启状态
    		if b.state == STATE_HALFOPEN {
    			b.Change(STATE_OPEN, time.Now())
    		}
    
    		// 如果当前熔断器为关闭状态,那么基于熔断策略判断是否要流转状态
    		if b.state == STATE_CLOSED {
    			if b.Strategy.Factory().Adapter(b.Metrics) {
    				b.Change(STATE_OPEN, time.Now())
    			}
    		}
    	}
    	return nil
    }
    

    入参 response bool为请求目标服务是否异常。

    请求成功

    b.Metrics.Succ()当前计数器执行

    func (m *Metrics) Succ() {
    	m.Norm.SuccCnt++
    	m.Norm.ContinuousSuccCnt++
    	m.Norm.ContinuousFailCnt = 0
    }
    

    注意这里要将ContinuousFailCnt连续失败数清 0

    这时不同状态决策不一样:

    • Open 状态,不可能走到这个逻辑

    • Close 状态,正常记录SuccCnt++

    • Half Open 状态时,需要判断是否可以关闭 Breaker,恢复服务。

      Demo 源码使用的恢复策略为连续成功数必须达到配置的最大半开流量数

      b.Metrics.Norm.ContinuousSuccCnt >= b.HalfMaxCalls

      不过这块不是绝对的,可以自有发挥~

    请求失败

    b.Metrics.Fail()当前计数器执行

    func (m *Metrics) Fail() {
    	m.Norm.FailCnt++
    	m.Norm.ContinuousFailCnt++
    	m.Norm.ContinuousSuccCnt = 0
    }
    

    注意这里要将ContinuousSuccCnt连续成功数清 0

    这是也要考虑状态流转的情况:

    • Open 状态,正常记录 FailCnt++就好了

    • Half Open 状态,状态机需要立即流转到 Open开启状态

    • Close 状态,基于熔断策略判断是否要流转为 Open 状态

      这里的 Demo 针对熔断策略做了简单的工厂模式调用

      // 熔断策略接口
      type BreakStrategyFunc interface {
      	Adapter(metrics *Metrics) bool // 每个熔断策略都需要实现 Adapter 策略适配方法
      }
      
      // 工厂
      func (bsc BreakStrategyConfig) Factory() BreakStrategyFunc {
      	switch bsc.BreakStrategy {
      	case BREAK_STRATEGY_FAILCNT:
      		return &BsFailCnt{&bsc}
      		break
      	case BREAK_STRATEGY_CONTINIUOUSFAILCNT:
      		return &BsContinuousFailCnt{&bsc}
      		break
      	case BREAK_STRATEGY_FAILRATE:
      		return &BsFailRate{&bsc}
      		break
      	default:
      		panic(fmt.Sprintf("unknown break strategy : %d", bsc.BreakStrategy))
      	}
      	return nil
      }
      

      目前有三个策略:

      • 根据错误计数,如果一个时间窗口期内失败数 >= N 次,开启熔断。

        func (bs *BsFailCnt) Adapter(metrics *Metrics) bool {
        	return metrics.Norm.FailCnt >= bs.FailCntThreshold
        }
        
      • 根据连续错误计数,一个时间窗口期内连续失败 >=N 次,开启熔断。

        func (bs *BsContinuousFailCnt) Adapter(metrics *Metrics) bool {
        	return metrics.Norm.ContinuousFailCnt >= bs.ContinuousFailCntThreshold
        }
        
      • 根据错误比例,一个时间窗口期内错误占比 >= N%,开启熔断。

        func (bs *BsFailRate) Adapter(metrics *Metrics) bool {
        	rate := float64(metrics.Norm.FailCnt / metrics.Norm.AllCnt)
        	return rate >= bs.FailRate
        }
        
    状态流转的细节操作
    // 状态流转
    func (b *Breaker) Change(state State, now time.Time) {
    	// 切换状态
    	switch state {
    	case STATE_OPEN:
    		b.OpenTime = now // 更新熔断器打开时间
    		b.state = state
    		// 新窗口时间为增加冷却时间之后
    		now = now.Add(b.CoolingTime)
    		break
    	case STATE_HALFOPEN:
    		b.state = state
    		now = time.Time{}
    	case STATE_CLOSED:
    		b.state = state
    		// 新窗口时间
    		now = now.Add(b.Metrics.Win.Size)
    	case b.state:
    		return
    	default:
    		return
    	}
    
    	// 重启计数器
    	b.Metrics.Restart(now)
    }
    

    首先保持只要状态流转就要滑动窗口的原则,执行b.Metrics.Restart(now)。代码中为重启计数器,其实做了如下滑动窗口重置统计指标的操作。

    其次不同状态,细节逻辑也不同:

    • Open 更新容器打开时间,并且新窗口开始时间为now.Add(b.CoolingTime 冷却时间)
    • Half Open 没有其他行为
    • Close 滑动窗口时间增加窗口间隔now.Add(b.Metrics.Win.Size)
    Go Test
    breaker := NewBreaker(Config{
    		HalfMaxCalls: 3,
    		WindowSize:   2 * time.Second,
    		Strategy: &BreakStrategyConfig{
    			BreakStrategy:    BREAK_STRATEGY_FAILCNT,
    			FailCntThreshold: 1,
    		},
    		CoolingTime: 5 * time.Second,
    	})
    	var succHandler = func(cnt int) {
    		for i := 0; i < cnt; i++ {
    			if _, err := breaker.Call(func() (i interface{}, err error) {
    				return nil, nil
    			}); err != nil {
    				fmt.Printf("[%s] SuccCall - %s state:%s \n", time.Now().Format("2006-01-02 15:04:05"), err.Error(), breaker.state.Name())
    			} else {
    				fmt.Printf("[%s] SuccCall - service is ok  state:%s \n", time.Now().Format("2006-01-02 15:04:05"), breaker.state.Name())
    			}
    			time.Sleep(1 * time.Second)
    		}
    	}
    	var failHandler = func(cnt int) {
    		for i := 0; i < cnt; i++ {
    			if _, err := breaker.Call(func() (i interface{}, err error) {
    				return nil, errors.New("test err")
    			}); err != nil {
    				fmt.Printf("[%s] FailCall - %s state:%s \n", time.Now().Format("2006-01-02 15:04:05"), err.Error(), breaker.state.Name())
    			} else {
    				fmt.Printf("[%s] FailCall - service is ok  state:%s \n", time.Now().Format("2006-01-02 15:04:05"), breaker.state.Name())
    			}
    			time.Sleep(1 * time.Second)
    		}
    	}
      
      // 测试次数顺序
    	succHandler(5) // succ 5 次
    	failHandler(5) // fail 5 次
    	succHandler(2) // succ 2 次
    	failHandler(1) // 1 次
    	succHandler(10)// succ 10 次
    
    	t.Log("Done")
    

    NewBreaker 的配置:半开上限 3 个请求、时间窗口大小 2s 、冷却期 5s 、熔断策略采用错误数达到 1 个。

    succHandler 和 failHandler 分别是请求成功、失败的方法。每次请求 Sleep 1s 。

    Test Result:

    在这里插入图片描述

    源码地址

    Demo 地址 : https://github.com/xiaoxuz/breaker

    收工

    打完收工,感谢支持!

    在这里插入图片描述

    xiaoxuz
        1
    xiaoxuz  
    OP
       2021-06-20 00:55:37 +08:00
    坚持沉淀知识点,持续输出,看到评论的你,感谢关注公众号!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2677 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 04:56 · PVG 12:56 · LAX 21:56 · JFK 00:56
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.