分布式系统(十)

服务熔断的具体表现形式是熔断器。那么,熔断器的基本结构是怎么样的?它又是如何实现的呢?

问题背景

熔断(Circuit Breaker)是很实用的一种容错机制。一方面在于熔断机制非常重要,另一方面也可以通过这个问题很好地区分候选人的知识广度和深度

从面试角度讲,关于熔断机制有很多种具体的问法,面试官往往会采用这样的方式来考查候选人:

  • 熔断器的状态有哪些?相互之间是如何转换的?
  • 在系统运行时,如何有效地收集和统计运行时数据?
  • 你使用过的熔断器有哪些?它的基本原理是怎么样的?
  • 如果让你来设计并实现一个简单的熔断机制,你会怎么做?

上述问题虽然各有侧重点,但对于熔断机制而言,考查的重点还是候选人在工程实践的基础上所掌握的技术原理。

问题分析

服务熔断-阈值

通过上述分析,我们可以梳理出如下技术上的要点。

  • 状态性:通过合理的状态切换来控制是否对请求进行熔断。
  • 运行时数据:收集服务运行时数据并进行统计分析,为阈值判断提供依据。
  • 阈值控制:各个状态切换的控制开关。

通过对上述要点的分析,这道题的回答思路就有了,而回答好这道题的关键就是要对上述要点背后的技术原理进行系统的阐述。接下来,让我们来对具体的技术体系展开讨论。

技术体系

基于熔断器的设计理念,我们对熔断过程进行进行抽象和提炼,可以得到如下图所示的熔断器基本结构。

可以看到,该状态机包含三个状态,即 Closed(关闭)、Open(打开)和 Half-Open(半开)。

  • Closed。这是熔断器的默认状态。
    • 在该状态下,相当于熔断器没有发挥任何效果,所有的请求可以得到正常的响应。
    • 但是,在熔断器内部还是会对所有的调用过程进行监控,如果有异常发生则会进行不断累加。
  • Open。这是熔断器的打开状态。
    • 在该状态下,相当于熔断器对所有的请求进行了隔断,来自服务消费者的请求不会触达到服务的提供者。
    • 同时,在熔断器内部也会启动一个计时器,当处于该状态达到一定时间时,熔断器会进入到半开状态。
  • Half-Open。这是熔断器的半开状态。
    • 所谓半开,指的是熔断器会允许一部分请求通过,然后再对这些请求的响应结果进行统计。
    • 如果这些请求的成功比例达到一定的阈值,则会把熔断器设回到关闭状态,反之则进入打开状态。

我们讨论如何自己实现一个包含这三个状态的熔断器。

首先,我们需要定义一个枚举类 State,代码如下:

1
2
3
4
5
public enum State {
CLOSED,
OPEN,
HALF_OPEN
}

然后,我们定义代表熔断器的 CircuitBreaker 接口,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
public interface CircuitBreaker {
// 成功的响应,会对熔断器状态进行重置
void recordSuccess();
// 失败的响应,根据需要对状态进行设置
void recordFailure(String response);
// 获取熔断器的当前状态
String getState();
// 设置熔断器状态
void setState(State state);
// 对远程服务发起请求
String attemptRequest() throws RemoteServiceException;
}

对于 CircuitBreaker 接口的实现过程是我们讨论的重点。为了实现对熔断器状态的合理控制,我们首先需要定义一系列的中间变量,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//远程调用的超时时间	
private final long timeout;
//重试时间间隔
private final long retryTimePeriod;
//最近一次的调用失败时间
long lastFailureTime;
//最近一次的失败响应结果
private String lastFailureResponse;
//累计失败次数
int failureCount;
//失败率阈值
private final int failureThreshold;
//熔断器状态
private State state;
//类似无限大的一个未来时间
private final long futureTime = 1000000000000;

当我们发起一个远程调用时,我们需要基于调用结果合理设置熔断器的状态,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public String attemptRequest() throws RemoteServiceException {
evaluateState();
if (state == State.OPEN) {
//如果熔断式处于打开状态,就直接返回失败结果
return this.lastFailureResponse;
} else {
try {
//执行远程调用
var response = service.call();
//远程调用成功
recordSuccess();
return response;
} catch (RemoteServiceException ex) {
//远程调用失败
recordFailure(ex.getMessage());
throw ex;
}
}
}

我们继续来实现当调用成功和失败时,对应的 recordSuccess 和 recordFailure 方法的执行逻辑,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
//调用成功,失败次数清零,最近失败时间设置成无限大,并把熔断器状态设置成关闭
public void recordSuccess() {
this.failureCount = 0;
this.lastFailureTime = System.nanoTime() + futureTime;
this.state = State.CLOSED;
}

//调用失败,失败次数加1,最近失败时间设置成当前时间,并把失败结果设置成最近一次调用失败的响应
public void recordFailure(String response) {
failureCount = failureCount + 1;
this.lastFailureTime = System.nanoTime();
this.lastFailureResponse = response;
}

