Skip to content

Commit

Permalink
Merge pull request #98 from duhenglucky/main_strategy
Browse files Browse the repository at this point in the history
[ISSUE #99] Add default rebalance strategy
  • Loading branch information
j-ching authored Dec 8, 2021
2 parents 2b9d15b + 8fff972 commit d0d88fb
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource {

protected Long pullIntervalMs;

protected String strategyName;
protected String strategyName = STRATEGY_AVERAGE;

protected transient DefaultMQPushConsumer consumer;
protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.configurable.service;

import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -28,7 +27,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
Expand Down Expand Up @@ -94,7 +92,7 @@ protected void updateConfiguresCache(List<IConfigurable> configureList) {

protected boolean equals(String key, List<?> newConfigureList) {
for (Object o : newConfigureList) {
IConfigurable configure = (IConfigurable)o;
IConfigurable configure = (IConfigurable) o;
String tempKey = getConfigureKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
if (key.equals(tempKey)) {
IConfigurable oldConfigure = configurableMap.get(key);
Expand All @@ -115,31 +113,24 @@ public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
}
List<T> result = new ArrayList<T>();
for (IConfigurable configurable : list) {
result.add((T)configurable);
result.add((T) configurable);
}
return result;
}

@Override
public boolean refreshConfigurable(String namespace) {
//每次刷新,重新刷新配置文件
//if(ComponentCreator.propertiesPath!=null){
// ComponentCreator.setProperties(ComponentCreator.propertiesPath);
//}

this.namespace = namespace;
// Map<String, List<IConfigurable>> namespace2ConfigurableMap = new HashMap<>();
Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
GetConfigureResult configures = loadConfigurable(namespace);
// updateConfiguresCache(configures.getConfigure());
if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
// List<Configure> configureList = filterConfigure(configures.getConfigure());
List<IConfigurable> configurables = configures.getConfigurables();
List<IConfigurable> configurableList = checkAndUpdateConfigurables(configurables, tempType2ConfigurableMap, tempName2ConfigurableMap);
// this.namespace2ConfigurableMap = namespace2ConfigurableMap;
for (IConfigurable configurable : configurableList) {
if (configurable instanceof IAfterConfigurableRefreshListener) {
((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this);
((IAfterConfigurableRefreshListener) configurable).doProcessAfterRefreshConfigurable(this);
}
}
return true;
Expand All @@ -149,10 +140,12 @@ public boolean refreshConfigurable(String namespace) {

@Override
public <T> T queryConfigurable(String configurableType, String name) {
return (T)queryConfigurableByIdent(configurableType, name);
return (T) queryConfigurableByIdent(configurableType, name);
}

protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables, Map<String, List<IConfigurable>> tempType2ConfigurableMap, Map<String, IConfigurable> tempName2ConfigurableMap) {
protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables,
Map<String, List<IConfigurable>> tempType2ConfigurableMap,
Map<String, IConfigurable> tempName2ConfigurableMap) {
List<IConfigurable> configurableList = new ArrayList<>();
for (IConfigurable configurable : configurables) {
try {
Expand Down Expand Up @@ -185,7 +178,7 @@ private void destroyOldConfigurables(Map<String, IConfigurable> tempName2Configu

private void destroyOldConfigurable(IConfigurable oldConfigurable) {
if (AbstractConfigurable.class.isInstance(oldConfigurable)) {
((AbstractConfigurable)oldConfigurable).destroy();
((AbstractConfigurable) oldConfigurable).destroy();
}
String key = getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
oldConfigurable.getConfigureName());
Expand All @@ -194,17 +187,14 @@ private void destroyOldConfigurable(IConfigurable oldConfigurable) {

protected void initConfigurable(IConfigurable configurable) {
if (AbstractConfigurable.class.isInstance(configurable)) {
AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}

configurable.init();

}

/**
* 内部使用
*/
private ScheduledExecutorService scheduledExecutorService;

@Override
Expand Down Expand Up @@ -232,10 +222,6 @@ public void run() {
}, polingTime, polingTime, TimeUnit.SECONDS);
}
}
// @Override
// public List<IConfigurable> queryConfigurable(String nameSpace) {
// return namespace2ConfigurableMap.get(nameSpace);
// }

@Override
public List<IConfigurable> queryConfigurable(String type) {
Expand All @@ -260,7 +246,6 @@ public IConfigurable queryConfigurableByIdent(String type, String name) {

@Override
public void update(IConfigurable configurable) {
// update(configurable,name2ConfigurableMap,type2ConfigurableMap);
updateConfigurable(configurable);
}

Expand All @@ -284,7 +269,6 @@ protected boolean update(IConfigurable configurable, Map<String, IConfigurable>
IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
if (equals(configureKey, configurableList)) {
configurable = oldConfigurable;
// name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
} else {
destroyOldConfigurable(oldConfigurable);
initConfigurable(configurable);
Expand All @@ -297,14 +281,12 @@ protected boolean update(IConfigurable configurable, Map<String, IConfigurable>
updateConfiguresCache(configurable);
name2ConfigurableMap.put(nameKey, configurable);
String typeKey = MapKeyUtil.createKey(configurable.getType());
// put2Map(namespace2ConfigurableMap, namespace, configurable);
put2Map(type2ConfigurableMap, typeKey, configurable);
return isUpdate;
}

@Override
public void insert(IConfigurable configurable) {
// update(configurable,name2ConfigurableMap,type2ConfigurableMap);
insertConfigurable(configurable);
}

Expand Down Expand Up @@ -371,7 +353,6 @@ protected Configure createConfigure(IConfigurable configurable) {
jsonObject.put(CLASS_NAME, configurable.getClass().getName());
configure.setJsonValue(jsonObject.toJSONString());
}
// configure.createIdentification();
return configure;
}

Expand All @@ -383,7 +364,7 @@ public <T> Map<String, T> queryConfigurableMapByType(String type) {
}
Map<String, T> result = new HashMap<String, T>();
for (IConfigurable configurable : configurables) {
result.put(configurable.getConfigureName(), (T)configurable);
result.put(configurable.getConfigureName(), (T) configurable);
}
return result;
}
Expand Down Expand Up @@ -423,7 +404,7 @@ protected IConfigurable createConfigurableFromJson(String namespace, String type
configurable.setNameSpace(namespace);
configurable.setType(type);
if (AbstractConfigurable.class.isInstance(configurable)) {
AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}
configurable.toObject(jsonValue);
Expand All @@ -450,7 +431,7 @@ protected IConfigurable convertConfigurable(Configure configure) {
jsonString);
if (configurable instanceof Entity) {
// add by wangtl 20171110 Configurable接口第三方包也在用,故不能Configurable里加接口,只能加到抽象类里,这里强转下
Entity abs = (Entity)configurable;
Entity abs = (Entity) configurable;
abs.setId(configure.getId());
abs.setGmtCreate(configure.getGmtCreate());
abs.setGmtModified(configure.getGmtModified());
Expand Down

0 comments on commit d0d88fb

Please sign in to comment.