跳至主要內容
Soul网关学习Http请求探险

Soul网关学习Http请求探险

百钰大约 11 分钟Soul

回顾

在 Soul 请求处理概览概览这篇文章中,我们已经知晓了 Soul 针对于请求的处理入库在DefaultSoulPluginChain 的 excute,其中执行了一个插件链的模式来完成了请求的处理。

我们大体梳理了注入到plugins的插件,但是即使这样依然不能纵观全局,对此特地对 soul 插件所涉及的类进行了相关梳理,整体梳理结果如下图。

在梳理文章中可以看到核心类是SoulPlugin、PluginEnum、PluginDataHandler、MetaDataSubscriber,在梳理请求的相关文章中我们目前只需要重点关注 SoulPlugin 与 PluginEnum 类。

SoulPlugin 类我们已经有了一定的理解,那 PluginEnum 枚举类的主要作用是什么呢?

PluginEnum:插件的枚举类

属性作用
code插件的执行顺序 越小越先执行
role角色 暂时未发现实际引用地址
name插件名称

其实我们不难发现在DefaultSoulPluginChain 的 plugins的插件都是有固定的执行顺序的,那这个插件的执行顺序是在哪定义的呢?

最终可以追溯到SoulConfiguration类下

    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        //省略
        final List<SoulPlugin> soulPlugins = pluginList.stream()
               .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        return new SoulWebHandler(soulPlugins);
    }

整理整个 PluginEnum 类相关引用,整理出如下表格,不难看出插件与插件之间的顺序关系

等级作用
第一等级只有 GlobalPlugin 全局插件
第二等级到第八等级可以理解为在请求发起前的前置处理插件
第九等级到第十一等级可以理解为针对调用方的方式所针对的不同调用处理
第十二等级只有 MonitorPlugin 监控插件
第十三等级是针对于各个调用方返回结果处理的 Response 相关插件

在刚才的回顾中我们已经明白 soul 处理请求的大体流程

  • 1.GloBalPlugin 插件 进行全局的初始化
  • 2.部分插件根据鉴权、限流、熔断等规则对请求进行处理
  • 3.选择适合自己的调用方式进行拼装参数,发起调用。
  • 4.进行监控
  • 5.对调用的结果进行处理

请求流程梳理

以下演示代码截图来自于 soul-examples 下的 http demo,调用的接口地址为http://127.0.0.1:9195/http/test/findByUserId?userId=10

DefaultSoulPluginChain 的 excute方法进行埋点,查看一次 http 请求调用经过了哪些类?

public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        System.out.println("跳过的插件为"+plugin.getClass().getName().replace("org.dromara.soul.plugin.",""));
                        return this.execute(exchange);
                    }
                    System.out.println("未跳过的插件为"+plugin.getClass().getName().replace("org.dromara.soul.plugin.",""));
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }

最终输出的未跳过的插件如下:

未跳过的插件为 global.GlobalPlugin
未跳过的插件为 sign.SignPlugin
未跳过的插件为 waf.WafPlugin
未跳过的插件为 ratelimiter.RateLimiterPlugin
未跳过的插件为 hystrix.HystrixPlugin
未跳过的插件为 resilience4j.Resilience4JPlugin
未跳过的插件为 divide.DividePlugin
未跳过的插件为 httpclient.WebClientPlugin
未跳过的插件为 alibaba.dubbo.param.BodyParamPlugin
未跳过的插件为 monitor.MonitorPlugin
未跳过的插件为 httpclient.response.WebClientResponsePlugin

这里有个小疑惑,为啥这个 alibaba.dubbo.param.BodyParamPlugin 插件会被执行,暂时忽略,后期跟踪。

我们发现一次针对于 http 请求的网关调用 所执行的插件的大体流程与我们猜想的处理流程一致。
目前我们只挑重点来讲,即GlobalPlugin、DividePlugin、WebClientPlugin、WebClientResponsePlugin

发起 Debug 调用依次追踪上述四个插件的作用。

GlobalPlugin SoulContext 对象封装插件

GlobalPlugin 的插件的 excute 方法如下所示

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        final ServerHttpRequest request = exchange.getRequest();
        final HttpHeaders headers = request.getHeaders();
        final String upgrade = headers.getFirst("Upgrade");
        SoulContext soulContext;
        if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
            soulContext = builder.build(exchange);
        } else {
            final MultiValueMap<String, String> queryParams = request.getQueryParams();
            soulContext = transformMap(queryParams);
        }
        exchange.getAttributes().put(Constants.CONTEXT, soulContext);
        return chain.execute(exchange);
    }

