SpringCloud组件03---Hystrix实践

因为Hystrix大量使用到观察者模式和RxJava,所以需要补充相关知识。

观察者模式可以看这里:《》

下面是RxJava的快速入门

RxJava快速入门#

基本概念#

RxJava 有四个基本概念:

  • Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、Event事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通 知 Observer。这里的事件就是对依赖的服务进行调用。

  • 一个Observable可以发出多个事件,直到结束或者发生异常。

  • Observable每次发出一个事件就会调用Subscriber对象的onNext()方法。

  • 每一个Observable的执行,最后都会通过调用一个Subscriber.onComplete()或者Subscriber.onError()结束事件的操作流。

  • RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted()onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

  • 在一个正确运行的事件序列中, onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    image-20200603184440919

创建流程#

Observer观察者#

观察者,它决定事件触发的时候将有怎样的行为。有两种方式,直接创建Observer接口,或者从抽象类Subscriber实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}

@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

Subscriber和上面的Observer二者只是抽象类提供了更多的拓展方法(onStartunsubscribe)而已,创建过程是一样的。

Observable 被观察者(事件源)#

决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表:

  • Observable 被订阅的时候,OnSubscribecall() 方法会自动被调用

  • 事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。

  • 这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

这里只是最简单的一个例子,事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。

简介#

官网

https://github.com/Netflix/hystrix

因为调用链路越来越长,当某个微服务挂了,可能造成整个调用链路的请求拥挤和挂起,导致更多服务down掉,最终形成服务雪崩。

Hystrix是用于处理分布式系统的延迟容错开源库,在超时、异常等场景下,Hystrix保证在一个依赖出问题的情况下,不会导致整体集群集体失败、避免级联故障、提高分布式系统的弹性

断路器是一种开关装置。实现快速失败,服务失败后,通过服务的故障监控,向调用方发送一个预期的、可处理的备选FallBack反馈,而不是长时间挂起或者抛出异常。

这样保证了服务调用方的线程不会长时间挂起、不必要的等待,避免了故障的蔓延和雪崩。

造成雪崩原因可以归结为以下三个:

  • 服务提供者不可用(硬件故障,程序Bug,缓存击穿,用户大量请求)
  • 重试加大流量(用户重试,代码逻辑重试)
  • 服务调用者不可用(同步等待造成的资源耗尽)

最终的结果就是一个服务不可用导致一系列服务的不可用,而往往这种后果往往无法预料的。

概念#

服务熔断#

一般是指软件系统中,由于某些原因使得服务出现了过载现象,为防止造成整个系统故障,从而采用的一种保护措施,所以很多地方把熔断亦称为过载保护。

服务降级#

划分优先级,忍痛割爱。整体资源快不够了,忍痛将某些服务先关掉,待渡过难关,再开启回来。对方不可用的时候,给一个可预期的备选兜底FallBack。

什么时候会降级:程序异常/超时/熔断触发/线程池、信号量打满

要在调用方做降级(不然那个微服务都down掉了再做降级也没什么意义了) 比如说我们 user 调用payment 那么就在user 做降级.

熔断、降级的关系#

二者的目标是一致的,目的都是保证上游服务的稳定性。但其关注的重点并不一样,融断对下层依赖的服务并不级(或者说孰轻孰重),一旦产生故障就断掉;而降级需要对下层依赖的业务分级,把产生故障的丢了,换一个轻量级的方案,是一种退而求其次的方法。
根据业务场景的不同,一般采用以下两种模式:

降级方式#

第一种(最常用)如果服务失败,则我们通过fallback进行降级,返回静态值。

  • fallback进行降级,返回静态值:

image-20200602153749926

  • 级联方式降级:

如果第一个服务失败,则调用备用服务,例如失败重试或者访问缓存失败再去取数据库。

服务级联的目的则是尽最大努力保证返回数据的成功性,但如果考虑不充分,则有可能导致级联的服务崩溃(比如,缓存失败了,把全部流量打到数据库,瞬间导致数据库挂掉)。

因此级联模式,也要慎用,增加了管理的难度。

image-20200602153858394

