Skip to main content
Soul Gateway Learning Resilience4j Plugin

Soul Gateway Learning Resilience4j Plugin

yanbingAbout 5 minSoul

Aim

  • What is Resilience4J?
  • Resilience 4j experience with soul
    • Current-limiting
    • Fuse
  • Interpretation of Resilience4J Plug-in Source Code

What is Resilience4j?

  • Resilience4J is the recommended fault tolerance scheme of Spring Cloud Gateway. It is a lightweight fault tolerance library.

  • It borrows from Hystrix and uses JDK8 functional programming, namely lambda expressions.

  • In contrast, Netflix Hystrix has a compilation dependency on Archaius. Resilience4j You don't need to reference all the dependencies. You can reference the relevant modules according to the functions you need. Hystrix will not be updated. Spring offers an alternative to Netflix Hystrix, namely Resilence4J

  • Resilience4J provides a range of usability features that enhance microservices:

    • Circuit Breaker
    • Rate Limiter
    • Isolation based on semaphore
    • Cache
    • Time limiter
    • Request to restart Retry
  • Official Dependency Package

    <dependency>
           <groupId>io.github.resilience4j</groupId>
           <artifactId>resilience4j-circuitbreaker</artifactId>
           <version>${resilience.version}</version>
    </dependency>

Resilience 4j experience with soul

  • First, open Resilience4j in the soul-admin console plug-in management.

  • Add dependency in soul gateway

      <dependency>
           <groupId>org.dromara</groupId>
           <artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId>
           <version>${project.version}</version>
       </dependency>
* Resilience4j Processing in Detail:

	* timeoutDurationRate:Timeout for waiting to acquire tokens, in milliseconds, default value: 5000.

	* limitRefreshPeriod:Time interval for refreshing tokens, in milliseconds, default value: 500.

	* limitForPeriod:Number of tokens refreshed each time, default value: 50.

	* circuitEnable:Whether to enable circuit breaker, 0: off, 1: on, default value: 0.

	* timeoutDuration:Timeout for circuit breaker, in milliseconds, default value: 30000.

	* fallbackUri:URI for fallback handling.

	* slidingWindowSize:Size of the sliding window, default value: 100.

	* slidingWindowType:Type of the sliding window, 0: based on count, 1: based on time, default value: 0.

	* minimumNumberOfCalls:Minimum number of requests to trigger circuit breaker, circuit breaker statistics will be calculated only if this threshold is exceeded, default value: 100.

	* waitIntervalFunctionInOpenState:Duration for which the circuit breaker remains open, in milliseconds, default value: 10.

	* permittedNumberOfCallsInHalfOpenState:Size of the circular buffer in the half-open state, circuit breaker calculation will be performed only if this number is reached, default value: 10.

	* failureRateThreshold:Percentage of error rate, circuit breaker will only open if this threshold is reached, default value: 50.

	* automaticTransitionFromOpenToHalfOpenEnabled:Whether to automatically transition from open state to half-open state, true: yes, false: no, default value: false.

Current-limiting

  • The parameter configuration is checked as follows. If the parameter value is less than the default value, the default value will be directly assigned, so it is convenient to directly modify the configuration of the source code for testing the effect: the number of tokens refreshed each time is 2, the time interval for refreshing tokens is 1 s, and the timeout time is 1 s.
    /**
     * check filed default value.
     *
     * @param resilience4JHandle {@linkplain Resilience4JHandle}
     * @return {@linkplain Resilience4JHandle}
     */
    public Resilience4JHandle checkData(final Resilience4JHandle resilience4JHandle) {
        resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));
		   //resilience4JHandle.setLimitRefreshPeriod(Math.max(resilience4JHandle.getLimitRefreshPeriod(), Constants.LIMIT_REFRESH_PERIOD));
		   //resilience4JHandle.setLimitForPeriod(Math.max(resilience4JHandle.getLimitForPeriod(), Constants.LIMIT_FOR_PERIOD));
        // Set the number of tokens refreshed each time to 2, and the time interval for refreshing tokens to 1 second
        resilience4JHandle.setLimitRefreshPeriod(1000);
        resilience4JHandle.setLimitForPeriod(2);
        resilience4JHandle.setTimeoutDuration(1000);
        resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));
		   //resilience4JHandle.setTimeoutDuration(Math.max(resilience4JHandle.getTimeoutDuration(), Constants.TIMEOUT_DURATION));
        resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
        resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
        resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
        resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
        resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
        resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
        resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
        return resilience4JHandle;
    }
  • Use SuperBenchmarker tool, 4 threads, execute 10s
C:\Users\v-yanb07>sb -u http://localhost:9195/http/test/findByUserId?userId=1 -c 4 -N 10
Starting at 2021-03-14 15:46:28
[Press C to stop the test]
23      (RPS: 1)
---------------Finished!----------------
Finished at 2021-03-14 15:46:51 (took 00:00:23.0477097)
24      (RPS: 1)                        Status 200:    25