不难看出 在 GlobalPlugin 的 excute 方法中主要目的就是封装一个SoulContext 对象,放入 exchange 中(exchange 对象是整个插件链上的共享对象,有一个插件执行完成后传递给下一个插件,本人理解的就是一个类似于 ThreadLocal 对象)。

那 SoulContext 对象中又包含哪些属性呢?

属性含义
module每种 RPCType 针对的值不同 http 调用时指代网关调用的前置地址
method切割后的方法名(在 RpcType 为 http 时)
rpcTypeRPC 调用类型有 Http、dubbo、sofa 等
httpMethodHttp 调用的方式目前只支持 get、post
sign鉴权的相关属性目前不知道具体作用,可能与 SignPlugin 插件有关
timestamp时间戳
appKey鉴权的相关属性目前不知道具体作用,可能与 SignPlugin 插件有关
path路径指代调用到 soul 网关的全路径(在 RpcType 为 http 时)
contextPath与 module 取值一致(在 RPCType 为 http 时)
realUrl与 method 的值一致(在 RpcType 为 http 时)
dubboParamsdubbo 的参数?
startDateTime开始时间怀疑与监控插件和统计指标模块有联用

在执行完 GlobalPlugin 插件后,最终封装完成的 SoulContext 对象如下所示。

其他 RPCType 的 SoulContext 的参数封装可以查看DefaultSoulContextBuilder 的 build方法进行追踪,由于本编文章主要追溯 http 调用,故在这里不在多余讨论。

DividePlugin 路由选择插件

在执行完成 GlobalPlugin 插件后,最终封装成了一个SoulContext 对象,并将其放在了ServerWebExchange中,供下游的调用链使用。

接下来让我们看一下DividePlugin 插件在整个链式调用过程中到底起了一个什么样的作用?

AbstractSoulPlugin

通过追溯源码得知DividePlugin 插件继承于 AbstractSoulPlugin 类,而 AbstractSoulPlugin 类实现了 SoulPlugin 接口

那么AbstractSoulPlugin又做了哪些扩展呢?让我们梳理一下该类的方法。

方法名作用
excute实现于 SoulPlugin 接口,在 AbstractSoulPlugin 中起到一个模板方法的作用
doexcute抽象方法 交由各个子类实现
matchSelector匹配选择器
filterSelector筛选选择器
matchRule匹配规则
filterRule筛选规则
handleSelectorIsNull处理选择器为空情况
handleRuleIsNull处理规则为空情况
selectorLog选择器日志打印
ruleLog规则日志打印

看一下excute方法的具体作用

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        //获取对应插件
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        //判断插件是否启用
        if (pluginData != null && pluginData.getEnabled()) {
            //获取插件下的所有选择器
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            //匹配选择器
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            //打印选择器日志
            selectorLog(selectorData, pluginName);
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                rule = rules.get(rules.size() - 1);
            } else {
                //匹配规则
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRu![](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f523f655f0014d288b7a4502cc6a08d1~tplv-k3u1fbpfcp-watermark.image)leIsNull(pluginName, exchange, chain);
            }
            //打印规则日志
            ruleLog(rule, pluginName);
            //执行子类具体实现
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

最终整理的流程图如下所示:

ps:在上述的流程图中并没有细化到具体的方法级别的处理。

但仍有几个点需要着重解释一下:

  • 1.插件数据、选择器数据、规则数据的获取全部来自于BaseDataCache,该类是数据同步过程中最终会影响的类。
  • 2.选择器的类型,在使用 SpringMvc 项目进行接口注册时,会有一个 isFull 的选项为 true 代表全局代理,在全局代理模式下只会注册一个选择器\规则(指代代理所有的接口),所以这里的对应处理为 rule.size()-1.
  • 3.选择器和规则的选择,实际的处理要复杂的多,考虑到是介绍一次请求流程的大体逻辑,所以这里不展开阐述,有兴趣的可以查看MatchStrategy、AbstractMatchStrategy 及其相关实现类(后期会单独开一篇具体讲解),此处对应页面的如下:

梳理一下AbstractSoulPlugin 的 exeute 方法作用,经过上述流程图的引导,我们已经知晓该方法的作用是为了选取插件--->选取选择器--->选取规则,最后交由子类的doexcute方法。

