SpringCloud-Hystrix

Spring Cloud Hystrix 实现了断路器,线程隔离等一系列服务保护功能. 它也是基于Netfix的开源框架Hystrix实现的, 该框架的目标在于通过控制那些访问远程系统, 服务和第三方库的节点, 从而对延迟和故障提供更强大的容错能力. Hystrix具备服务降级,服务熔断,线程信号隔离,请求缓冲,请求合并以及服务监控等强大功能.

容错手段

出现的问题

服务提供者如果响应非常缓慢, 导致了用户的请求被强制等待, 直到提供者能响应或者超时. 在高负载的情况下, 如果没有任何处理, 会导致服务提供者的资源耗尽甚至整个系统崩溃. 例如电商网站如果没有处理这种情况, 在双11期间有过多的并发请求, 导致用户支付的请求延迟很久没有响应, 等待很长时间就会超时失败, 支付失败又会导致用户频繁的刷新支付页面并尝试重新支付, 此时服务已经没有多余的资源了, 这样更加重了服务器的负载, 最终系统被压挂了.

微服务架构下的系统会有多个服务共同支持, 每个服务负责不同的功能, 服务之间需要进行网络通信, 因此会出现嵌套调用, 网络通信有些情况是脆弱的, 请求超时无法避免. 此时如果某个基础服务A出现问题, 那么他的调用方B会调不通, 导致流量阻塞在B服务, 最终B服务也会出现问题, 然后导致B的调用方C出现问题, 这种问题逐渐放大的过程叫做雪崩效应.

如何解决

  1. 为网络请求设置超时时间
    调用网络请求会对应一个线程去处理这个请求, 就会占用某些资源, 资源一直不释放最终被耗尽, 导致服务不可用. 设置超时时间就是设置了最大可占用的时间, 超过这个时间就强制释放.
  2. 使用断路器模式
    断路器模拟了家中使用的电力保险, 电流过大会导致保险熔断, 保护电器不会被大电流烧毁. 服务中对断路器的应用也一样, 当调用量过大时, 可以强制让某些调用不进行, 当自动诊断出服务已经恢复了, 断路器再运行调用. 服务中的断路器需要比现实的电力断路器更加智能, 它需要在某些情况下自动断开, 需要在某些情况下恢复, 断开情况下需要走失败的处理逻辑, 它的断开不是真正意义上的断开, 而是允许少量的请求可以通过. 这些某个框架可以做到——Spring Cloud Hystrix;

Spring Cloud Hystrix

Hystrix 是豪猪的意思. 豪猪是一种哺乳动物,全身是刺用以更好的保护自己. 下面是hystrix项目介绍图.

hystrix

简介

Hystrix是由Netfilx开源的一个延迟和容错库, 用于隔离访问远程服务, 防止出现级联失败, 从而提高系统的可用性和容错性. 主要有一下几点:

  • 包裹请求: 使用HystrixCommand或者HystrixObservableCommand包裹对依赖服务的调用, 每个命令启动一个线程去执行, 使用到了命令模式.
  • 跳闸模式: 某个服务的错误率超过阈值时, 会自动or手动跳闸, 停止请求该服务一段时间.
  • 资源隔离: 为每个依赖服务都提供了小的线程池(信号量), 线程池(信号量)被耗尽, 请求将会直接被拒绝, 加快响应.
  • 监控: 提供了监控api, 可以实时的看到服务依赖调用的状态.
  • 回退机制: 当被拒绝访问依赖服务的时候, 开发者可以指定一个默认的返回, 例如请求某个列表失败了, 默认会返回一个空的列表, 而不是报错.
  • 自我恢复: 断路一段时间后, 它会进入半开模式, 来判断是否已经恢复.

工作流程

hystrix-command-flow-chart.png

第一步:实例化一个命令对象

首先会构造一个HystrixCommandorHystrixObservableCommand对象, 是对服务调用操作的封装. 从名字上看是使用了命令模式的设计模式. 这两个对象分别对应两个场景.

  • HystrixCommand: 用在依赖的服务返回单个操作结果的时候
  • HystrixObservableCommand: 用在依赖的服务返回多个操作结果的时候
设计模式-命令模式

第二步:请求执行命令

图中可以看到4种命令的执行方式, 其中HystrixCommand有下面两种执行方式.

  • execute(): 同步执行
  • queue(): 异步执行, 返回一个Future对象.