Hystrix执行以下操作:

  • 提供保护并控制通过第三方客户端库访问(通常是通过网络)的依赖项带来的延迟和失败。
  • 停止复杂的分布式系统中的级联故障。
  • 快速失败并快速恢复。
  • FallBack回退并在可能的情况下正常降级。
  • 启用近乎实时的监视,警报和操作控制。

Hystrix的工作原理:

  • 防止任何单个依赖项耗尽所有容器(例如Tomcat)用户线程。
  • 减少负载并快速失败,而不是排队。
  • 在可行的情况下提供备用,以保护用户免受故障的影响。
  • 隔离:例如隔板bulkhead,泳道swimlane和断路器模式circuit breaker patterns,(线程池隔离和信号量隔离)限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。
  • 降级机制:超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。
  • 融断:当失败率达到阀值自动触发降级(如因网络故障/超时造成的失败率高),熔断器触发的快速失败会进行快速恢复。
  • 缓存:提供了请求缓存、请求合并实现。
  • 通过近实时监控指标,警报优化

服务限流#

秒杀等高并发场景,严禁一窝蜂,需要排队,一秒钟只能N个有序运行。

如何实现的:#

  • 将对外部系统(或“依赖项”)的所有调用包装在通常在单独线程中执行的HystrixCommand或HystrixObservableCommand对象中(这是命令模式的示例)。

  • 超时调用时间阈值。 有默认值,但可以自定义为超过99.5%的调用时间。

  • 为每个依赖项维护一个小的线程池(或信号量); 如果已满,发往该依赖项的请求将立即被拒绝,而不是排队。

  • 测量成功,失败(客户端抛出的异常),超时和线程拒绝。

  • 如果某个服务的错误百分比超过阈值,则使断路器跳闸,以在一段时间内手动或自动停止所有对特定服务的请求。

  • 当请求失败,被拒绝,超时或短路时执行回退逻辑。

  • 几乎实时监控指标和配置更改。

    更多见:

https://github.com/Netflix/Hystrix/wiki/How-it-Works#flow1

image-20200602151354011

上图的步骤#

  1. 构造一个HystrixCommandHystrixObservableCommand对象

  2. 执行命令,有四种方式。分别可以返回单个和多个返回。HystrixObservableCommand两个方法分别获取Hot和Cold的Observable。

  3. 响应是否已缓存:如果为此命令启用了请求缓存,并且如果对请求的响应在缓存中可用,则该缓存的响应将立即以的形式返回Observable

  4. 断路器是否开启:当执行该命令时,Hystrix会检查断路器,以查看断路器是否断开。

    如果断开(或“跳闸”),那么Hystrix将不执行命令,而是将流路由到(8)获取回退。如果电路是闭合的,则流程进行到(5),以检查是否有足够的容量来运行该命令。

  5. 线程池/队列/信号量是否已满:如果与该命令关联的线程池和队列(或信号量,如果未在线程中运行)已满,则Hystrix将不执行该命令,但会立即将流路由到(8)获取回退。

  6. HystrixObservableCommand.construct()HystrixCommand.run(),run返回单个响应或引发异常。construct返回一个Observable,它发出响应或发送onError通知。

  7. 计算电路健康度:Hystrix向断路器报告成功,失败,拒绝和超时,断路器保持滚动的一组计算统计信息的计数器。它使用这些统计信息来确定电路何时应“跳闸”,在此点它会将随后的所有请求短路,直到经过恢复期为止,在此之后,在首先检查某些运行状况检查之后,它将再次闭合电路。

  8. 获取后备:两个处理降级的类区别:

    1. construct或者run引发异常

    2. 断路器断开

    3. 线程池或者信号量满了

    4. 命令时间超时

      HystrixCommand.getFallback()或者HystrixObservableCommand.resumeWithFallback()进行后备实现,最终的后备应该不依赖网络静态,否则会级联失败。

      如果没准备后备,缺省抛出异常,会到OnError通知,非常糟糕要避免。

  9. 获取成功后的响应:参见 HystrixCommand和HystrixObservableCommand两个处理降级的类区别

这里有一个关联源码的动态序列图 https://design.codelytics.io/hystrix/how-it-works

降级出现的场景:

  • 上图4,命令处于“熔断、短路”状态的时候
  • 上图5,当前命令的线程池、请求队列或者信号量被占满的时候。
  • 上图6,当HystrixObservableCommand.construct()或者HystrixCommand.run()发生异常的时候。

