Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix some error and make a code style. #39

Merged
merged 7 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package org.apache.rocketmq.streams.db.sink;

import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
import org.apache.rocketmq.streams.common.utils.Base64Utils;
import org.apache.rocketmq.streams.common.utils.InstantiationUtil;

public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner {
public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfigurableRefreshListener {
protected String multiTableSplitFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.rocketmq.streams.source;

import com.alibaba.fastjson.JSONObject;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.AccessChannel;
Expand Down Expand Up @@ -54,6 +56,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class RocketMQSource extends AbstractSupportOffsetResetSource {
Expand All @@ -72,12 +75,12 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
protected String namesrvAddr;

protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
protected transient String consumerOffset;//从哪里开始消费
protected transient String consumeTimestamp;//从哪里开始消费

public RocketMQSource() {}
public RocketMQSource() {
}

public RocketMQSource(String topic, String tags, String groupName, String endpoint,
String namesrvAddr, String accessKey, String secretKey, String instanceId) {
public RocketMQSource(String topic, String tags, String groupName, String namesrvAddr) {
this.topic = topic;
this.tags = tags;
this.groupName = groupName;
Expand All @@ -93,7 +96,7 @@ protected boolean initConfigurable() {
protected boolean startSource() {
try {
destroyConsumer();
consumer=startConsumer();
consumer = startConsumer();
return true;
} catch (Exception e) {
setInitSuccess(false);
Expand All @@ -108,22 +111,20 @@ protected DefaultMQPushConsumer startConsumer() {
if (pullIntervalMs != null) {
consumer.setPullInterval(pullIntervalMs);
}
// consumer.setConsumeThreadMax(maxThread);
// consumer.setConsumeThreadMin(maxThread);

consumer.setPersistConsumerOffsetInterval((int)this.checkpointTime);
consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime);
consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize);
consumer.setAccessChannel(AccessChannel.CLOUD);
consumer.setNamesrvAddr(this.namesrvAddr);
if (consumeFromWhere != null) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
if (consumerOffset != null) {
consumer.setConsumeTimestamp(consumerOffset);
consumer.setConsumeFromWhere(consumeFromWhere);
if (consumeTimestamp != null) {
consumer.setConsumeTimestamp(consumeTimestamp);
}
}

consumer.subscribe(topic, tags);
consumer.registerMessageListener((MessageListenerOrderly)(msgs, context) -> {
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
int i = 0;
for (MessageExt msg : msgs) {
Expand All @@ -143,7 +144,7 @@ protected DefaultMQPushConsumer startConsumer() {
i++;
}
} catch (Exception e) {
LOG.error("消费rocketmq报错:" + e, e);
LOG.error("consume message from rocketmq error " + e, e);
}

return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
Expand All @@ -159,8 +160,9 @@ protected DefaultMQPushConsumer startConsumer() {
throw new RuntimeException("start rocketmq channel error " + topic, e);
}
}

@Override
public List<ISplit> getAllSplits(){
public List<ISplit> getAllSplits() {
try {
Set<MessageQueue> rocketmqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
List<ISplit> queueList = new ArrayList<>();
Expand All @@ -176,31 +178,32 @@ public List<ISplit> getAllSplits(){
return queueList;
} catch (MQClientException e) {
e.printStackTrace();
throw new RuntimeException("get all splits error ",e);
throw new RuntimeException("get all splits error ", e);
}
}


@Override
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setVipChannelEnabled(false);
defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName());
try {
defaultMQAdminExt.start();

Map<org.apache.rocketmq.common.message.MessageQueue, String> queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName);
Map<String,List<ISplit>> instanceOwnerQueues=new HashMap<>();
for(org.apache.rocketmq.common.message.MessageQueue messageQueue:queue2Instances.keySet()){
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
continue;
}
String instanceName=queue2Instances.get(messageQueue);
List<ISplit> splits=instanceOwnerQueues.get(instanceName);
if(splits==null){
splits=new ArrayList<>();
instanceOwnerQueues.put(instanceName,splits);
String instanceName = queue2Instances.get(messageQueue);
List<ISplit> splits = instanceOwnerQueues.get(instanceName);
if (splits == null) {
splits = new ArrayList<>();
instanceOwnerQueues.put(instanceName, splits);
}
splits.add(rocketmqMessageQueue);
}
Expand All @@ -212,21 +215,22 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
defaultMQAdminExt.shutdown();
}
}

protected Map<org.apache.rocketmq.common.message.MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, String groupName) {
HashMap results = new HashMap();

try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
Iterator var5 = consumerConnection.getConnectionSet().iterator();

while(var5.hasNext()) {
Connection connection = (Connection)var5.next();
while (var5.hasNext()) {
Connection connection = (Connection) var5.next();
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
Iterator var9 = consumerRunningInfo.getMqTable().keySet().iterator();

while(var9.hasNext()) {
org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue)var9.next();
while (var9.hasNext()) {
org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue) var9.next();
results.put(messageQueue, clientId.split("@")[1]);
}
}
Expand Down Expand Up @@ -309,7 +313,7 @@ public boolean supportOffsetRest() {

public void destroyConsumer() {
List<DefaultMQPushConsumer> oldConsumers = new ArrayList<>();
if(consumer!=null){
if (consumer != null) {
oldConsumers.add(consumer);
}
try {
Expand Down Expand Up @@ -367,11 +371,11 @@ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}

public String getConsumerOffset() {
return consumerOffset;
public String getConsumeTimestamp() {
return consumeTimestamp;
}

public void setConsumerOffset(String consumerOffset) {
this.consumerOffset = consumerOffset;
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.rocketmq.streams.client.source;

import com.google.common.collect.Sets;

import java.util.Set;

import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
Expand All @@ -29,11 +25,9 @@

public class DataStreamSource {
protected PipelineBuilder mainPipelineBuilder;
protected Set<PipelineBuilder> otherPipelineBuilders;

public DataStreamSource(String namespace, String pipelineName) {
this.mainPipelineBuilder = new PipelineBuilder(namespace, pipelineName);
this.otherPipelineBuilders = Sets.newHashSet();
}

public static DataStreamSource create(String namespace, String pipelineName) {
Expand All @@ -48,7 +42,7 @@ public DataStream fromFile(String filePath, Boolean isJsonData) {
FileSource fileChannel = new FileSource(filePath);
fileChannel.setJsonData(isJsonData);
this.mainPipelineBuilder.setSource(fileChannel);
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
return new DataStream(this.mainPipelineBuilder, null);
}

public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress) {
Expand All @@ -67,12 +61,12 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool
rocketMQSource.setJsonData(isJson);
rocketMQSource.setNamesrvAddr(namesrvAddress);
this.mainPipelineBuilder.setSource(rocketMQSource);
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
return new DataStream(this.mainPipelineBuilder, null);
}

public DataStream from(ISource<?> source) {
this.mainPipelineBuilder.setSource(source);
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
return new DataStream(this.mainPipelineBuilder,null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public DataStream(String namespace, String pipelineName) {
this.otherPipelineBuilders = Sets.newHashSet();
}

public DataStream(PipelineBuilder pipelineBuilder, ChainStage<?> currentChainStage) {
this.mainPipelineBuilder = pipelineBuilder;
this.currentChainStage = currentChainStage;
}

public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipelineBuilders, ChainStage<?> currentChainStage) {
this.mainPipelineBuilder = pipelineBuilder;
this.otherPipelineBuilders = pipelineBuilders;
Expand All @@ -75,6 +80,7 @@ public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipeline
public DataStream script(String script) {
ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script));
this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);

return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
}

Expand Down Expand Up @@ -358,6 +364,8 @@ protected void start(boolean isAsyn) {
if (this.mainPipelineBuilder == null) {
return;
}


ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory");
ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
pipeline.startChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.IMessage;

public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfiguableRefreshListerner {
public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfigurableRefreshListener {
/**
* 是否启动qps的统计
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.rocketmq.streams.common.channel.impl.memory;

import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;

public class MemorySource extends AbstractUnreliableSource implements IAfterConfiguableRefreshListerner {
public class MemorySource extends AbstractUnreliableSource implements IAfterConfigurableRefreshListener {

protected String cacheName;
protected transient MemoryCache memoryCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.streams.common.configurable;

public interface IAfterConfiguableRefreshListerner {
public interface IAfterConfigurableRefreshListener {

/**
* 当configurable数据全部加载完成时,调用实现这个接口的configurable对象
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.streams.common.functions;

import java.io.Serializable;

public interface FilterFunction<T> extends Function, Serializable {
public interface FilterFunction<T> extends Function {

boolean filter(T value) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.Serializable;
import java.util.List;

public interface FlatMapFunction <T, O> extends Function, Serializable {
public interface FlatMapFunction <T, O> extends Function{

List<T> flatMap(O message) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@
*/
package org.apache.rocketmq.streams.common.functions;

public interface Function extends java.io.Serializable {}
public interface Function {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.streams.common.functions;

import java.io.Serializable;

public interface MapFunction<T, O> extends Function, Serializable {
public interface MapFunction<T, O> extends Function {

T map(O message) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.streams.common.functions;

import java.io.Serializable;

public interface ReduceFunction<R, O> extends Function, Serializable {
public interface ReduceFunction<R, O> extends Function{

R reduce(R acccumulator, O msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
Expand All @@ -40,7 +40,7 @@
*
* @param <T>
*/
public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> extends ChainStage<T> implements IAfterConfiguableRefreshListerner {
public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> extends ChainStage<T> implements IAfterConfigurableRefreshListener {
/**
* pipeline name,这是一个汇聚节点,会有多个pipline,这里存的是pipline name
*/
Expand Down
Loading