Soul网关学习Apache Dubbo插件原理解析
目标
- Apache Dubbo 插件介绍
- 元数据介绍
- Apache Dubbo 插件配置
- Bootstrap pom 配置
- soul-admin 配置
- dubbo 服务 pom 配置
- Apache Dubbo 泛化调用介绍
- 通过 API 方式使用泛化调用
- 通过 spring 使用泛化调用
- 泛化调用实现流程
- Soul Dubbo 插件调用解析
- ApachDubboPlugin 泛化调用准备
- ApacheDubboProxySerivce
- DubboResponsePlugin
- WebFluxResultUtils 返回结果
- Dubbo 泛化调用介绍
- 总结
- 参考
Apache Dubbo 插件介绍
Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架,主要提供了六大核心能力,面向接口代理的高性能 RPC 调用,智能容错和负载均衡,服务自动注册与发现,高度可扩展能力,运行期流量调度,可视化的服务治理与运维。 网关中 Dubbo 插件主要是将 Http协议
转换成 Dubbo协议
,也是网关实现 Dubbo 泛化调用的关键。而 Dubbo 插件需要配合 元数据
才能实现 Dubbo 调用。
元数据介绍
元数据作用就是在进行协议转换时候要获取真实的请求 path
、methodName
、 parameterTypes
为泛化调用做好准备
- 在数据库中,我们有一张表单独存储 Dubbo 元信息,通过数据同步方案,会把这张表的数据同步到网关的 JVM 内存中
- 表结构如下
CREATE TABLE IF NOT EXISTS `meta_data` (
`id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'id',
`app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '应用名称',
`path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路径,不能重复',
`path_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路径描述',
`rpc_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'rpc类型',
`service_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '服务名称',
`method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '方法名称',
`parameter_types` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '参数类型 多个参数类型 逗号隔开',
`rpc_ext` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'rpc的扩展信息,json格式',
`date_created` datetime(0) NOT NULL COMMENT '创建时间',
`date_updated` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
`enabled` tinyint(4) NOT NULL DEFAULT 0 COMMENT '启用状态',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;
path
字段主要是在请求网关的时候,会根据你的path
字段来匹配到一条数据,然后进行后续的处理流程rpc_ext
字段如果代理的接口是Dubbo
类型的服务接口,同时设置了group
version
字段时候,那么信息就会存储到rpc_ext
中- 每一个
Dubbo
接口方法会应对一条元数据,对比 SpringCloud、http 分别是只存储一条/contextPath/** 和不存储
Apache Dubbo 插件配置
soul-bootstrap pom 配置
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-apache-dubbo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
soul-admin 配置
登录 soul-admin 后台在插件管理页面打开 Dubbo 配置选项的开关,和填写注册中心的连接地址
dubbo 服务 pom 配置
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-client-apache-dubbo</artifactId>
<version>${soul.version}</version>
</dependency>
@SoulDubboClient(path = "/insert", desc = "Insert a row of data")
public DubboTest insert(final DubboTest dubboTest) {
dubboTest.setName("hello world Soul Apache Dubbo: " + dubboTest.getName());
return dubboTest;
}
被代理的服务使用提供的
soul-spring-boot-starter-client-apache-dubbo
客户端依赖,同时使用@SoulDubboClient
注解,在启动时候将接口的名称,参数类型,参数内容注册到soul-admin
端,然后admin
端将数据同步到bootstrap
端。
Apache Dubbo 泛化调用介绍
泛化接口调用方式主要用于客户端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map
表示, 通常用于框架集成,可通过 GenericSerivce 调用所有服务实现。
通过 API 方式使用泛化调用(网关目前使用方式)
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
网关通过 API 方式声明注册使用泛化调用
通过 Spring 使用泛化调用
<dubbo:reference id="barService" interface="com.foo.BarService" generic="true" />
泛化调用实现流程
+-------------------------------------------+ +-------------------------------------------+
| consumer 端 | | provider 端 |
| | | |
| | | |
| | | |
| | | |
| +------------------+ | | +--------------+ |
| |GenericImplFilter | | Invocation | |GenericFilter | |
| +----> | +-------------------------> | | |
| | +------------------+ | | +--------------+ |
| +-----------+ | | | +-----------+ |
| | | | | | | | |
| |Client | | | +--> | Service | |
| | | | | | | |
| +-----------+ | | +-------+---+ |
| | | | |
| ^ +------------------+ | | +--------------+ | |
| | |GenericImplFilter | | | |GenericFilter | <----------+ |
| +-------------+ | <-------------------------+ | |
| +------------------+ | | +--------------+ |
| | | |
| | | |
| | | |
| | | |
+-------------------------------------------+ +-------------------------------------------+
GenericService
这个接口和 Java 的反射调用非常像,只需提供调用的方法名称,参数的类型以及参数的值就可以直接调用对应方法了。
- GenericFilter : 负责 provider 端参数的转换
- 调用时,将 hashMap 结构的参数转换成对应 Pojo
- 返回结果是,将 Pojo 转换成 hashMap
- GenericImplFilter : 负责 consumer 端参数的转换,将 Pojo 转换成 hashMap 接口
/**
* Generic service interface
*
* @export
*/
public interface GenericService {
/**
* Generic invocation
*
* @param method 方法名,如:findPerson,如果有重载方法,需带上参数列表,如:findPerson(java.lang.String)
* @param parameterTypes 参数类型
* @param args 参数列表
* @return invocation 返回值
* @throws GenericException 方法抛出的异常
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
Object object = $invoke(method, parameterTypes, args);
if (object instanceof CompletableFuture) {
return (CompletableFuture<Object>) object;
}
return CompletableFuture.completedFuture(object);
}
}
Soul Dubbo 插件调用解析
当业务请求发起时候,首先进入 SoulWebHandler
(至于为什么成为请求入口自行查询,本文不作解释) 类的 Handle
方法,下面就带了 plugins
从 DefaultSoulPluginChain
类开始进入插件链调用。
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
}
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
// 响应式编程
return Mono.defer(() -> {
// 判断当前index 是否 < 插件数量
if (this.index < plugins.size()) {
// 依次从plugins 中获取一种插件进行调用
SoulPlugin plugin = plugins.get(this.index++);
// 判断此插件是否未打开
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
本章只关注 Apache Dubbo 所以我们重点放到 Dubbo 插件的调用。 经过 Debug 网关程序我们知道其实是按照上面的顺序一个一个的进行判断调用。下面我们关注
ApacheDubboPlugin
ApachDubboPlugin 泛化调用准备
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取 dubbo_params 数据
String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
// 获取 exchange context的属性值
SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 获取 exchange metaData 属性值
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
// 判断metaData是否有误,如果有误直接返回 metaData 有误的返回信息
if (!checkMetaData(metaData)) {
assert metaData != null;
log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 判断 metaData的parameterTypes 和 body 是否为空,如果有误则返回Body错误信息
if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 带着exchange、body、metaData 进行 Dubbo GenericsService的异步调用
final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);
return result.then(chain.execute(exchange));
}
首先对泛化调用所需要的参数进行检查
ApacheDubboProxyService
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// issue(https://github.com/dromara/soul/issues/471), add dubbo tag route
String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);
if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);
}
// 根据metaData路径获取ferference
ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
// 根据ferference 获取泛化调用的实例 GenericService
GenericService genericService = reference.get();
Pair<String[], Object[]> pair;
if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
// 根据body 和 parameterTypes 组织Dubbo 泛化调用的参数类型和参数值
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
// 下面使用GenericSerice 默认方法$invokeAsync进行异步调用
CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
// 等调用成功之后 将结果和类型复制到exchagne 对应的属性上
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));
}
DubboResponsePlugin
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT);
if (Objects.isNull(result)) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));
return WebFluxResultUtils.result(exchange, success);
}));
}
WebFluxResultUtils 返回结果
Dubbo 泛化调用介绍
Dubbo 泛化调用主要就分为两块分别是消费端如何使用 GenericImplFilter
拦截泛化调用、服务提供端如何使用 GenericFilter
拦截请求后把泛化参数序列化然后请求给具体服务。
服务消费端 org.apache.dubbo.rpc.filter.GenericImplFilter 是如何拦截泛化调用
@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter, Filter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// ... 省略非核心代码
// 判断是否为泛化调用
if (isMakingGenericCall(generic, invocation)) {
// 获取泛化参数
Object[] args = (Object[]) invocation.getArguments()[2];
// 如果泛化为nativeJava
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
// 如果泛化方式为bean
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
// 设置attachment ,以便与服务端调用
invocation.setAttachment(
GENERIC_KEY, invoker.getUrl().getParameter(GENERIC_KEY));
}
// 发起远程调用
return invoker.invoke(invocation);
}
private boolean isMakingGenericCall(String generic, Invocation invocation) {
return (invocation.getMethodName().equals($INVOKE) || invocation.getMethodName().equals($INVOKE_ASYNC))
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic);
}
}
GenericImplFilter 实现接口 Filter(关于 Dubbo 中的 Filter,不做介绍)然后执行 Invoke 方法,invoke 方法主要做如下事情:
- 参数校验,检查这个调用是否是泛化调用
- 获取泛化参数
- 判断泛化调用方式:遍历每个参数,然后依次判断参数的泛化方式是 nativejava 还是 bean 方式
- 发起远程调用
服务提供端通过 GenericFilter 拦截泛化请求
@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter, Filter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 参数校验
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// 获取参数名称、参数类型、参数值
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 使用反射获取调用的方法
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
// 获取泛化引用使用的泛化类型,true or bean or nativejava
String generic = inv.getAttachment(GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
}
// 如果generic=true 则使用true方式对入参进行反序列化
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)
|| ProtocolUtils.isGenericReturnRawResult(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
// 如果 generic=nativejava,则使用nativejava方式对入参进行反序列化
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(...);
}
}
// 如果 generic=bean 则使用bean方式对入参进行反序列化
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(...);
}
}
} ...
// 将本次请求传递到FilterChain的下一个Filter中,并返回结果result
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getAttachments(), inv.getAttributes());
rpcInvocation.setInvoker(inv.getInvoker());
rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());
return invoker.invoke(rpcInvocation);
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
// 如果不是泛化调用,直接把请求传给FilterChain的下一个Filter
return invoker.invoke(inv);
}
}
以上就是 Dubbo 服务提供端如何拦截泛化请求,并进行处理的大体流程:
- 参数校验,判断此次请求是不是泛化调用
- 获取参数名称、参数类型、参数值
- 使用反射获取调用的方法,和使用的泛化方式
true
ornativejava
orbean
- 根据泛化方式,反序列化泛化参数
- 将本次请求,包括调用的方法,参数和上下文信息传递给 FilterChain 的下一个 Filter 中,并返回 Result 结果
- 根据泛化方式,反序列化 Result 结果返回给服务消费端
总结
以上从如何配置 Dubbo 插件到整个调用流程的分析,然后分别介绍服务消费端与服务提供端如何拦截泛化调用流程对参数进行序列化细节,希望对你有所帮助