其中HystrixObservableCommand, 能返回多个操作结果, 有下面两种执行方式.

  • observe(): 返回Observable对象, 代表操作的多个结果, 是一个Hot Observable.
  • toObservable(): 同样返回Observable对象, 代表操作的多个结果, 但是是一个Cold Observable.

Hystrix 底层大量使用了 RxJava, RxJava的核心内容就包含了Observable, 可以理解其为”事件源”, 和它对应的叫Subscriber, 理解为”订阅者”, 两者通过subscribe发生订阅.

RxJava是一个响应性扩展的Java VM实现:一个通过使用可观察到的序列组合异步和基于事件的程序的库。
它扩展了observer模式,以支持数据/事件序列,并添加了操作符,允许您以声明的方式组合序列,同时抽象出底层线程、同步、线程安全和并发数据结构等方面的问题。
使用例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.yuda.demo.rxjavademo;

import rx.Observable;
import rx.Subscriber;

/**
* CreateUser: canyuda
* CreateTime: 2019/11/3 13:51
* Description:
*/
public class Main {
public static void main(String[] args) {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Main.onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("Main.onError");
}

@Override
public void onNext(String s) {
System.out.println("Main.onNext: s:" + s);
}
});
}
}
  • Hot Observable: 发布事件不需要订阅者, 事件执行过程中如果有了订阅者, 订阅者仅可以看到整个局部的过程.
  • Cold Observable: 发布事件需要订阅者, 没有时进行等待, 有订阅者时事件才会发布, 订阅者能看到整个事件过程.

注意: HystrixCommand其实底层也用到了RxJava, 区别是HystrixCommand保证了返回结果的单一.

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}

public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
/// 省略实现
// 注:仅对cancel()方法进行了修饰
// 修饰的内容是根据配置判断如果中断:executionIsolationThreadInterruptOnFutureCancel
};
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
return f;
default:
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
  • execute()是通过queue()返回的Futureget()实现的同步.
  • queue() 是通过把Observable转换为BlockingObservable,然后在转换为Future, 然后用另外一个Future包装了一层, 最终返回出Future对象

第三步:判断响应是否已经缓存

如果缓存功能开启了, 并且命中了缓存, 那么就会立即以Observable对象的形式返回.

第四步:线路是否打开

如果缓存没有命中, 会在执行命令前检测断路器是否开启了.

  • 开启: 调用过程直接中断, 跳转到fallback处理逻辑
  • 关闭: 跳转到第五步, 去检测是否有足够的资源来执行命令.

第五步:线程池/队列/信号量是否满了

每一个命令的执行需要占用一定的资源, 可以是线程池, 请求队列, 或者是信号量. 如果这些资源被占满, Hystrix就认为并发太高了, 于是直接跳转到fallback处理逻辑.

线程池: Hystrix为每个依赖的服务提供了专有线程池, 各个依赖服务之间采用舱璧模式(Bulkhead Pattern)隔离

第六步:命令执行者开始执行

执行 HystrixObservableCommand.construct() 或者 HystrixCommand.run().

如果命令设置了时间阈值, 并超过了这个时间, 则会抛一个TimeoutException, 然后跳转到fallback处理逻辑.

如果正常执行, HystrixObservableCommand返回一个Observable对象; 而HystrixCommand返回一个Observable并发送一个结果,产生onCompleted()的结束通知,

第七步:计算线路健康度

执行出结果后, Hystrix会把刚刚执行命令时的执行状态信息(成功/失败/拒绝/超时) 反馈给断路器, 断路器会维护一组数组统计这些信息, 并且根据这些信息来判断是否需要打开断路器, 以及如何打开断路器.

第八步:后备处理

当命令由于某些原因无法执行时, 执行的降级逻辑, 无法执行的原因有很多, 比如:断路器打开了, 资源被占满了 或者执行时抛异常了.

HystrixObservableCommandHystrixCommand 降级的执行有些区别:

  • HystrixObservableCommand: resumeWithFallback()返回一个Observable对象, 可以发射多个降级结果
  • HystrixCommand: getFallback() 返回一个结果.

具体业务中降级逻辑尽量是能稳定执行的. 如果不能做到, 还可以针对降级逻辑使用Hystrix进行降级, 从而形成级联的降级策略. 但是最终的处理需要是稳定可靠的. 如果降级逻辑出现了异常, 则底层依然会返回一个Observable对象, 但是它是没有任何结果的, 而是通过onError()通知命令立即中断, 最后把异常发送给调用者. 下面是降级逻辑异常的现象:

  • execute(): 直接抛异常
  • queue(): 正常返回一个Futrue对象, 但是get()时会抛异常
  • observe()toObservable(): 正常返回Observable对象, 当订阅时, 立即会通过调用者的onError()通知中止请求.