RPS: 2.2 (requests/second)
Max: 2020ms
Min: 472ms
Avg: 1677ms

  50%   below 1994ms
  60%   below 1997ms
  70%   below 1999ms
  80%   below 1999ms
  90%   below 2001ms
  95%   below 2019ms
  98%   below 2020ms
  99%   below 2020ms
99.9%   below 2020ms
  • Output log
2021-03-14 12:16:35.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:36.249  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:36.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : Current limiting test
2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : Current limiting test

The console log outputs two lines per second to verify that the throttling is in effect

Fuse

  • From the configuration information, we know that the fuse is off by default, and we need to open it.
  • Soul-examples-http Add sleep time at call interface
    @GetMapping("/findByUserId")
    public UserDTO findByUserId(@RequestParam("userId") final String userId) throws Exception{
        UserDTO userDTO = new UserDTO();
        userDTO.setUserId(userId);
        userDTO.setUserName("hello world");
       log.info("Current limiting test");

        int i = RandomUtils.nextInt(1,3);
        if(i %2==0){
		//throw new Exception("Exception thrown");
        Thread.currentThread().sleep(2000);
        }
        return userDTO;
    }
  • Resilience4JHandle # checkData Manually set the timeout to 1s
    resilience4JHandle.setTimeoutDuration(1000);
  • The pos interface calls

    http://localhost:9195/http/test/findByUserId?userId=1

In case of multiple requests, some requests return normal data, and some requests return the following data, indicating that the timeout fuse is effective.

{
  "code": 500,
  "message": "Internal Server Error",
  "data": "404 NOT_FOUND"
}

Interpretation of Resilience4J Plug-in Source Code

The soul gateway Resilience4j plug-in source code uses a reactive programmingopen in new window lot of methods. First, you need to understand responsive programming.

  • Resilience4J Plug-in Directory Structure
└─resilience4j
    │  Resilience4JPlugin.java                   // Plugin processing, core class
    │
    ├─build
    │      Resilience4JBuilder.java         	 // Build Resilience4JConf object
    │
    ├─conf
    │      Resilience4JConf.java
    │
    ├─executor
    │      CombinedExecutor.java      		 	 // Limiter and circuit breaker executor
    │      Executor.java
    │      RateLimiterExecutor.java    	 		 // Limiter executor
    │
    ├─factory
    │      Resilience4JRegistryFactory.java    	 // Build limiter and circuit breaker objects
    │
    └─handler
            Resilience4JHandler.java
  • Resilience4JPlugn # doExecuteResilience4JPlugn inherits AbstractSoulPlugin like other soul plug-ins. As long as it is enabled, it will go to the core method doExecute through the chain mechanism.
    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // Get configuration information object
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        // Check configuration information, assign default values if they're less than defaults
        resilience4JHandle = resilience4JHandle.checkData(resilience4JHandle);
        // circuitEnable configuration: 1 enables circuit breaker component, otherwise use limiter component
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }

        return rateLimiter(exchange, chain, rule);
    }
  • Current Limiting Resilience4JPlugin # rateLimiter
    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
    return ratelimiterExecutor.run(
            // chain.execute(exchange) calls subsequent plugins
            chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
            .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable))


    // Called by ratelimiterExecutor.run
    @Override
public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
    // Limiter component
    RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
    // Limiting execution
    Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
    if (fallback != null) {
    // Execute fallback
        return to.onErrorResume(fallback);
    }
    return to;
}


    // to.onErrorResume(fallback);
    default Mono<Void> fallback(ServerWebExchange exchange, String uri, Throwable t) {
    if (StringUtils.isBlank(uri)) {
        return withoutFallback(exchange, t);
    }
    DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
    ServerHttpRequest request = exchange.getRequest().mutate().uri(Objects.requireNonNull(UriUtils.createUri(uri))).build();
    ServerWebExchange mutated = exchange.mutate().request(request).build();
    // Execute the fallback
    return dispatcherHandler.handle(mutated);
}
  • Fuse Resilience 4JPlugin # combined
    private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        Resilience4JConf conf = Resilience4JBuilder.build(rule);
        return combinedExecutor.run(
                chain.execute(exchange).doOnSuccess(v -> {
                    HttpStatus status = exchange.getResponse().getStatusCode();
                    if (status == null || !status.is2xxSuccessful()) {
                        exchange.getResponse().setStatusCode(null);
                        throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
                    }
                }), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
    }


   // Executed in combinedExecutor#run
    public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
        CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
                     //Circuit breaker operation
        Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                // Limiting operation
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                // Set timeout
                .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
                // Throw timeout exception if timeout occurs
                .doOnError(TimeoutException.class, t -> circuitBreaker.onError(
                        resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
                        TimeUnit.MILLISECONDS,
                        t));
        if (fallback != null) {
            to = to.onErrorResume(fallback);
        }
        return to;
    }

Sum up

  • The soul gateway provides current limiting and fusing, and the fusing is off by default
  • If the parameter value is less than the default value, the default value will be used directly
Last update:
Contributors: Cicici-Shi