Skip to content

Commit

Permalink
fix bug Flink Server side JM TM启动 classloader 初始化并发异常 #169
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Nov 28, 2022
1 parent 01b4778 commit b9df59a
Showing 1 changed file with 103 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugin;
Expand Down Expand Up @@ -218,7 +218,7 @@ public static Manifest createFlinkIncrJobManifestCfgAttrs(TargetResName collecti
RobustReflectionConverter.PluginMetas pluginMetas = null;
try {
pluginMetas = RobustReflectionConverter.getUnCacheableThreadMetas();
MQListenerFactory sourceFactory = HeteroEnum.getIncrSourceListenerFactory(collection.getName());
MQListenerFactory sourceFactory = HeteroEnum.getIncrSourceListenerFactory(collection.getName());
sourceFactory.create();

// 先收集plugmeta,特别是通过dataXWriter的dataSource关联的元数据
Expand Down Expand Up @@ -319,23 +319,39 @@ public Set<String> getPluginNames() {
return pluginMetas.stream().map((m) -> m.getPluginName()).collect(Collectors.toSet());
}

/**
* Flink 远端 会启多个VM,一个JM,多个TM,问题是多个VM在运行时 本地插件/配置的目录对应的是一个,为了避免当其中一个VM更新之后,其他VM就中的pluginManager->UberClassLoader就
* 不更新了,所以本地需要有两个配置快照的副本,一个是localSnaphsot,另外一个 cacheSnaphsot
*
* @param localSnaphsot 每次从本地文件系统中去load
* @param cacheSnaphsot 缓存在VM内存中的,用来和最新的远端快照做对比
* @throws Exception
*/
public void synchronizTpisAndConfs(PluginAndCfgsSnapshot localSnaphsot
, Optional<PluginAndCfgsSnapshot> cacheSnaphsot) throws Exception {
this.synchronizTpisAndConfs(localSnaphsot);
if (cacheSnaphsot.isPresent()) {
this.updatePluginManager(cacheSnaphsot.get());
}
}


/**
* 通过将远程仓库中的plugin tpi的最近更新时间和本地tpi的最新更新时间经过对比,计算出需要更新的插件集合
*
* @param localSnaphsot
* @return
*/
public void synchronizTpisAndConfs(PluginAndCfgsSnapshot localSnaphsot) throws Exception {
private void synchronizTpisAndConfs(PluginAndCfgsSnapshot localSnaphsot) throws Exception {
if (!localSnaphsot.appMetas.isPresent()) {
throw new IllegalArgumentException("localSnaphsot.appMetas must be present");
}
Set<PluginMeta> result = Sets.newHashSet();
StringBuffer updateTpisLogger = new StringBuffer("\nplugin synchronize------------------------------\n");

StringBuffer updateTpisLogger = new StringBuffer("\nplugin synchronizTpisAndConfs------------------------------\n");

Long localTimestamp;
File cfg = null;
boolean cfgChanged = false;
//boolean cfgChanged = false;
// URL globalCfg = null;
updateTpisLogger.append(">>global cfg compare:\n");
for (Map.Entry<String, Long> entry : this.globalPluginStoreLastModify.entrySet()) {
Expand All @@ -346,7 +362,7 @@ public void synchronizTpisAndConfs(PluginAndCfgsSnapshot localSnaphsot) throws E
cfg = CenterResource.copyFromRemote2Local(Config.KEY_TIS_PLUGIN_CONFIG + "/" + entry.getKey(), true);
FileUtils.writeStringToFile(
PluginStore.getLastModifyTimeStampFile(cfg), String.valueOf(entry.getValue()), TisUTF8.get());
cfgChanged = true;
// cfgChanged = true;
updateTpisLogger.append(entry.getKey()).append(localTimestamp == null
? "[" + entry.getValue() + "] local is none"
: " center ver:" + entry.getValue()
Expand All @@ -357,7 +373,8 @@ public void synchronizTpisAndConfs(PluginAndCfgsSnapshot localSnaphsot) throws E

updateTpisLogger.append(">>app cfg compare:\n");
updateTpisLogger.append("center:").append(this.appLastModifyTimestamp)
.append(this.appLastModifyTimestamp > localSnaphsot.appLastModifyTimestamp ? " > " : " <= ").append("local:").append(localSnaphsot.appLastModifyTimestamp).append("\n");
.append(this.appLastModifyTimestamp > localSnaphsot.appLastModifyTimestamp ? " > " : " <= ")
.append("local:").append(localSnaphsot.appLastModifyTimestamp).append("\n");
if (this.appLastModifyTimestamp > localSnaphsot.appLastModifyTimestamp) {
// 更新app相关配置,下载并更新本地配置
KeyedPluginStore.AppKey appKey = new KeyedPluginStore.AppKey(null, false, this.collection.getName(), null);
Expand All @@ -383,33 +400,49 @@ public Void p(int status, InputStream stream, Map<String, List<String>> headerFi
return null;
}
});
cfgChanged = true;
// cfgChanged = true;
}

updateTpisLogger.append(">>center repository:")
.append(pluginMetas.stream().map((meta) -> meta.toString()).collect(Collectors.joining(",")));
updateTpisLogger.append("\n>>local:")
.append(localSnaphsot.pluginMetas.stream()
.map((meta) -> meta.toString())
.collect(Collectors.joining(","))).append("\n");
updateTpisLogger.append(">>compare result\n");
Map<String, PluginMeta> locals = localSnaphsot.pluginMetas.stream()
.collect(Collectors.toMap((m) -> m.getKey(), (m) -> m));
PluginMeta m = null;
for (PluginMeta meta : pluginMetas) {
m = locals.get(meta.getKey());
if (m == null || meta.getLastModifyTimeStamp() > m.getLastModifyTimeStamp()) {
result.add(meta);
updateTpisLogger.append(meta.getKey()).append(m == null
? " local is none"
: " center repository ver:" + meta.getLastModifyTimeStamp()
+ " > local ver:" + m.getLastModifyTimeStamp()).append("\n");
}
}
Set<PluginMeta> result = getShallUpdatePluginMeta(localSnaphsot, updateTpisLogger);

for (PluginMeta update : result) {
update.copyFromRemote(Collections.emptyList(), true, true);
}
// TIS tis = TIS.get();
// PluginManager pluginManager = tis.getPluginManager();
// Set<PluginMeta> loaded = Sets.newHashSet();
// PluginWrapperList batch = new PluginWrapperList();
// for (PluginMeta update : result) {
// dynamicLoad(pluginManager, update, batch, result, loaded);
// }
//
// if (batch.size() > 0) {
// pluginManager.start(batch);
// updateTpisLogger.append("\ndynamic reload plugins:" + batch.getBatchNames());
// }
// Thread.sleep(3000l);
// if (cfgChanged) {
// TIS.cleanPluginStore();
// tis.cleanExtensionCache();
// }

logger.info(updateTpisLogger.append("\n------------------------------").toString());
// return result;
}

/**
* 更新本地pluginManger,激活插件
*
* @param localCacheSnaphsot
* @throws Exception
*/
private void updatePluginManager(PluginAndCfgsSnapshot localCacheSnaphsot) throws Exception {
StringBuffer updateTpisLogger = new StringBuffer("\nplugin updatePluginManager synchronize------------------------------\n");
Set<PluginMeta> result = getShallUpdatePluginMeta(localCacheSnaphsot, updateTpisLogger);

// for (PluginMeta update : result) {
// update.copyFromRemote(Collections.emptyList(), true, true);
// }
TIS tis = TIS.get();
PluginManager pluginManager = tis.getPluginManager();
Set<PluginMeta> loaded = Sets.newHashSet();
Expand All @@ -421,17 +454,41 @@ public Void p(int status, InputStream stream, Map<String, List<String>> headerFi
if (batch.size() > 0) {
pluginManager.start(batch);
updateTpisLogger.append("\ndynamic reload plugins:" + batch.getBatchNames());
}
Thread.sleep(3000l);
if (cfgChanged) {
Thread.sleep(3000l);
TIS.cleanPluginStore();
tis.cleanExtensionCache();
}


logger.info(updateTpisLogger.append("\n------------------------------").toString());
// return result;
}

private Set<PluginMeta> getShallUpdatePluginMeta(PluginAndCfgsSnapshot localSnaphsot, StringBuffer updateTpisLogger) {
Set<PluginMeta> result = new HashSet<>();
updateTpisLogger.append(">>center repository:")
.append(pluginMetas.stream().map((meta) -> meta.toString()).collect(Collectors.joining(",")));
updateTpisLogger.append("\n>>local:")
.append(localSnaphsot.pluginMetas.stream()
.map((meta) -> meta.toString())
.collect(Collectors.joining(","))).append("\n");
updateTpisLogger.append(">>compare result\n");
Map<String, PluginMeta> locals = localSnaphsot.pluginMetas.stream()
.collect(Collectors.toMap((m) -> m.getKey(), (m) -> m));
PluginMeta m = null;
for (PluginMeta meta : pluginMetas) {
m = locals.get(meta.getKey());
if (m == null || meta.getLastModifyTimeStamp() > m.getLastModifyTimeStamp()) {
result.add(meta);
updateTpisLogger.append(meta.getKey()).append(m == null
? " local is none"
: " center repository ver:" + meta.getLastModifyTimeStamp()
+ " > local ver:" + m.getLastModifyTimeStamp()).append("\n");
}
}
return result;
}

/**
* 为了去除batch plugin中的重复机器,用一个List包裹一下
*/
Expand Down

0 comments on commit b9df59a

Please sign in to comment.