Skip to content

Commit

Permalink
Merge pull request #207 from ni-ze/supportRsqldb
Browse files Browse the repository at this point in the history
[ISSue #208]Support rsqldb
  • Loading branch information
ni-ze authored Sep 5, 2022
2 parents cea2d6c + fa74f37 commit fb6187d
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
private transient DefaultLitePullConsumer pullConsumer;
private transient ExecutorService executorService;
private transient PullTask[] pullTasks;
private transient volatile AtomicBoolean committing = new AtomicBoolean(false);

public RocketMQSource() {
}
Expand Down Expand Up @@ -112,9 +114,9 @@ protected boolean startSource() {
this.pullConsumer.start();

return true;
} catch (MQClientException e) {
} catch (Throwable t) {
setInitSuccess(false);
throw new RuntimeException("start rocketmq channel error " + topic, e);
throw new RuntimeException("start rocketmq channel error " + topic, t);
}
}

Expand Down Expand Up @@ -242,11 +244,29 @@ public void destroyConsumer() {
this.executorService.shutdown();

//关闭消费实例
this.pullConsumer.shutdown();
try {
synchronized (committing) {
while (committing.get()) {
committing.wait();
}
}

this.pullConsumer.shutdown();
} catch (Throwable t) {
LOG.error(t);
}

}

public void commit(Set<MessageQueue> messageQueues) {
this.pullConsumer.commit(messageQueues, true);
if (this.pullConsumer.isRunning()) {
synchronized (committing) {
committing.set(true);
this.pullConsumer.commit(messageQueues, true);
committing.set(false);
committing.notifyAll();
}
}
}

@Override
Expand Down Expand Up @@ -302,57 +322,65 @@ public void run() {
}

while (!this.isStopped) {

if (this.delegator.needSync()) {
synchronized (this.pullConsumer) {
if (this.delegator.needSync()) {
afterRebalance();
try {
if (this.delegator.needSync()) {
synchronized (this.pullConsumer) {
if (this.delegator.needSync()) {
afterRebalance();
}
}
}
}


List<MessageExt> msgs = pullConsumer.poll(pullTimeout);
List<MessageExt> msgs = pullConsumer.poll(pullTimeout);

int i = 0;
for (MessageExt msg : msgs) {
JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
int i = 0;
for (MessageExt msg : msgs) {
JSONObject jsonObject = create(msg.getBody(), msg.getProperties());

String topic = msg.getTopic();
int queueId = msg.getQueueId();
String brokerName = msg.getBrokerName();
MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
String topic = msg.getTopic();
int queueId = msg.getQueueId();
String brokerName = msg.getBrokerName();
MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
String unionQueueId = RocketMQMessageQueue.getQueueId(queue);


String offset = msg.getQueueOffset() + "";
org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false);
message.getHeader().setOffsetIsLong(true);
String offset = msg.getQueueOffset() + "";
org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false);
message.getHeader().setOffsetIsLong(true);

if (i == msgs.size() - 1) {
message.getHeader().setNeedFlush(true);
if (i == msgs.size() - 1) {
message.getHeader().setNeedFlush(true);
}
executeMessage(message);
i++;
}
executeMessage(message);
i++;
}

//拉取的批量消息处理完成以后判断是否提交位点;
synchronized (this.pullConsumer) {
if (System.currentTimeMillis() - lastCommit >= commitInternalMs || isStopped) {
lastCommit = System.currentTimeMillis();
//向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue
commit(this.delegator.getLastDivided());
//拉取的批量消息处理完成以后判断是否提交位点;
synchronized (this.pullConsumer) {
if (System.currentTimeMillis() - lastCommit >= commitInternalMs && !isStopped) {
lastCommit = System.currentTimeMillis();
//向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue
commit(this.delegator.getLastDivided());
}
}
} catch (Throwable t) {
LOG.error(t);
}
}
}

public void shutdown() {
Set<MessageQueue> lastDivided = this.delegator.getLastDivided();
if (lastDivided != null && lastDivided.size() != 0) {
commit(lastDivided);
}

this.isStopped = true;
}
}

private void newRebalance(Set<MessageQueue> allQueueInLastRebalance){
private void newRebalance(Set<MessageQueue> allQueueInLastRebalance) {
Set<String> temp = new HashSet<>();
for (MessageQueue queue : allQueueInLastRebalance) {
String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.apache.rocketmq.streams.common.channel.impl.memory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;

public class MemorySource extends AbstractBatchSource implements IAfterConfigurableRefreshListener {
private static final Log logger = LogFactory.getLog(MemorySource.class);

protected String cacheName;
protected transient MemoryCache memoryCache;
Expand Down Expand Up @@ -51,8 +54,9 @@ public void run() {
Thread.sleep(1000);
}

} catch (Exception e) {
throw new RuntimeException(e);
} catch (Throwable t) {
logger.error("MemorySource error:");
t.printStackTrace();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ScanFunctionService(boolean scanDipper) {
if (scanDipper) {
scan.scanPackage("org.apache.rocketmq.streams.script.function.impl");
scan.scanPackage("org.apache.rocketmq.streams.filter.function");
scan.scanPackage("org.apache.rocketmq.streams.dim.function");
// scan.scanPackage("org.apache.rocketmq.streams.dim.function");
}
}

Expand Down

0 comments on commit fb6187d

Please sign in to comment.