隔离的实现#

线程池隔离模式:#

使用一个线程池来存储当前的请求,线程池对请求作处理,设置任务返回处理超时时间,堆积的请求堆积入线程池队列。这种方式需要为每个依赖的服务申请线程池,有一定的资源消耗,好处是可以应对突发流量(流量洪峰来临时,处理不完可将数据存储到线程池队里慢慢处理)

信号量隔离模式:#

使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,请求来先判断计数器的数值,若超过设置的最大线程个数则丢弃改类型的新请求,若不超过则执行计数操作请求来计数器+1,请求返回计数器-1。这种方式是严格的控制线程且立即返回模式,无法应对突发流量(流量洪峰来临时,处理的线程超过数量,其他的请求会直接返回,不继续去请求依赖的服务)

区别(两种隔离方式只能选其一):

线程池隔离 信号量隔离
线程 与调用线程非相同线程 与调用线程相同(jetty线程)
开销 排队、调度、上下文开销等 无线程切换,开销低
异步 支持 不支持
并发支持 支持(最大线程池大小) 支持(最大信号量上限)

image-20200602153259821

熔断#

正常状态下,电路处于关闭状态(Closed),如果调用持续出错或者超时,电路被打开进入熔断状态(Open),后续一段时间内的所有调用都会被拒绝(Fail Fast),一段时间以后,保护器会尝试进入半熔断状态(Half-Open),允许少量请求进来尝试,如果调用仍然失败,则回到熔断状态,如果调用成功,则回到电路闭合状态;

image-20200602153404090

HystrixCircuitBreaker(断路器的具体实现),下图是官网How-it-works里面的:

img

详细的工作流程:

http://hot66hot.iteye.com/blog/2155036

Hystrix源码#

HystrixCommand类图#

用于返回一个操作结果

类图,抽象类HystrixCommand继承了AbstractCommand,实现了三个核心接口HystrixExecutableHystrixInvokableInfoHystrixObservable

image-20200603115730422

HystrixObservableCommand类图#

用于返回多个操作结果对象

image-20200603160553985

  • HystrixInvokableInfo是一个空接口,只是一个标记。
  • HystrixInvokable也一样,空接口是一个标记。

HystrixCommand和HystrixObservableCommand#

两个处理降级的类区别:#

  • HystrixCommand有2个方法:

    • public R execute(): 用于同步执行命令。其实是用queue()返回的Future.get()实现同步。会等待任务执行完毕。
    • public Future<R> queue():用于异步执行命令。底层是通过 toObservable()拿到一个ColdObservable对象,通过toBlocking()转换为BlockingObservable,它可以把数据通过阻塞的方式发射出去。但是这里使用toFuture()转换成了一个Future返回,不会阻塞。使得消费者可以异步操作。这种转换要求只能发出一个数据,所以execute和queue都只能返回单一结果。
  • HystrixObservableCommand又提供了2个方法:

    • public Observable<R> observe():返回Observable对象,eagerly的。代表了操作的多个结果。订阅Observable,可用于通过回调异步执行命令。返回的是一个Hot Observable(不管是否有订阅者都会触发事件,每一个订阅者都可能是中途开始的,只看到整个操作的局部)
    • public Observable<R> toObservable():返回的也是Observable对象,lazily的。代表了操作的多个结果。返回的是一个Cold Observable(会等待,直到有订阅者才开始发布事件,对订阅者保证都是从一开始都看到整个操作的全程。)
    • Observable可以发送过个数据,获取多个结果。

    image-20200604002958416

二者降级的方法也不同:#

  • HystrixCommand:通过R getFallback()完成降级,直接返回业务R对象。
  • HystrixObservableCommand:通过Observable<R> resumeWithFallback()完成降级,返回Observable对象来发射一个或者多个降级结果。
  • 如果我们没有为命令实现降级方法,缺省实现是抛异常。或者降级失败也会抛异常,最终会进入到onError()方法中因其命令失败,要避免。

