diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java index 96922e67..dc5271ad 100644 --- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java @@ -17,14 +17,14 @@ package org.apache.rocketmq.streams.db.sink; import org.apache.rocketmq.streams.common.channel.split.ISplit; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction; import org.apache.rocketmq.streams.common.utils.Base64Utils; import org.apache.rocketmq.streams.common.utils.InstantiationUtil; -public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner { +public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfigurableRefreshListener { protected String multiTableSplitFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码 protected transient MultiTableSplitFunction multiTableSplitFunction; diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index 8c8842b0..d8b9f7f4 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -18,10 +18,12 @@ package org.apache.rocketmq.streams.source; import com.alibaba.fastjson.JSONObject; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.UUID; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.client.AccessChannel; @@ -54,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; + import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; public class RocketMQSource extends AbstractSupportOffsetResetSource { @@ -72,12 +75,12 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource { protected String namesrvAddr; protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费 - protected transient String consumerOffset;//从哪里开始消费 + protected transient String consumeTimestamp;//从哪里开始消费 - public RocketMQSource() {} + public RocketMQSource() { + } - public RocketMQSource(String topic, String tags, String groupName, String endpoint, - String namesrvAddr, String accessKey, String secretKey, String instanceId) { + public RocketMQSource(String topic, String tags, String groupName, String namesrvAddr) { this.topic = topic; this.tags = tags; this.groupName = groupName; @@ -93,7 +96,7 @@ protected boolean initConfigurable() { protected boolean startSource() { try { destroyConsumer(); - consumer=startConsumer(); + consumer = startConsumer(); return true; } catch (Exception e) { setInitSuccess(false); @@ -108,22 +111,20 @@ protected DefaultMQPushConsumer startConsumer() { if (pullIntervalMs != null) { consumer.setPullInterval(pullIntervalMs); } - // consumer.setConsumeThreadMax(maxThread); - // consumer.setConsumeThreadMin(maxThread); - consumer.setPersistConsumerOffsetInterval((int)this.checkpointTime); + consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime); consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize); consumer.setAccessChannel(AccessChannel.CLOUD); consumer.setNamesrvAddr(this.namesrvAddr); if (consumeFromWhere != null) { - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); - if (consumerOffset != null) { - consumer.setConsumeTimestamp(consumerOffset); + consumer.setConsumeFromWhere(consumeFromWhere); + if (consumeTimestamp != null) { + consumer.setConsumeTimestamp(consumeTimestamp); } } consumer.subscribe(topic, tags); - consumer.registerMessageListener((MessageListenerOrderly)(msgs, context) -> { + consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { try { int i = 0; for (MessageExt msg : msgs) { @@ -143,7 +144,7 @@ protected DefaultMQPushConsumer startConsumer() { i++; } } catch (Exception e) { - LOG.error("消费rocketmq报错:" + e, e); + LOG.error("consume message from rocketmq error " + e, e); } return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功 @@ -159,8 +160,9 @@ protected DefaultMQPushConsumer startConsumer() { throw new RuntimeException("start rocketmq channel error " + topic, e); } } + @Override - public List getAllSplits(){ + public List getAllSplits() { try { Set rocketmqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic); List queueList = new ArrayList<>(); @@ -176,19 +178,20 @@ public List getAllSplits(){ return queueList; } catch (MQClientException e) { e.printStackTrace(); - throw new RuntimeException("get all splits error ",e); + throw new RuntimeException("get all splits error ", e); } } @Override - public Map> getWorkingSplitsGroupByInstances(){ + public Map> getWorkingSplitsGroupByInstances() { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExt.setVipChannelEnabled(false); defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString()); defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName()); try { defaultMQAdminExt.start(); + Map queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName); Map> instanceOwnerQueues=new HashMap<>(); for(org.apache.rocketmq.common.message.MessageQueue messageQueue:queue2Instances.keySet()){ @@ -196,11 +199,11 @@ public Map> getWorkingSplitsGroupByInstances(){ if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){ continue; } - String instanceName=queue2Instances.get(messageQueue); - List splits=instanceOwnerQueues.get(instanceName); - if(splits==null){ - splits=new ArrayList<>(); - instanceOwnerQueues.put(instanceName,splits); + String instanceName = queue2Instances.get(messageQueue); + List splits = instanceOwnerQueues.get(instanceName); + if (splits == null) { + splits = new ArrayList<>(); + instanceOwnerQueues.put(instanceName, splits); } splits.add(rocketmqMessageQueue); } @@ -212,6 +215,7 @@ public Map> getWorkingSplitsGroupByInstances(){ defaultMQAdminExt.shutdown(); } } + protected Map getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, String groupName) { HashMap results = new HashMap(); @@ -219,14 +223,14 @@ protected Map getMessag ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName); Iterator var5 = consumerConnection.getConnectionSet().iterator(); - while(var5.hasNext()) { - Connection connection = (Connection)var5.next(); + while (var5.hasNext()) { + Connection connection = (Connection) var5.next(); String clientId = connection.getClientId(); ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false); Iterator var9 = consumerRunningInfo.getMqTable().keySet().iterator(); - while(var9.hasNext()) { - org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue)var9.next(); + while (var9.hasNext()) { + org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue) var9.next(); results.put(messageQueue, clientId.split("@")[1]); } } @@ -309,7 +313,7 @@ public boolean supportOffsetRest() { public void destroyConsumer() { List oldConsumers = new ArrayList<>(); - if(consumer!=null){ + if (consumer != null) { oldConsumers.add(consumer); } try { @@ -367,11 +371,11 @@ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { this.consumeFromWhere = consumeFromWhere; } - public String getConsumerOffset() { - return consumerOffset; + public String getConsumeTimestamp() { + return consumeTimestamp; } - public void setConsumerOffset(String consumerOffset) { - this.consumerOffset = consumerOffset; + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; } } \ No newline at end of file diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java index a40fff96..7797832a 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java @@ -17,10 +17,6 @@ package org.apache.rocketmq.streams.client.source; -import com.google.common.collect.Sets; - -import java.util.Set; - import org.apache.rocketmq.streams.client.transform.DataStream; import org.apache.rocketmq.streams.common.channel.impl.file.FileSource; import org.apache.rocketmq.streams.common.channel.source.ISource; @@ -29,11 +25,9 @@ public class DataStreamSource { protected PipelineBuilder mainPipelineBuilder; - protected Set otherPipelineBuilders; public DataStreamSource(String namespace, String pipelineName) { this.mainPipelineBuilder = new PipelineBuilder(namespace, pipelineName); - this.otherPipelineBuilders = Sets.newHashSet(); } public static DataStreamSource create(String namespace, String pipelineName) { @@ -48,7 +42,7 @@ public DataStream fromFile(String filePath, Boolean isJsonData) { FileSource fileChannel = new FileSource(filePath); fileChannel.setJsonData(isJsonData); this.mainPipelineBuilder.setSource(fileChannel); - return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null); + return new DataStream(this.mainPipelineBuilder, null); } public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress) { @@ -67,12 +61,12 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool rocketMQSource.setJsonData(isJson); rocketMQSource.setNamesrvAddr(namesrvAddress); this.mainPipelineBuilder.setSource(rocketMQSource); - return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null); + return new DataStream(this.mainPipelineBuilder, null); } public DataStream from(ISource source) { this.mainPipelineBuilder.setSource(source); - return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null); + return new DataStream(this.mainPipelineBuilder,null); } } diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java index 4796e1f3..3dd9afd9 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java @@ -66,6 +66,11 @@ public DataStream(String namespace, String pipelineName) { this.otherPipelineBuilders = Sets.newHashSet(); } + public DataStream(PipelineBuilder pipelineBuilder, ChainStage currentChainStage) { + this.mainPipelineBuilder = pipelineBuilder; + this.currentChainStage = currentChainStage; + } + public DataStream(PipelineBuilder pipelineBuilder, Set pipelineBuilders, ChainStage currentChainStage) { this.mainPipelineBuilder = pipelineBuilder; this.otherPipelineBuilders = pipelineBuilders; @@ -75,6 +80,7 @@ public DataStream(PipelineBuilder pipelineBuilder, Set pipeline public DataStream script(String script) { ChainStage stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script)); this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage); + return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage); } @@ -358,6 +364,8 @@ protected void start(boolean isAsyn) { if (this.mainPipelineBuilder == null) { return; } + + ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory"); ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService()); pipeline.startChannel(); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySink.java index 01744471..3e54b6d3 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySink.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySink.java @@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink; import org.apache.rocketmq.streams.common.channel.split.ISplit; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.IMessage; -public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfiguableRefreshListerner { +public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfigurableRefreshListener { /** * 是否启动qps的统计 */ diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java index 806c36d4..7a4ec688 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.streams.common.channel.impl.memory; import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; -public class MemorySource extends AbstractUnreliableSource implements IAfterConfiguableRefreshListerner { +public class MemorySource extends AbstractUnreliableSource implements IAfterConfigurableRefreshListener { protected String cacheName; protected transient MemoryCache memoryCache; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfiguableRefreshListerner.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfigurableRefreshListener.java similarity index 95% rename from rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfiguableRefreshListerner.java rename to rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfigurableRefreshListener.java index 7d78dfff..d9f06c1d 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfiguableRefreshListerner.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/IAfterConfigurableRefreshListener.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.streams.common.configurable; -public interface IAfterConfiguableRefreshListerner { +public interface IAfterConfigurableRefreshListener { /** * 当configurable数据全部加载完成时,调用实现这个接口的configurable对象 diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java index 9c24c2eb..350b6fdf 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.streams.common.functions; -import java.io.Serializable; - -public interface FilterFunction extends Function, Serializable { +public interface FilterFunction extends Function { boolean filter(T value) throws Exception; } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java index 99ef2227..5319f410 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java @@ -19,7 +19,7 @@ import java.io.Serializable; import java.util.List; -public interface FlatMapFunction extends Function, Serializable { +public interface FlatMapFunction extends Function{ List flatMap(O message) throws Exception; } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/Function.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/Function.java index f6e283c3..b2bad7ba 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/Function.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/Function.java @@ -16,4 +16,6 @@ */ package org.apache.rocketmq.streams.common.functions; -public interface Function extends java.io.Serializable {} +public interface Function { + +} diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/MapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/MapFunction.java index f3fbec31..baef0ec4 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/MapFunction.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/MapFunction.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.streams.common.functions; -import java.io.Serializable; - -public interface MapFunction extends Function, Serializable { +public interface MapFunction extends Function { T map(O message) throws Exception; } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/ReduceFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/ReduceFunction.java index 6e819acf..d6c1c686 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/ReduceFunction.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/ReduceFunction.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.streams.common.functions; -import java.io.Serializable; - -public interface ReduceFunction extends Function, Serializable { +public interface ReduceFunction extends Function{ R reduce(R acccumulator, O msg); } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java index fabfbf9c..b8f1715a 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage; import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.Context; @@ -40,7 +40,7 @@ * * @param */ -public abstract class AbstractMutilPipelineChainPipline extends ChainStage implements IAfterConfiguableRefreshListerner { +public abstract class AbstractMutilPipelineChainPipline extends ChainStage implements IAfterConfigurableRefreshListener { /** * pipeline name,这是一个汇聚节点,会有多个pipline,这里存的是pipline name */ diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java index cf9b6836..13f13124 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.streams.common.channel.IChannel; import org.apache.rocketmq.streams.common.channel.source.ISource; import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -44,7 +44,7 @@ /** * 数据流拓扑结构,包含了source 算子,sink */ -public class ChainPipeline extends Pipeline implements IAfterConfiguableRefreshListerner, Serializable { +public class ChainPipeline extends Pipeline implements IAfterConfigurableRefreshListener, Serializable { private static final long serialVersionUID = -5189371682717444347L; @@ -324,7 +324,7 @@ public void setSource(ISource source) { public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) { for (AbstractStage stage : getStages()) { stage.setPipeline(this); - if (IAfterConfiguableRefreshListerner.class.isInstance(stage)) { + if (IAfterConfigurableRefreshListener.class.isInstance(stage)) { if (AbstractConfigurable.class.isInstance(stage)) { AbstractConfigurable abstractConfigurable = (AbstractConfigurable)stage; if (abstractConfigurable.isInitSuccess() == false && this.isInitSuccess() == false) { @@ -332,8 +332,8 @@ public void doProcessAfterRefreshConfigurable(IConfigurableService configurableS return; } } - IAfterConfiguableRefreshListerner afterConfiguableRefreshListerner = - (IAfterConfiguableRefreshListerner)stage; + IAfterConfigurableRefreshListener afterConfiguableRefreshListerner = + (IAfterConfigurableRefreshListener)stage; afterConfiguableRefreshListerner.doProcessAfterRefreshConfigurable(configurableService); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java index f27f328b..f108a5f6 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java @@ -16,12 +16,6 @@ */ package org.apache.rocketmq.streams.common.topology.builder; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.rocketmq.streams.common.channel.sink.ISink; import org.apache.rocketmq.streams.common.channel.source.ISource; import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable; @@ -35,7 +29,13 @@ import org.apache.rocketmq.streams.common.utils.NameCreatorUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; -public class PipelineBuilder implements Serializable { +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class PipelineBuilder { /** * 最终产出的pipeline @@ -81,9 +81,6 @@ public PipelineBuilder(String namespace, String pipelineName) { * @param source 数据源 */ public void setSource(ISource source) { - //source.addConfigurables(this); - //pipeline.setSource((ISource)source.createStageChain(this)); - this.addConfigurables(source); this.pipeline.setSource(source); } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java index d7f08ba0..49b64cac 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java @@ -17,27 +17,21 @@ package org.apache.rocketmq.streams.common.topology.stages; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; -import org.apache.rocketmq.streams.common.channel.source.AbstractSource; import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage; import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage; -import org.apache.rocketmq.streams.common.channel.split.ISplit; import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointState; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.topology.ChainStage; import org.apache.rocketmq.streams.common.topology.model.IWindow; -import org.apache.rocketmq.streams.common.topology.model.Pipeline; -import org.apache.rocketmq.streams.common.utils.StringUtil; public abstract class AbstractWindowStage extends ChainStage implements - IAfterConfiguableRefreshListerner { + IAfterConfigurableRefreshListener { protected String windowName; protected transient IWindow window; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java index 57dea236..ff5e6da7 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.streams.common.component.ComponentCreator; import org.apache.rocketmq.streams.common.component.IComponent; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -45,7 +45,7 @@ import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.common.utils.TraceUtil; -public class FilterChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class FilterChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected transient AtomicInteger count = new AtomicInteger(0); protected transient Map map = new HashMap<>(); private List names; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/NewSQLChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/NewSQLChainStage.java index a649e12e..213fd9aa 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/NewSQLChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/NewSQLChainStage.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.streams.common.topology.stages; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurable; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; @@ -24,7 +24,7 @@ import org.apache.rocketmq.streams.common.interfaces.IStreamOperator; import org.apache.rocketmq.streams.common.topology.model.IStageHandle; -public class NewSQLChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class NewSQLChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String sqlMessageProcessorName; protected transient IStreamOperator messageProcessor; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OpenAPIChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OpenAPIChainStage.java index a136c417..f3561a7a 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OpenAPIChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OpenAPIChainStage.java @@ -17,14 +17,14 @@ package org.apache.rocketmq.streams.common.topology.stages; import org.apache.rocketmq.streams.common.channel.IChannel; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.interfaces.IStreamOperator; import org.apache.rocketmq.streams.common.topology.model.IStageHandle; -public class OpenAPIChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class OpenAPIChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String openApiChannelName; protected transient IChannel channel; protected transient IStageHandle handle = new IStageHandle() { diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java index f6230427..9ffba1a3 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointState; import org.apache.rocketmq.streams.common.component.ComponentCreator; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; import org.apache.rocketmq.streams.common.context.AbstractContext; @@ -35,7 +35,7 @@ import org.apache.rocketmq.streams.common.topology.model.IStageHandle; import org.apache.rocketmq.streams.common.utils.StringUtil; -public class OutputChainStage extends ChainStage implements IAfterConfiguableRefreshListerner { +public class OutputChainStage extends ChainStage implements IAfterConfigurableRefreshListener { public static final String OUT_MOCK_SWITCH = "out.mock.switch";//在配置文件中,是否打开mock的开关 private String sinkName; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/PythonChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/PythonChainStage.java index 55af67de..ba7b44fb 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/PythonChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/PythonChainStage.java @@ -17,14 +17,14 @@ package org.apache.rocketmq.streams.common.topology.stages; import org.apache.rocketmq.streams.common.channel.IChannel; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.interfaces.IStreamOperator; import org.apache.rocketmq.streams.common.topology.model.IStageHandle; -public class PythonChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class PythonChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String pythonChannelName; protected transient IChannel channel; protected transient IStageHandle handle = new IStageHandle() { diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/RightJoinChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/RightJoinChainStage.java index dabbe156..e587700b 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/RightJoinChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/RightJoinChainStage.java @@ -19,7 +19,7 @@ import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage; import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -31,7 +31,7 @@ /** * 双流join的右流的处理逻辑 */ -public class RightJoinChainStage extends ChainStage implements IAfterConfiguableRefreshListerner { +public class RightJoinChainStage extends ChainStage implements IAfterConfigurableRefreshListener { protected String pipelineName; protected transient Pipeline pipline; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SQLChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SQLChainStage.java index 9f35e5a2..e126173c 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SQLChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SQLChainStage.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.streams.common.topology.stages; import org.apache.rocketmq.streams.common.channel.IChannel; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -25,7 +25,7 @@ import org.apache.rocketmq.streams.common.topology.model.IStageHandle; @Deprecated -public class SQLChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class SQLChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String dbChannelName; protected transient IChannel channel; protected transient IStageHandle handle = new IStageHandle() { diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ScriptChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ScriptChainStage.java index 85fcbe40..e0e96335 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ScriptChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ScriptChainStage.java @@ -20,7 +20,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.component.ComponentCreator; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -30,7 +30,7 @@ import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.common.utils.TraceUtil; -public class ScriptChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class ScriptChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(ScriptChainStage.class); protected String scriptName; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SubPiplineChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SubPiplineChainStage.java index 0298a373..5ab09c3f 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SubPiplineChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/SubPiplineChainStage.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage; import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; import org.apache.rocketmq.streams.common.component.ComponentCreator; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; import org.apache.rocketmq.streams.common.context.AbstractContext; @@ -44,7 +44,7 @@ import org.apache.rocketmq.streams.common.utils.StringUtil; public class -SubPiplineChainStage extends ChainStage implements IAfterConfiguableRefreshListerner { +SubPiplineChainStage extends ChainStage implements IAfterConfigurableRefreshListener { private static String INNER_MESSAGE = "inner_message";//保存原始信息,用于收集触发规则的原始日志时,使用 private static String FIRE_RUEL = "fireRule";//触发的规则 private static String FIRE_RULE_OUT_FILE_NAME = "dipper.msg";//触发的规则和原始数据,输出的文件名 diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/WindowChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/WindowChainStage.java index 38515654..fff987f4 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/WindowChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/WindowChainStage.java @@ -16,14 +16,14 @@ */ package org.apache.rocketmq.streams.common.topology.stages; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurable; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.topology.model.IStageHandle; import org.apache.rocketmq.streams.common.topology.model.IWindow; -public class WindowChainStage extends AbstractWindowStage implements IAfterConfiguableRefreshListerner { +public class WindowChainStage extends AbstractWindowStage implements IAfterConfigurableRefreshListener { private static final long serialVersionUID = -6592591896560866562L; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java index b09ae115..dc2c58e7 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java @@ -16,8 +16,7 @@ */ package org.apache.rocketmq.streams.common.topology.stages.udf; -import java.io.Serializable; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -30,12 +29,10 @@ /** * 给用户提供自定义的抽象类 */ -public abstract class StageBuilder extends AbstractStatelessChainStage - implements IStageBuilder, Serializable, IAfterConfiguableRefreshListerner { +public abstract class StageBuilder extends AbstractStatelessChainStage implements IStageBuilder, IAfterConfigurableRefreshListener { @Override protected boolean initConfigurable() { - return true; } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java index b5cd886c..f6653ba4 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.streams.common.topology.stages.udf; import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -29,7 +29,7 @@ /** * 所有给用户自定义代码的通用类,会转化成这个stage */ -public class UDFChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class UDFChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String udfOperatorClassSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码 protected transient StageBuilder selfChainStage; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFUnionChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFUnionChainStage.java index 32b969ce..66483cfa 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFUnionChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFUnionChainStage.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.streams.common.topology.stages.udf; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.IMessage; @@ -25,7 +25,7 @@ import org.apache.rocketmq.streams.common.topology.model.Union; import org.apache.rocketmq.streams.common.topology.stages.AbstractStatelessChainStage; -public class UDFUnionChainStage extends AbstractStatelessChainStage implements IAfterConfiguableRefreshListerner { +public class UDFUnionChainStage extends AbstractStatelessChainStage implements IAfterConfigurableRefreshListener { protected String unionName;//union对象的名字 protected boolean isMainStream = false;//是主流,在union时,主流.union(支流) diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java index 6c05e1b8..8c68870a 100644 --- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java +++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java @@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.component.AbstractComponent; import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurable; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.datatype.DataType; @@ -139,8 +139,8 @@ public boolean refreshConfigurable(String namespace) { configures.getConfigurables()); // this.namespace2ConfigurableMap = namespace2ConfigurableMap; for (IConfigurable configurable : configurableList) { - if (configurable instanceof IAfterConfiguableRefreshListerner) { - ((IAfterConfiguableRefreshListerner)configurable).doProcessAfterRefreshConfigurable(this); + if (configurable instanceof IAfterConfigurableRefreshListener) { + ((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this); } } return true; diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java index f9659cd6..896e6213 100644 --- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java +++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java @@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.component.AbstractComponent; import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurable; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.datatype.DataType; @@ -138,8 +138,8 @@ public boolean refreshConfigurable(String namespace) { configures.getConfigurables()); // this.namespace2ConfigurableMap = namespace2ConfigurableMap; for (IConfigurable configurable : configurableList) { - if (configurable instanceof IAfterConfiguableRefreshListerner) { - ((IAfterConfiguableRefreshListerner)configurable).doProcessAfterRefreshConfigurable(this); + if (configurable instanceof IAfterConfigurableRefreshListener) { + ((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this); } } return true; diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java index 61fab28f..b01fe67f 100644 --- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java +++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.streams.common.channel.sink.ISink; import org.apache.rocketmq.streams.common.component.ComponentCreator; import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; @@ -47,7 +47,7 @@ import org.apache.rocketmq.streams.http.source.util.HttpUtil; public abstract class AbstractIntelligenceCache extends BasedConfigurable implements - IAfterConfiguableRefreshListerner { + IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(AbstractIntelligenceCache.class); diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AccountIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AccountIntelligenceCache.java index 68181f01..1cbda460 100644 --- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AccountIntelligenceCache.java +++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AccountIntelligenceCache.java @@ -22,12 +22,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; /** * table: ads_yunsec_abnormal_account */ -public class AccountIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfiguableRefreshListerner { +public class AccountIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(AccountIntelligenceCache.class); diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/DomainIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/DomainIntelligenceCache.java index bc2c6d3b..1926a676 100644 --- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/DomainIntelligenceCache.java +++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/DomainIntelligenceCache.java @@ -23,9 +23,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; -public class DomainIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfiguableRefreshListerner { +public class DomainIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(DomainIntelligenceCache.class); protected transient String keyName = "domain"; diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/IPIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/IPIntelligenceCache.java index ac7d815e..2f0962a8 100644 --- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/IPIntelligenceCache.java +++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/IPIntelligenceCache.java @@ -24,11 +24,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; import org.apache.rocketmq.streams.common.component.ComponentCreator; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.dboperator.IDBDriver; import org.apache.rocketmq.streams.db.driver.DriverBuilder; -public class IPIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfiguableRefreshListerner { +public class IPIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(IPIntelligenceCache.class); protected transient String keyName = "ip"; diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/URLIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/URLIntelligenceCache.java index af1b202f..b1263dae 100644 --- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/URLIntelligenceCache.java +++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/URLIntelligenceCache.java @@ -23,9 +23,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; -public class URLIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfiguableRefreshListerner { +public class URLIntelligenceCache extends AbstractIntelligenceCache implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(URLIntelligenceCache.class); diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java index 93f13c3c..7d9e9e93 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java @@ -33,11 +33,7 @@ public static void main(String[] args) { RMQ_TOPIC, TAGS, RMQ_CONSUMER_GROUP_NAME, - "", - NAMESRV_ADDRESS, - "", - "", - "" + NAMESRV_ADDRESS )) .map(message -> message) .toPrint(1) diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java index f71ac759..73938d1d 100644 --- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java +++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java @@ -27,7 +27,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.rocketmq.streams.common.channel.sink.ISink; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurable; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.AbstractContext; @@ -53,7 +53,7 @@ import org.apache.rocketmq.streams.filter.operator.var.Var; import org.apache.rocketmq.streams.filter.optimization.ExpressionOptimization; -public class Rule extends AbstractRule implements IAfterConfiguableRefreshListerner, +public class Rule extends AbstractRule implements IAfterConfigurableRefreshListener, IStageBuilder { private transient volatile Map varMap = new HashMap<>(); private transient volatile Map expressionMap = new HashMap<>(); diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/action/impl/ChannelAction.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/action/impl/ChannelAction.java index 99410dad..2868fbe8 100644 --- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/action/impl/ChannelAction.java +++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/action/impl/ChannelAction.java @@ -19,13 +19,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.channel.IChannel; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.filter.context.RuleContext; import org.apache.rocketmq.streams.filter.operator.Rule; import org.apache.rocketmq.streams.filter.operator.action.Action; -public class ChannelAction extends Action implements IAfterConfiguableRefreshListerner { +public class ChannelAction extends Action implements IAfterConfigurableRefreshListener { private static final Log LOG = LogFactory.getLog(ChannelAction.class); protected String channelName; diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java index 715c2d1f..c7b761f9 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java @@ -45,9 +45,7 @@ /** * * 对外提供的脚本算子,通过输入脚本,来实现业务逻辑 * 脚本存储的成员变量是value字段 */ -public class FunctionScript extends AbstractScript, FunctionContext> implements - IStreamOperator>, - IStageBuilder { +public class FunctionScript extends AbstractScript, FunctionContext> implements IStreamOperator>, IStageBuilder { private static final Log LOG = LogFactory.getLog(FunctionScript.class); diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java index 5d376cb5..d05e5481 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java @@ -21,7 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.window.model.WindowInstance; @@ -29,7 +29,7 @@ import org.apache.rocketmq.streams.window.source.WindowRireSource; import org.apache.rocketmq.streams.window.storage.WindowStorage; -public abstract class AbstractShuffleWindow extends AbstractWindow implements IAfterConfiguableRefreshListerner { +public abstract class AbstractShuffleWindow extends AbstractWindow implements IAfterConfigurableRefreshListener { protected transient ShuffleChannel shuffleChannel; protected transient AtomicBoolean hasCreated = new AtomicBoolean(false);