当系统运行一段时间之后,我们如何来获取熔断器的当前状态呢?可以实现如下所示的 getState 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public String getState() {
//如果失败次数大于阈值
if (failureCount >= failureThreshold) {
if ((System.nanoTime() - lastFailureTime) > retryTimePeriod) {
//如果失败的累计时间已经超过了所允许的重试时间间隔,状态即为半熔断
state = State.HALF_OPEN;
} else {
//反之,熔断器应该为开启状态
state = State.OPEN;
}
} else {
//熔断器为打开状态
state = State.CLOSED;
}
return state.name();
}

对应的,如果我们想要直接对熔断器状态进行设置,可以使用如下所示的 setState 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void setState(State state) {
this.state = state;
switch (state) {
case OPEN://设置开启状态
this.failureCount = failureThreshold;
this.lastFailureTime = System.nanoTime();
break;
case HALF_OPEN://设置半开状态
this.failureCount = failureThreshold;
this.lastFailureTime = System.nanoTime() - retryTimePeriod;
break;
default://默认为关闭状态
this.failureCount = 0;
}
}

可以看到,这里我们也只是通过对熔断器的几个核心变量设置合适的值来更新熔断器的最新状态。

以上代码示例可以帮助你更好地理解一个熔断器的内部实现原理,也可以帮助你回答类似“如果让你来实现一个简单的熔断机制,你会怎么做?”这种开放式问题。

但是,这个毕竟只是一个示例。要想完整理解熔断器的实现过程,还是需要我们来对主流分布式服务框架中的原理展开解析。

源码解析

在 Spring Cloud 中,专门为开发人员提供了具备服务熔断功能的Spring Cloud Circuit Breaker 组件。

从命名上看,Spring Cloud Circuit Breaker 是对熔断器的一种抽象,支持不同的熔断器实现方案。在 Spring Cloud Circuit Breaker 中,内置了四种熔断器,如下图所示:

上图中的

  • Netflix Hystrix 显然来自于 Netflix OSS
  • Resilience4j 是受 Hystrix 项目启发所诞生的一款新型的容错库
  • Sentinel 从定位上讲是一款包含了熔断降级功能的高可用流量防护组件
  • Spring Retry 是 Spring 自研的重试和熔断框架

针对以上四种熔断器,Spring Cloud Circuit Breaker 提供了统一的 API。

Spring Cloud Circuit Breaker 组成结构

为了在应用程序中创建一个熔断器,我们可以使用 Spring Cloud Circuit Breaker 中的工厂类 CircuitBreakerFactory,该工厂类的定义如下:

1
2
3
public abstract class CircuitBreakerFactory<CONF, CONFB extends ConfigBuilder<CONF>> extends AbstractCircuitBreakerFactory<CONF, CONFB> {
public abstract CircuitBreaker create(String id);
}

可以看到这是一个抽象类,只有一个 create 方法用来创建 CircuitBreakerCircuitBreaker 是一个接口,约定了熔断器应该具有的功能,该接口定义如下所示:

1
2
3
4
5
6
7
8
9
public interface CircuitBreaker {
default <T> T run(Supplier<T> toRun) {
return run(toRun, throwable -> {
throw new NoFallbackAvailableException("No fallback available.", throwable);
});
};

<T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
}

这里用到了函数式编程的一些语法,但我们从方法定义上还是可以明显看出包含了 runfallback 这两个方法。其中的 Supplier 包含了你希望运行在熔断器中的业务代码,而 Function 则代表着回退(Fallback)方法。如果你熟悉 Netflix Hystrix 中的 HystrixCommand,你会发现两者之间存在明显的对应关系。

在 Spring Cloud Circuit Breaker 中,分别针对 Hystrix、Resilience4j、Sentinel 和 Spring Retry 这四款框架提供了 CircuitBreakerFactory 抽象类的子类。

一旦在代码工程的类路径中添加了相关的 starter,系统就会自动创建 CircuitBreaker。也就是说 CircuitBreakerFactory.create 方法会实例化对应框架的一个 CircuitBreaker 实例。

在引入具体的开发框架之后,下一步工作就是对它们进行配置。在 CircuitBreakerFactory 的父类 AbstractCircuitBreakerFactory 中,我们发现了如下所示的两个抽象方法:

1
2
3
4
5
//针对某一个id创建配置构造器
protected abstract CONFB configBuilder(String id);

//为熔断器配置默认属性
public abstract void configureDefault(Function<String, CONF> defaultConfiguration);

这里用到了大量的泛型定义,我们可以猜想,在这两个抽象方法的背后,Spring Cloud Circuit Breaker 会针对不同的第三方框架提供不同的配置实现过程。我们在接下来的内容中会基于具体的框架对这一过程做展开讨论。

