Soul网关学习Http长轮询解析01
后台与网关数据同步 (Http 长轮询篇)
配置
后台信息模式切换
在上篇分析 Zookeeper 同步的文章 (Soul 网关源码分析-11 期) 中, 我们通过 DataSyncConfiguration 这个配置类做的切换, 这次有了经验, 直接贴配置
soul:
sync:
websocket:
enabled: false
http:
enabled: true
网关信息模式切换
后台模式切换完成, 接下来就是网关, 继续照葫芦画瓢找到关键配置类上的参数设置. 这里也直接贴网关配置
soul:
sync:
# websocket:
# urls: ws://localhost:9095/websocket
http:
url: http://localhost:9095
DataChangedListener 体系
后台数据初始化 DataSyncConfiguration 配置关键 Bean , 看看这里关于 Http 长轮询的 Bean
@Configuration
public class DataSyncConfiguration {
@Configuration
@ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {
@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
}
HttpLongPollingDataChangedListener 继承自 AbstractDataChangedListener, 他们都实现自接口 DataChangedListener.
DataChangedListener 这个接口我们应该非常熟悉了, 它提供了众多不同数据类型变动的方法, 供 DataChangedEventDispatcher 调用, 这个类更是一个 "老朋友" 了, 作为一个中转站, 辛勤的处理数据同步的事件分类及分发
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
// 持有 DataChangedListener 集合
private List<DataChangedListener> listeners;
// 事件变动时, 通知 DataChangedListener 的不同事件类型的方法
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
public interface DataChangedListener {
default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {}
default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {}
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {}
default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {}
default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {}
}
这两个的作用了解了, 那 AbstractDataChangedListener 又做了什么事情? 举个 onPluginChanged() 的例子:
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
this.updatePluginCache();
this.afterPluginChanged(changed, eventType);
}
// 修改缓存 (可重写)
protected void updatePluginCache() {
this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
}
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
String json = GsonUtils.getInstance().toJson(data);
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
// 钩子, 自定义结束数据变动后要干什么 (可重写)
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
}
}
对于一个插件数据变动方法 (onPluginChanged), 其实 AbstractDataChangedListener 就是定义了一个模板, 让子类可以按照指定步骤进行工作, 具体每个步骤的工作细节可以由子类自己实现.
其次, 如果不重写它的缓存更新, 就由这个类在 CACHE 中维护.
其他同步策略此时在干什么?
在 DataChangedEventDispatcher 调取 onPluginChanged() 之后, 长轮询模块会怎么实现呢? 不妨先想想其他同步方式在此时做了什么
举例 websocket 模式, 它自己重写了 onPluginChanged(), 发送 websocket 信息给持有会话, 其中就有网关.
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
}
再看 zookeeper 模式, 它也重写了 onPluginChanged(), 去修改 zookeeper 上的节点信息, 这样网关端会监听到他们的节点变动.
public class ZookeeperDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
for (PluginData data : changed) {
String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
// delete
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPathRecursive(pluginPath);
String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
deleteZkPathRecursive(selectorParentPath);
String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
deleteZkPathRecursive(ruleParentPath);
continue;
}
//create or update
insertZkNode(pluginPath, data);
}
}
}
可以知道, 到这个节骨眼, 其他同步策略已经在忙着通知网关了, 那 Http 长轮询也肯定要做这事.
这两个策略的通知方式也不同, websocket 是好人做到底, 直接找到 session 会话把信息亲自送过去. zookeeper 将节点信息改变后撒手不管, 网关自己监听到变更再做的同步.
那么我们的 Http 长轮询现在要以何种方式去通知网关呢? 接着看.
长轮询实现方式思考
先思考下我自己设计长轮询, 会怎么实现 ?
正常的长轮询实现应该由网关主动请求, 后台接住这个请求并 hold 住, 如果有更新就直接返回, 没有就阻塞一定时间. 而后台则是做好数据的更新, hold 住时检查数据是否有变化.
那这里涉及到三个点:
- 数据怎样知道是有变化的, 是不是设置个最后更新时间, 与网关的请求时间比较, 得出是否有数据修改?
- hold 住之后, 后台怎么获知是否数据更新, 反复遍历还是阻塞等待?
- 那些用于更新的数据放哪里, 用缓存的话, 考虑后台缓存与数据库的交互是怎样的.
HttpLongPollingDataChangedListener 长轮询实现
围绕我们的思考, 看看 HttpLongPollingDataChangedListener 是如何实现的. 先看看关于父类 onPluginChanged() 这块的实现
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
private final ScheduledExecutorService scheduler;
@Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
}
Http 长轮询不会直接覆盖 onPluginChanged() 而是直接使用其父类的, 意味着使用了它的 CACHE, 那最终我们的信息获取肯定也少不了分析这个, 先暂放一边.
接下来的逻辑会调用到我们这块实现的 afterPluginChanged() 方法, 这里用了一个定时类型的线程池, 去跑一个 Runnable 类型的任务 DataChangeTask.
class DataChangeTask implements Runnable {
@Override
public void run() {
// 遍历 clients
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
// 说明完成 response 响应了
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}
数据变动后使用线程池调到了这个方法, 拿取所有 clients
, 一边遍历一边剔除元素, 且调用方法 sendResponse(), 像是标记已完成了响应.
我来猜测下它干了什么, 这里的 clients
很有可能就是网关被 hold 住的请求, 而 sendResponse() 则很有可能就是真的给请求上下文加了响应信息. 还有一个关键动作就是结束 hold, 让网关接收到响应信息, 并在集合中剔除这个请求.
我们现在追踪下 client
的产生, 它是 HttpLongPollingDataChangedListener 里的一个 BlockingQueue 阻塞队列, 在 LongPollingClient 中被定时检测
class LongPollingClient implements Runnable {
@Override
public void run() {
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
// 这里是关键, 表明来源
clients.add(this);
}
}
先不去分析这个 remove() 的检测代码块, 直接看到最后一句的 add(), 这里就是 clients
数据来源.
找到 LongPollingClient 被调用处, HttpLongPollingDataChangedListener#doLongPolling
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// ...
// listen for configuration changed.
// 开启同步阻塞请求
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
asyncContext.setTimeout(0L);
// block client's thread.
// 线程池调用 LongPollingClient#run
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
这里的最后一句会调用并添加 client
, 这里有行关键代码阻塞住了请求:
final AsyncContext asyncContext = request.startAsync();
而在 LongPollingClient#sendResponse 中, 刚刚也分析了, 除了包装注入响应信息, 还会将 hold 住的请求释放
class LongPollingClient implements Runnable {
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
// 同步完成结束阻塞
asyncContext.complete();
}
}
这块分析完了再回到 doLongPolling(), 其中线程池调用这还有个关键点
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
这里给 LongPollingClient 传入了 60S 的 timeout 时间, 做什么用的呢? 还记得我们在 LongPollingClient#run 时略过的一块代码吗
class LongPollingClient implements Runnable {
@Override
public void run() {
// 定时启动, 延迟时间根据 timeoutTime
this.asyncTimeoutFuture = scheduler.schedule(() -> {
// 移除管理的连接
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
// 这个方法会将阻塞的请求释放
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
clients.add(this);
}
}
这里我们已经搞懂了后台这块对长轮询流程的实现, 最后再看看 doLongPolling() 是怎么被调用到的, 找到调用类 ConfigController
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}
}
看到这也基本明了, 后台通过这个 Controller 暴露 http 路径供网关调用并监听数据变化.
总结
- 后台通过 Controller 层暴露 API 给网关, 网关请求后台时后台并不是立即返回响应 (数据有无变化), 而是 hold 住请求最大 60 秒的时间. 这些被 hold 住的请求会加入到阻塞队列中作为内存缓存.
- 这 60 秒钟如果有数据变化, 通过 DataChangedEventDispatcher 分发到我们的 HttpLongPollingDataChangedListener , 则 立即调用线程池 在阻塞队列中遍历所有被 hold 住的请求, 塞入响应信息并释放掉.
- 如果 60 秒过后依然没有数据变化, hold 住的请求会被释放, 且阻塞队列的对应请求对象被剔除.
到这里, 我们已经理清它最最基本的长轮询逻辑, 那么对应下一开始的思考, 看有什么结论 or 疑惑.
- 数据怎样知道是有变化的, 是不是设置个最后更新时间, 与网关的请求时间比较, 得出是否有数据修改?
- hold 住之后, 后台怎么获知是否数据更新, 反复遍历还是阻塞等待?
- 那些用于更新的数据放哪里, 用缓存的话, 考虑后台缓存与数据库的交互是怎样的.
针对第 1 点, 我们是如何得知数据有变化的呢?
目前我们分析的数据变动来源是 DataChangedEventDispatcher, 它可不仅仅只会在数据变动时告知我们信息, 每次手动点下后台同步这里立马就来调用了.
那么这里肯定有新旧数据比对之类的东西了, 不然每次调用就直接把网关的阻塞请求放跑了, 这可不成, 白白的 IO 消耗肯定不是个好设计.
针对第 2 点, 我们现在知道了模式是阻塞等待, 利用的是 AsyncContext
这种方式, 这块我也没有了解过, 会出个番外讨论一二.
针对第 3 点, 我们知道后台配置肯定修改完是要落盘到数据库的, 所以这块缓存与数据库的交互也是个值得分析的点. 这些疑问我会在下一章继续分析~