接下来让我们看一下DividePlugin 的 doexcute方法具体做了哪些事。

DividePlugin

protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        //获取规则处理数据
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        //获取该选择器下的注入的地址
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        //通过规则对应的负载均衡策略选择一个地址
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        String domain = buildDomain(divideUpstream);
        //拼装真实调用地址
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        //设置超时时间 及重试次数
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }

通过上述代码梳理完成后大体逻辑如下:

  • 1.获取选择器对应的注册地址,对应页面数据如下
  • 2.根据规则的 handle 字段获取负载均衡策略,并选择真实的调用地址(LoadBalanceUtils),重试次数和超时时间,对应页面数据如下。
  • 3.将真实调用地址,超时时间,重试次数传递到ServerWebExchange中,供下游调用链使用。 debug 演示: ps:在上述的主题逻辑中我们没有看到参数在哪里?那这个参数在哪封装的呢?答案在buildRealURL 方法中,是从exchange上下文中获取到的。

WebClientPlugin Http 请求调用插件

接下来让我们看看 Soul 如何发起的请求调用

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        //获取真实地址
        String urlPath = exchange.getAttribute(Constants.HTTP_URL);
        if (StringUtils.isEmpty(urlPath)) {
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //获取超时时间
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        //获取重试次数
        int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
        HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
        return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
    }

在 webClient 的excute方法中,主要做了三个事

  • 1.将从 Divide 插件中放入 exchange 的属性取出来,调用的真实地址、超时时间、重试次数
  • 2.封装了一个RequestBodySpec对象(不认识这个响应式编程的东西)
  • 3.调用了一个handleRequestBody方法

先认识handleRequestBody方法

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
        return requestBodySpec.headers(httpHeaders -> {
            httpHeaders.addAll(exchange.getRequest().getHeaders());
            httpHeaders.remove(HttpHeaders.HOST);
        })
                .contentType(buildMediaType(exchange))
                .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
                .exchange()
                //失败打印日志
                .doOnError(e -> log.error(e.getMessage()))
                //设置超时时间
                .timeout(Duration.ofMillis(timeout))
                //设置请求重试实际
                .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                    .retryMax(retryTimes)
                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                //请求结束后对应的处理
                .flatMap(e -> doNext(e, exchange, chain));

    }

在这个方法里,大体可以理解为

  • exchange 中的请求头放到本次调用的请求头中
  • 设置 contentType
  • 设置超时时间
  • 设置失败响应
  • 设置重试的场景及重试次数
  • 最终结果的处理。 在流程中需要还需要看一个doNext 方法

大体逻辑就是判断请求是否成功,将请求结果放入 exchange 中交给下游插件处理。

private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
        if (res.statusCode().is2xxSuccessful()) {
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
        } else {
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
        }
        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
        return chain.execute(exchange);
    }

ps: 虽然并不懂响应式编程,但并不影响我们阅读代码。

WebClientResponsePlugin Http 结果处理插件

该实现的 excute 方法没有什么核心逻辑,就是判断请求状态码,根据状态码返回给前端不同的数据格式。

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        return chain.execute(exchange).then(Mono.defer(() -> {
            ServerHttpResponse response = exchange.getResponse();
            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
            if (Objects.isNull(clientResponse)
                    || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                    || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            response.setStatusCode(clientResponse.statusCode());
            response.getCookies().putAll(clientResponse.cookies());
            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
        }));
    }

总结

到此为止,一个基于 Soul 网关发起的 Http 请求调用流程大体已经结束。

梳理 http 请求调用流程

  • Global 插件封装 SoulContext 对象
  • 前置插件处理熔断限流鉴权等操作。
  • Divide 插件选择对应调用的真实地址,重试次数,超时时间。
  • WebClient 插件发起真实的 Http 调用
  • WebClientResponse 插件处理对应结果,返回前台。

基于 Http 调用的大体流程,我们可以大体猜测出基于别 RPC 调用的流程,就是替换发起请求的插件和返回结果处理的插件。

在上文中我们还提到了路由规则的选择LoadBalanceUtils,选择器和规则的处理MatchStrategy

之后将会开启新篇章一步步揭开 RPC 泛化调用,路由选择,选择器、规则匹配的神秘面纱。

上次编辑于:
贡献者: Cicici-Shi