第九步:返回成功结果

下面可以看到4种处理方式的不同.

hystrix-return-flow

  • toObservable() 返回原始的Observable对象, 订阅时才会触发执行.
  • observe() 先执行toObservable(), 然后立即订阅它, 马上就能开始执行. 并返回一个Observable对象.
  • queue() 同样是先执行toObservable(), 然后通过toBlocking()转换为一个BlockingObservable对象, 再通过toFuture()转换为一个Future对象, 最后返回这个对象.
  • execute() 通过queue()返回的Future对象对象, 执行get()方法返回.

断路器如何工作

circuit-breaker

图中未画出attemptExecution()

断流器核心是HystrixCircuitBreaker接口, 下面我们来看看这个接口能干嘛.

HystrixCircuitBreaker

1.5.12版本源码

  • allowRequest(): 每个命令通过它判断是否继续执行, 这是幂等的, 不改变内部状态.

  • isOpen(): 当前断路器的开关状态

  • markSuccess(): 在”半开路”状态使用, 如果命令执行成功, 调用这个方法关闭断路器.

  • markNonSuccess(): 在”半开路”状态使用, 如果命令执行成功, 调用这个方法打开断路器.

  • attemptExection(): 在命令执行开始时调用以尝试执行, 这是非幂等的, 它可以修改内部状态.

  • Factory类: 维护了一个ConcurrentHashMap<String, HystrixCircuitBreaker> 通过 HystrixCommandKey 找到对应的HystrixCircuitBreaker实例.

  • NoOpCircuitBreaker类: 实现了HystrixCircuitBreaker, 啥都不做的断路器, 断路器永远关闭状态.

  • HystrixCircuitBreakerImpl类: 实现了HystrixCircuitBreaker, 有5个成员:

    • HystrixCommandProperties properties: 属性对象
    • HystrixCommandMetrics metrics: 记录各类度量指标
    • AtomicReference<Status> status: 断路器状态(关/开/半开)
    • AtomicLong circuitOpened: 断路器打开或者上一次测试的时间戳, 断路器关时为-1
    • AtomicReference<Subscription> activeSubscription: 活动订阅者, 监控metrics中的HealthCounts以修改status, Subscription内容如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    private Subscription subscribeToStream() {
    /*
    * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
    */
    return metrics.getHealthCountsStream()
    .observe()
    .subscribe(new Subscriber<HealthCounts>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(HealthCounts hc) {
    // check if we are past the statisticalWindowVolumeThreshold
    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    // we are not past the minimum volume threshold for the stat window,
    // so no change to circuit status.
    // if it was CLOSED, it stays CLOSED
    // if it was half-open, we need to wait for a successful command execution
    // if it was open, we need to wait for sleep window to elapse
    } else {
    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
    //we are not past the minimum error threshold for the stat window,
    // so no change to circuit status.
    // if it was CLOSED, it stays CLOSED
    // if it was half-open, we need to wait for a successful command execution
    // if it was open, we need to wait for sleep window to elapse
    } else {
    // our failure rate is too high, we need to set the state to OPEN
    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
    circuitOpened.set(System.currentTimeMillis());
    }
    }
    }
    }
    });
    }

下面看看HystrixCircuitBreakerImpl的具体实现吧.

isOpen()
1
2
3
4
5
6
7
8
9
10
@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
return circuitOpened.get() >= 0;
}

强制开关未设置的情况下, 判断circuitOpened是否大于0, 大于0表示断路开启.

allowRequest()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
return isAfterSleepWindow();
}
}
}

private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}

强制开关未设置的情况下, 判断circuitOpened是否等于-1, 如果不等于, 再判断是否处于半开模式, 如果不是半开模式, 再判断是否处于睡眠窗口之后, 超过睡眠窗口的命令让他执行.

markSuccess()和markNonSuccess()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}

@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}

分别负责半开->关,半开->开的状态转换, 维护了circuitOpened, 并且markSuccess()还处理了activeSubscription的更新.

attemptExecution()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
return false;
}
}
}

命令请求执行, 断路器开启状态下, 如果超过睡眠窗口时间, 则切换未半开状态, 尝试执行命令.