在服务降级的逻辑中,我们需要一个通用的结果。通常是静态或从缓存中获取的兜底数据,而不是依赖于网络。如果一定要依赖网络,那么依赖的服务也必须放到Hystrix的命令中,级联降级。最终最后兜底的那个一定是一个不依赖于网络的,否则可能降级失败。

断路器HystrixCircuitBreaker#

**关键词:**统计值、状态位、CAS、SingleTest、SleepWindow

HystrixCircuitBreaker是一个接口,接口文件中有两个实现类HystrixCircuitBreakerImplNoOpCircuitBreaker,以及一个Factory。

image-20200604093738685

  • Factory没啥意思,就是一个CurrentHashMap使用命令的key来缓存和实例化断路器。

  • 来看一下缺省断路器实现类HystrixCircuitBreakerImpl就能搞定它的原理了,先大概描述一下再上代码:

    • 有四个核心方法:
    • allowRequest():是否允许访问,主要判断超参数开关。参数设定强制断开、联通的话不回去判断断路器。否则才判断断路器isOpen或者singleTest。没啥意思。
    • isOpen(): true=断开,断开的立刻返回失败。没断开的判断窗口内是否超量(缺省10秒20),没超量也返回ok。超量的看错误率(缺省50%)超标则标记失败状态(CAS标识和失败时间)并返回,否则返回ok。
    • allowSingleTest():是否允许测一把,在上面isOpen里面如果错误量超标,会CAS标记断开,同时记录断开时间。然后当前请求时间>断开时间+sleepWindow(缺省5秒)后属于半开状态,此时的请求只允许测试一次,成功就恢复,没成功继续修改断开时间。【也是CAS修改时间戳,保证只会发起一次测试
    • markSuccess():请求成功后就标记成功并清空计数器(CAS标记成功防止重复清空)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties; //实例化的时候带进来的,一大堆配置参数和开关
private final HystrixCommandMetrics metrics; //实例化的时候带进来的,一堆指标

/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false); // true的时候就是open,也就是断路器断开了

/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); // 一个时间戳

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}

// 标记成功
public void markSuccess() {
if (circuitOpen.get()) { //true=open=断开
if (circuitOpen.compareAndSet(true, false)) { // 注意使用了CAS避免并发问题,恢复断路器。
//win the thread race to reset metrics // CAS拿到了执行权
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view, //退订当前流以重置运行状况计数流。这只会影响健康计数相关的信息
//and all other metric consumers are unaffected by the reset //并且所有其他指标使用者均不受重置的影响
metrics.resetStream();
}
}
}

@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) { // 参数里面看看是不是强制断开的,是的话拒绝。
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) { // 参数里面是不是强制关闭的,是的话允许。
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior // 计数
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest(); // 否则看是否闭合,或者是否允许单一测试(单一测试是挂了很久了,再来一个请求允许试一把看看恢复了没)
}

// 挂了很久,测一把看看是否恢复了。
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 从Atomic时间戳了拿出来上次失败时间
// 1) if the circuit is open // 断路器断开了,而且已经断开了很久,超过了参数配置的睡眠窗口时间。就会允许测试一把
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 如果测试成功,就会把断路器合上,恢复服务。否则把失败时间往后赋值,再等下一次时间窗口后进行singleTest
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true; // 允许
}
}
return false; // 另一个线程已经测试了
}


// 断路器是否闭合
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// 如果是断开的(true),我们会立即返回true断开,而不必费心尝试“关闭”自己,因为这允许allowSingleTest和随后的成功测试关闭
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// 如果我们是闭合的,看一眼是否有错误使我们跳闸,以便使电路断开
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();

// 检查是否超过总统计量的阈值参数,没有超过总量就立即返回false(没断开),不用计算别的
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}

// 如果超过了总量,再来计算错误比例
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false; // 错误比例没超标,直接返回ok的,没断开
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) { // 错误率太高,CAS把断路器关闭掉。CAS成功的话
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); // 设置断路时间
return true; // 返回断路了
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true; // 这里是被别的CAS抢占了,但是也要告诉当前的调用者这次是断开的。
}
}
}

}

隔离#

配置参数#

很多参数,去这里查:

https://github.com/Netflix/Hystrix/wiki/Configuration

https://www.jianshu.com/p/e07661b9bae8

https://www.jianshu.com/p/3e11ac385c73uyi