目标
- 什么是Resilience4J
- soul的Resilience4j体验
- 限流
- 熔断
- Resilience4J插件源码解读
什么是Resilience4j
- Resilience4J是Spring Cloud Gateway推荐的容错方案,它是一个轻量级的容错库
- 借鉴了Hystrix而设计,并且采用JDK8 这个函数式编程,即lambda表达式
- 相比之下, Netflix Hystrix 对Archaius 具有编译依赖性,Resilience4j你无需引用全部依赖,可以根据自己需要的功能引用相关的模块即可 Hystrix不更新了,Spring提供Netflix Hystrix的替换方案,即Resilence4J
Resilience4J 提供了一系列增强微服务的可用性功能:
- 断路器 CircuitBreaker
- 限流 RateLimiter
- 基于信号量的隔离
- 缓存
- 限时 Timelimiter
- 请求重启 Retry
官方提供的依赖包
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-circuitbreaker</artifactId> <version>${resilience.version}</version> </dependency>
soul的Resilience4j体验
首先在soul-admin控制台插件管理开启Resilience4j
在soul网关添加依赖
<dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId> <version>${project.version}</version> </dependency>
启动三个服务,分别是一个soul-admin,一个soul-bootstrap,一个soul-examples-http
在soul-admin控制台找到插件列表的Resilience4j,自定义配置,如下图,
* Resilience4j处理详解: * timeoutDurationRate:等待获取令牌的超时时间,单位ms,默认值:5000。 * limitRefreshPeriod:刷新令牌的时间间隔,单位ms,默认值:500。 * limitForPeriod:每次刷新令牌的数量,默认值:50。 * circuitEnable:是否开启熔断,0:关闭,1:开启,默认值:0。 * timeoutDuration:熔断超时时间,单位ms,默认值:30000。 * fallbackUri:降级处理的uri。 * slidingWindowSize:滑动窗口大小,默认值:100。 * slidingWindowType:滑动窗口类型,0:基于计数,1:基于时间,默认值:0。 * minimumNumberOfCalls:开启熔断的最小请求数,超过这个请求数才开启熔断统计,默认值:100。 * waitIntervalFunctionInOpenState:熔断器开启持续时间,单位ms,默认值:10。 * permittedNumberOfCallsInHalfOpenState:半开状态下的环形缓冲区大小,必须达到此数量才会计算失败率,默认值:10。 * failureRateThreshold:错误率百分比,达到这个阈值,熔断器才会开启,默认值50。 * automaticTransitionFromOpenToHalfOpenEnabled:是否自动从open状态转换为half-open状态,,true:是,false:否,默认值:false。
限流
- 参数配置 如下是参数配置校验,参数值小于默认值,会直接赋值默认值,因此方便测试效果直接修改源码的配置 : 每次刷新令牌的数量为2 ,刷新令牌的时间间隔为1s,超时时间为1s
/**
* check filed default value.
*
* @param resilience4JHandle {@linkplain Resilience4JHandle}
* @return {@linkplain Resilience4JHandle}
*/
public Resilience4JHandle checkData(final Resilience4JHandle resilience4JHandle) {
resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));
//resilience4JHandle.setLimitRefreshPeriod(Math.max(resilience4JHandle.getLimitRefreshPeriod(), Constants.LIMIT_REFRESH_PERIOD));
//resilience4JHandle.setLimitForPeriod(Math.max(resilience4JHandle.getLimitForPeriod(), Constants.LIMIT_FOR_PERIOD));
//每次刷新令牌的数量为2 ,刷新令牌的时间间隔为1s
resilience4JHandle.setLimitRefreshPeriod(1000);
resilience4JHandle.setLimitForPeriod(2);
resilience4JHandle.setTimeoutDuration(1000);
resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));
//resilience4JHandle.setTimeoutDuration(Math.max(resilience4JHandle.getTimeoutDuration(), Constants.TIMEOUT_DURATION));
resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
return resilience4JHandle;
}
- 使用SuperBenchmarker工具,4个线程,执行10s
C:\Users\v-yanb07>sb -u http://localhost:9195/http/test/findByUserId?userId=1 -c 4 -N 10
Starting at 2021-03-14 15:46:28
[Press C to stop the test]
23 (RPS: 1)
---------------Finished!----------------
Finished at 2021-03-14 15:46:51 (took 00:00:23.0477097)
24 (RPS: 1) Status 200: 25
RPS: 2.2 (requests/second)
Max: 2020ms
Min: 472ms
Avg: 1677ms
50% below 1994ms
60% below 1997ms
70% below 1999ms
80% below 1999ms
90% below 2001ms
95% below 2019ms
98% below 2020ms
99% below 2020ms
99.9% below 2020ms
输出日志
2021-03-14 12:16:35.252 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:36.249 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:36.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:37.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:37.250 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:38.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:38.250 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:39.252 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流测试 2021-03-14 12:16:39.252 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流测试
控制台日志每秒输出两条,由此验证限流生效
熔断
- 从配置信息我们知道熔断器默认是关闭,我们需要开打
soul-examples-http调用接口处添加休眠时间
@GetMapping("/findByUserId") public UserDTO findByUserId(@RequestParam("userId") final String userId) throws Exception{ UserDTO userDTO = new UserDTO(); userDTO.setUserId(userId); userDTO.setUserName("hello world"); log.info("限流测试"); int i = RandomUtils.nextInt(1,3); if(i %2==0){ //throw new Exception("异常抛出"); Thread.currentThread().sleep(2000); } return userDTO; }
Resilience4JHandle#checkData手动设置超时时间为1s
resilience4JHandle.setTimeoutDuration(1000);
多次请求时,有的请求返回正常数据,有的请求返回如下数据,表示超时熔断生效
{ "code": 500, "message": "Internal Server Error", "data": "404 NOT_FOUND" }
Resilience4J插件源码解读
soul网关Resilience4j插件源码大量使用了响应式编程方式,首先需要对响应式编程了解
Resilience4J插件目录结构
└─resilience4j │ Resilience4JPlugin.java //插件处理,核心类 │ ├─build │ Resilience4JBuilder.java //构建Resilience4JConf对象 │ ├─conf │ Resilience4JConf.java │ ├─executor │ CombinedExecutor.java //限流和熔断执行器 │ Executor.java │ RateLimiterExecutor.java //限流执行器 │ ├─factory │ Resilience4JRegistryFactory.java //限流和熔断对象构建 │ └─handler Resilience4JHandler.java
Resilience4JPlugn#doExecute Resilience4JPlugn其他soul中插件一样继承AbstractSoulPlugin,只要开启了,通过链式机制执行,都会走到核心方法doExecute
@Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) { final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; //获取配置信息对象 Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class); //校验配置信息,如果小于默认值,则赋值默认值 resilience4JHandle = resilience4JHandle.checkData(resilience4JHandle); //circuitEnable配置:1 开启熔断组件 ,否则走限流组件 if (resilience4JHandle.getCircuitEnable() == 1) { return combined(exchange, chain, rule); } return rateLimiter(exchange, chain, rule); }
- 限流 Resilience4JPlugin#rateLimiter
private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) { return ratelimiterExecutor.run( // chain.execute(exchange) 后续插件执行 chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule)) .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable)) //ratelimiterExecutor.run调用 @Override public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) { //限流器组件 RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig()); //限流执行 Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter)); if (fallback != null) { //回调的执行 return to.onErrorResume(fallback); } return to; } // to.onErrorResume(fallback); default Mono<Void> fallback(ServerWebExchange exchange, String uri, Throwable t) { if (StringUtils.isBlank(uri)) { return withoutFallback(exchange, t); } DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class); ServerHttpRequest request = exchange.getRequest().mutate().uri(Objects.requireNonNull(UriUtils.createUri(uri))).build(); ServerWebExchange mutated = exchange.mutate().request(request).build(); //回调的执行地方 return dispatcherHandler.handle(mutated); }
熔断 Resilience4JPlugin#combined
private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
Resilience4JConf conf = Resilience4JBuilder.build(rule);
return combinedExecutor.run(
chain.execute(exchange).doOnSuccess(v -> {
HttpStatus status = exchange.getResponse().getStatusCode();
if (status == null || !status.is2xxSuccessful()) {
exchange.getResponse().setStatusCode(null);
throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
}
}), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
}
//combinedExecutor#run执行的内容
public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
//断路器的操作
Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
//限流操作
.transformDeferred(RateLimiterOperator.of(rateLimiter))
//设置超时时间
.timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
//如果超时了抛出超时异常
.doOnError(TimeoutException.class, t -> circuitBreaker.onError(
resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS,
t));
if (fallback != null) {
to = to.onErrorResume(fallback);
}
return to;
}
总结
- soul网关提供限流和熔断,熔断默认是关闭的
- 参数值小于默认值,会直接使用默认值