让我们来到 Hystrix 框架,来看看在 Spring Cloud Circuit Breaker 中是如何使用统一编程模式完成对该框架的集成。我们首先关注实现了 CircuitBreaker 接口的 HystrixCircuitBreaker 类,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class HystrixCircuitBreaker implements CircuitBreaker {
private HystrixCommand.Setter setter;

public HystrixCircuitBreaker(HystrixCommand.Setter setter) {
this.setter = setter;
}

@Override
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
HystrixCommand<T> command = new HystrixCommand<T>(setter) {
@Override
protected T run() throws Exception {
return toRun.get();
}

@Override
protected T getFallback() {
return fallback.apply(getExecutionException());
}
};
return command.execute();
}
}

不难想象,这里应该构建了一个 HystrixCommand 对象,并在该对象原有的 rungetFallback 方法中封装了 CircuitBreaker 中的统一方法调用,而最终实现熔断操作的还是 Hystrix 原生的 HystrixCommand

然后,我们接着来看 HystrixCircuitBreakerFactory,这个类的实现过程也简洁明了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class HystrixCircuitBreakerFactory extends CircuitBreakerFactory<HystrixCommand.Setter, HystrixCircuitBreakerFactory.HystrixConfigBuilder> {
//实现默认配置
private Function<String, HystrixCommand.Setter> defaultConfiguration = id -> HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey(getClass().getSimpleName())).andCommandKey(HystrixCommandKey.Factory.asKey(id));

public void configureDefault(Function<String, HystrixCommand.Setter> defaultConfiguration) {
this.defaultConfiguration = defaultConfiguration;
}

public HystrixConfigBuilder configBuilder(String id) {
return new HystrixConfigBuilder(id);
}

//创建熔断器
public HystrixCircuitBreaker create(String id) {
Assert.hasText(id, "A CircuitBreaker must have an id.");
HystrixCommand.Setter setter = getConfigurations().computeIfAbsent(id, defaultConfiguration);
return new HystrixCircuitBreaker(setter);
}

//创建配置构造器
public static class HystrixConfigBuilder extends AbstractHystrixConfigBuilder<HystrixCommand.Setter> {
public HystrixConfigBuilder(String id) {
super(id);
}

@Override
public HystrixCommand.Setter build() {
return HystrixCommand.Setter.withGroupKey(getGroupKey()).andCommandKey(getCommandKey()).andCommandPropertiesDefaults(getCommandPropertiesSetter());
}
}
}

上述代码基本就是对原有 HystrixCommand 中关于服务分组等属性的简单封装,你可以结合接下来要介绍的 Hystrix 框架熔断机制内容做进一步理解。

Hystrix 熔断机制

在 Hystrix 中,最核心的就是 HystrixCircuitBreaker 接口,该接口代表了对熔断器的抽象过程,如下所示:

1
2
3
4
5
public interface HystrixCircuitBreaker {
public boolean allowRequest();
public boolean isOpen();
void markSuccess();
}

可以看到 HystrixCircuitBreaker 接口只有三个方法。在 Hystrix 中,该接口的实现类是 HystrixCircuitBreakerImpl

我们首先来看最重要的 allowRequest 方法,该方法用来判断每个请求是否可被执行allowRequest 实际上是对 isOpen 方法做了一层封装,在通过调用 isOpen 来触发熔断器的计算逻辑之前,先根据 HystrixCommandProperties 中的配置信息来判断是否强制开启熔断器,具体实现如下所示:

1
2
3
4
5
6
7
8
9
10
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
return !isOpen() || allowSingleTest();
}

接下来的 isOpen() 方法用来获取熔断器的当前状态。请注意,熔断器中关于阈值判断的一系列处理逻辑都位于该方法中,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean isOpen() {
if (circuitOpen.get()) {
return true;
}

HealthCounts health = metrics.getHealthCounts();
// 检查是否达到最小请求数,如果未达到的话即使请求全部失败也不会熔断
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}

// 检查错误百分比是否达到设定的阀值,如果未达到的话也不会熔断
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 如果错误率过高, 进行熔断,并记录下熔断时间
if (circuitOpen.compareAndSet(false, true)) {circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
}

最后,我们来到用来关闭熔断器markSuccess 方法。显然,该方法在熔断器处于半开状态下进行使用,我们可以通过该方法将熔断器设置为关闭状态。

这时候,熔断器要做的一件事情就是重置对请求的统计指标,如下所示:

1
2
3
4
5
6
7
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}

HystrixCircuitBreaker 接口的这三个方法的执行逻辑实际上都不复杂,HystrixCircuitBreaker 通过一个 circuitOpen 状态位控制着整个熔断判断流程,而这个状态位本身的状态值则取决于系统目前的运行时数据。

解题要点

从解题思路上讲,目前业界主流的熔断器实现模型基本都是类似的。就知识体系而言,要注意的是在熔断器内部实现过程中,并不是只有熔断和非熔断这两个简单状态,而是会引入一个半熔断的状态,通过对当前服务所处理请求的情况进行统计分析,熔断器会基于一定的阈值动态完成这些状态之间的自动切换。

另一方面,面试官在考查候选人能力的一大关注点还是实践能力,所以可以结合具体工具的使用过程来阐述熔断器的核心概念。