Apache Dubbo is a high-performance and lightweight open source Java service framework, which mainly provides six core capabilities: high-performance RPC invocation for interface agents, intelligent fault tolerance and load balancing, automatic service registration and discovery, high scalability, run-time traffic scheduling, and visual service governance and operation and maintenance. The Dubbo plug-in in the gateway is mainly used to convert Http requests to Dubbo protocol, and it is also the key for the gateway to implement Dubbo generalization calls. Dubbo plug-ins need to cooperate metadata to implement Dubbo calls.
The function of metadata is to get the real request path and methodNameparameterTypes prepare for the generalization call during the protocol conversion.
In the database, we have a separate table to store Dubbo meta information. Through the data synchronization scheme, the data of this table will be synchronized to the JVM memory of the gateway
The path field is mainly used to match a piece of data according to your path field when requesting the gateway, and then carry out the subsequent processing process.
rpc_ext Field If the proxy interface is a Dubbo service interface of type and the field is set groupversion, the information will be stored rpc_ext in
Each Dubbo interface method will deal with a piece of metadata. Compared with Spring Cloud and HTTP, only one piece of/contextPath/\ ** is stored and none is stored respectively.
Log in to soul-admin background, open the switch of Dubbo configuration option on the plug-in management page, and fill in the connection address of the registry.
@SoulDubboClient(path ="/insert", desc ="Insert a row of data")publicDubboTestinsert(finalDubboTest dubboTest){
dubboTest.setName("hello world Soul Apache Dubbo: "+ dubboTest.getName());return dubboTest;}
The proxied service uses the soul-spring-boot-starter-client-apache-dubbo provided client dependencies and @SoulDubboClient annotations to register the interface name, parameter type, and parameter content to the soul-admin end at startup, and then the backend admin synchronizes the data to the bootstrap end.
# Introduction to Apache Dubbo Generalization Calls
The generalized interface calling mode is mainly used when the client does not have an API interface and a model class element. All POJOs in the parameters and return values are represented by Map. It is usually used for framework integration and can be implemented by calling all services through GenericS.
# Using generalized calls through the API (the way the gateway is currently used)
GenericService This interface is very similar to Java's reflection call. You only need to provide the name of the method called, the type of the parameter, and the value of the parameter to call the corresponding method directly.
Generic Filter: responsible for the conversion of provider-side parameters
Converts the parameters of the hashMap structure to the corresponding Pojo when called
The return result is to convert Pojo to hashMap.
GenericImpl Filter: It is responsible for the conversion of consumer side parameters and converting Pojo to hashMap interface.
/**
* Generic service interface
*
* @export
*/publicinterfaceGenericService{/**
* Generic invocation
*
* @parammethod Method name, e.g., findPerson. If there are overloaded methods, include the parameter list, e.g., findPerson(java.lang.String)
* @paramparameterTypes Parameter types
* @paramargs Parameter list
* @return invocation result
* @throwsGenericException Exception thrown by the method
*/Object $invoke(String method,String[] parameterTypes,Object[] args)throwsGenericException;defaultCompletableFuture<Object> $invokeAsync(String method,String[] parameterTypes,Object[] args)throwsGenericException{Object object = $invoke(method, parameterTypes, args);if(object instanceofCompletableFuture){return(CompletableFuture<Object>) object;}returnCompletableFuture.completedFuture(object);}}
When a service request is initiated, the method of the Handle class is entered SoulWebHandler first (as to why it becomes the request entry to query by itself, this article will not explain). The following is the plugins plug-in chain call from DefaultSoulPluginChain the class.
@OverridepublicMono<Void>execute(finalServerWebExchange exchange){// Reactive programmingreturnMono.defer(()->{// Check if the current index is less than the number of pluginsif(this.index < plugins.size()){// Get a plugin from plugins one by oneSoulPlugin plugin = plugins.get(this.index++);// Check if this plugin should be skippedBoolean skip = plugin.skip(exchange);if(skip){returnthis.execute(exchange);}return plugin.execute(exchange,this);}returnMono.empty();});}
This chapter focuses only on Apache Dubbo, so we'll focus on the invocation of the Dubbo plug-in. Through the Debug gateway program, we know that it is actually judged and called one by one according to the above order. Let's focus on ApacheDubboPlugin
@OverrideprotectedMono<Void>doExecute(finalServerWebExchange exchange,finalSoulPluginChain chain,finalSelectorData selector,finalRuleData rule){// Get dubbo_params dataString body = exchange.getAttribute(Constants.DUBBO_PARAMS);// Get attribute value from exchange contextSoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext !=null;// Get attribute value from exchange metaDataMetaData metaData = exchange.getAttribute(Constants.META_DATA);// Check if metaData is incorrect, if so, return an error responseif(!checkMetaData(metaData)){assert metaData !=null;
log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error =SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(),SoulResultEnum.META_DATA_ERROR.getMsg(),null);returnWebFluxResultUtils.result(exchange, error);}// Check if parameterTypes and body in metaData are empty, if so, return a body error responseif(StringUtils.isNoneBlank(metaData.getParameterTypes())&&StringUtils.isBlank(body)){
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error =SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(),SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(),null);returnWebFluxResultUtils.result(exchange, error);}// Perform asynchronous call to Dubbo GenericsService with exchange, body, and metaDatafinalMono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);return result.then(chain.execute(exchange));}
First, check the parameters required by the generalization call.
publicMono<Object>genericInvoker(finalString body,finalMetaData metaData,finalServerWebExchange exchange)throwsSoulException{// issue(https://github.com/dromara/soul/issues/471), add dubbo tag routeString dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);if(StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)){RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);}// Get reference based on metaData pathReferenceConfig<GenericService> reference =ApplicationConfigCache.getInstance().get(metaData.getPath());if(Objects.isNull(reference)||StringUtils.isEmpty(reference.getInterface())){ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
reference =ApplicationConfigCache.getInstance().initRef(metaData);}// et the instance of GenericService for generic invocation based on referenceGenericService genericService = reference.get();Pair<String[],Object[]> pair;if(ParamCheckUtils.dubboBodyIsEmpty(body)){
pair =newImmutablePair<>(newString[]{},newObject[]{});}else{// Organize parameter types and values for Dubbo generic invocation based on body and parameterTypes
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());}// Perform asynchronous call using the default $invokeAsync method of GenericServiceCompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());returnMono.fromFuture(future.thenApply(ret ->{if(Objects.isNull(ret)){
ret =Constants.DUBBO_RPC_RESULT_EMPTY;}// After successful invocation, copy the result and type to attributes of the exchange
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE,ResultEnum.SUCCESS.getName());return ret;})).onErrorMap(exception -> exception instanceofGenericException?newSoulException(((GenericException) exception).getExceptionMessage()):newSoulException(exception));}
Dubbo generalized invocation is mainly divided into two parts, namely, how to use GenericImplFilter the consumer side to intercept the generalized invocation, and how to use GenericFilter the service provider side to serialize the generalized parameters after intercepting the request and then request the specific service.
# How does the service consumer org. Apache. Dubbo. RPC. Filter. GenericImplFilter intercept generalized calls?
@Activate(group =CommonConstants.CONSUMER, value =GENERIC_KEY, order =20000)publicclassGenericImplFilterimplementsFilter,Filter.Listener{@OverridepublicResultinvoke(Invoker<?> invoker,Invocation invocation)throwsRpcException{// ... Omitted non-core code// Check if it's a generic callif(isMakingGenericCall(generic, invocation)){// Get the generic parametersObject[] args =(Object[]) invocation.getArguments()[2];// If it's nativeJava modeif(ProtocolUtils.isJavaGenericSerialization(generic)){for(Object arg : args){if(!(byte[].class== arg.getClass())){error(generic,byte[].class.getName(), arg.getClass().getName());}}// If it's bean mode}elseif(ProtocolUtils.isBeanGenericSerialization(generic)){for(Object arg : args){if(!(arg instanceofJavaBeanDescriptor)){error(generic,JavaBeanDescriptor.class.getName(), arg.getClass().getName());}}}// Set attachment for server-side invocation
invocation.setAttachment(GENERIC_KEY, invoker.getUrl().getParameter(GENERIC_KEY));}// Perform remote invocationreturn invoker.invoke(invocation);}privatebooleanisMakingGenericCall(String generic,Invocation invocation){return(invocation.getMethodName().equals($INVOKE)|| invocation.getMethodName().equals($INVOKE_ASYNC))&& invocation.getArguments()!=null&& invocation.getArguments().length ==3&&ProtocolUtils.isGeneric(generic);}}
GenericImpl Filter implements the interface Filter (I will not introduce the Filter in Dubbo) and then executes the Invoke method. The invoke method mainly does the following things:
Parameter validation to check whether the call is a generalized call
Get the generalization parameters
Determine the generalization calling mode: traverse each parameter, and then determine whether the generalization mode of the parameter is native Java or bean in turn
Initiates a remote call
# The service provider intercepts the generalization request through Generic Filter.
@Activate(group =CommonConstants.PROVIDER, order =-20000)publicclassGenericFilterimplementsFilter,Filter.Listener{@OverridepublicResultinvoke(Invoker<?> invoker,Invocation inv)throwsRpcException{// Parameter validationif((inv.getMethodName().equals($INVOKE)|| inv.getMethodName().equals($INVOKE_ASYNC))&& inv.getArguments()!=null&& inv.getArguments().length ==3&&!GenericService.class.isAssignableFrom(invoker.getInterface())){// Get parameter name, type, and valueString name =((String) inv.getArguments()[0]).trim();String[] types =(String[]) inv.getArguments()[1];Object[] args =(Object[]) inv.getArguments()[2];try{// Get the called method using reflectionMethod method =ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);Class<?>[] params = method.getParameterTypes();if(args ==null){
args =newObject[params.length];}// Get the generic type used for the generic reference, true or bean or nativejavaString generic = inv.getAttachment(GENERIC_KEY);if(StringUtils.isBlank(generic)){
generic =RpcContext.getContext().getAttachment(GENERIC_KEY);}// If generic=true, deserialize parameters using the true methodif(StringUtils.isEmpty(generic)||ProtocolUtils.isDefaultGenericSerialization(generic)||ProtocolUtils.isGenericReturnRawResult(generic)){
args =PojoUtils.realize(args, params, method.getGenericParameterTypes());// If generic=nativejava, deserialize parameters using the nativejava method}elseif(ProtocolUtils.isJavaGenericSerialization(generic)){for(int i =0; i < args.length; i++){if(byte[].class== args[i].getClass()){try(UnsafeByteArrayInputStream is =newUnsafeByteArrayInputStream((byte[]) args[i])){
args[i]=ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).deserialize(null, is).readObject();}catch(Exception e){thrownewRpcException("Deserialize argument ["+(i +1)+"] failed.", e);}}else{thrownewRpcException(...);}}// If generic=bean, deserialize parameters using the bean method}elseif(ProtocolUtils.isBeanGenericSerialization(generic)){for(int i =0; i < args.length; i++){if(args[i]instanceofJavaBeanDescriptor){
args[i]=JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);}else{thrownewRpcException(...);}}}...// Pass the current request to the next Filter in the FilterChain and return the resultRpcInvocation rpcInvocation =newRpcInvocation(method, invoker.getInterface().getName(), args, inv.getAttachments(), inv.getAttributes());
rpcInvocation.setInvoker(inv.getInvoker());
rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());return invoker.invoke(rpcInvocation);}catch(NoSuchMethodException e){thrownewRpcException(e.getMessage(), e);}catch(ClassNotFoundException e){thrownewRpcException(e.getMessage(), e);}}// If it's not a generic call, pass the request directly to the next Filter in the FilterChainreturn invoker.invoke(inv);}}
The above is the general process of how the Dubbo service provider intercepts the generalization request and processes it:
Parameter check to determine whether the request is a generalized call
Get the parameter name, parameter type, parameter value,
Use reflection to get the method called, and the generalization used true or or
Deserialize the generalization parameters based on the generalization method
Pass the request, including the called method, parameters and context information, to the next Filter in the FilterChain, and return the Result
According to the generalization method, deserialize the Result and return it to the service consumer
The above is the analysis of how to configure the Dubbo plug-in to the entire call process, and then respectively introduce the details of how the service consumer and service provider intercept the generalized call process and serialize the parameters. I hope it will be helpful to you.