Skip to content

Commit

Permalink
Merge pull request #132 from ni-ze/replaceDB
Browse files Browse the repository at this point in the history
[RIP-35] Use rocketmq instead of mysql as state store
  • Loading branch information
ni-ze authored Jun 29, 2022
2 parents 8ba8e00 + e5fd612 commit b654e66
Show file tree
Hide file tree
Showing 196 changed files with 2,509 additions and 15,231 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ mvn clean -DskipTests install -U
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<!--Newest version-->
<version>1.0.2-preview-SNAPSHOT</version>
<version>${version}</version>
</dependency>
</dependencies>

Expand Down
44 changes: 10 additions & 34 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,21 @@

<modules>
<module>rocketmq-streams-commons</module>
<module>rocketmq-streams-dim</module>
<module>rocketmq-streams-transport-minio</module>
<module>rocketmq-streams-script</module>
<module>rocketmq-streams-configurable</module>
<module>rocketmq-streams-serviceloader</module>
<module>rocketmq-streams-filter</module>
<module>rocketmq-streams-schedule</module>
<module>rocketmq-streams-lease</module>
<module>rocketmq-streams-db-operator</module>
<module>rocketmq-streams-dbinit</module>
<module>rocketmq-streams-window</module>
<module>rocketmq-streams-clients</module>
<module>rocketmq-streams-channel-rocketmq</module>
<module>rocketmq-streams-channel-db</module>
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
<module>rocketmq-streams-checkpoint</module>
<module>rocketmq-streams-connectors</module>
<module>rocketmq-streams-channel-syslog</module>
<module>rocketmq-streams-channel-es</module>
<module>rocketmq-streams-runner</module>
<module>rocketmq-streams-channel-mqtt</module>
</modules>

Expand Down Expand Up @@ -133,7 +126,7 @@
<groovy.version>2.1.8</groovy.version>
<disruptor.version>3.2.0</disruptor.version>
<rocksdbjni.version>6.6.4</rocksdbjni.version>
<rocketmq.version>4.5.2</rocketmq.version>
<rocketmq.version>4.9.4</rocketmq.version>
<hyperscan.version>5.4.0-2.0.0</hyperscan.version>
<platform.version>3.5.2</platform.version>
<gson.version>2.8.9</gson.version>
Expand Down Expand Up @@ -189,6 +182,7 @@
<exclude>**/*.sql</exclude>
<exclude>**/*.properties</exclude>
<exclude>docs/**/*</exclude>
<exclude>**/*.sql</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -276,11 +270,6 @@
<artifactId>rocketmq-streams-channel-mqtt</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-dbinit</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
Expand Down Expand Up @@ -326,16 +315,7 @@
<artifactId>rocketmq-streams-filter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-lease</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-schedule</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-script</artifactId>
Expand Down Expand Up @@ -386,11 +366,7 @@
<artifactId>rocketmq-streams-channel-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-connectors</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-examples</artifactId>
Expand Down Expand Up @@ -422,6 +398,12 @@
<version>${rocketmq.version}</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<!-- ================================================= -->
<!-- tool library -->
<!-- ================================================= -->
Expand Down Expand Up @@ -508,12 +490,6 @@
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
Expand All @@ -56,6 +58,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {

private Long pullIntervalMs;
private String namesrvAddr;
private RPCHook rpcHook;

public RocketMQSink() {
}
Expand Down Expand Up @@ -146,7 +149,7 @@ protected void initProducer() {
synchronized (this) {
if (producer == null) {
destroy();
producer = new DefaultMQProducer(groupName + "producer", true, null);
producer = new DefaultMQProducer(null, groupName + "producer", rpcHook,false, null);
try {
//please not use the code,the name srv addr may be empty in jmenv
// if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
Expand Down Expand Up @@ -201,6 +204,7 @@ protected void createTopicIfNotExist(int splitNum) {
defaultMQAdminExt.setVipChannelEnabled(false);
defaultMQAdminExt.setNamesrvAddr(this.getNamesrvAddr());
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.setAdminExtGroup(topic.trim());
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(splitNum);
topicConfig.setWriteQueueNums(splitNum);
Expand Down Expand Up @@ -243,19 +247,17 @@ public List<ISplit> getSplitList() {
List<ISplit> messageQueues = new ArrayList<>();
try {

if (messageQueues == null || messageQueues.size() == 0) {
List<MessageQueue> metaqQueueSet = producer.fetchPublishMessageQueues(topic);
List<ISplit> queueList = new ArrayList<>();
for (MessageQueue queue : metaqQueueSet) {
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
queueList.add(rocketMQMessageQueue);
List<MessageQueue> messageQueueSet = producer.fetchPublishMessageQueues(topic);
List<ISplit> queueList = new ArrayList<>();
for (MessageQueue queue : messageQueueSet) {
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
queueList.add(rocketMQMessageQueue);

}
Collections.sort(queueList);
messageQueues = queueList;
}
} catch (Exception e) {
throw new RuntimeException(e);
Collections.sort(queueList);
messageQueues = queueList;
} catch (MQClientException e) {
return messageQueues;
}

return messageQueues;
Expand Down Expand Up @@ -346,4 +348,12 @@ public boolean isOrder() {
public void setOrder(boolean order) {
this.order = order;
}

public RPCHook getRpcHook() {
return rpcHook;
}

public void setRpcHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.apache.rocketmq.streams.source;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class MessageListenerDelegator implements MessageQueueListener {
private final MessageQueueListener delegator;
private final Set<MessageQueue> lastDivided = new HashSet<>();
private final Set<MessageQueue> removingQueue = new HashSet<>();
private final AtomicBoolean needSync = new AtomicBoolean(false);
private final Object mutex = new Object();

public MessageListenerDelegator(MessageQueueListener delegator) {
this.delegator = delegator;
}


@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
//上一次分配有,但是这一次没有,需要对这些mq进行状态移除
for (MessageQueue last : lastDivided) {
if (!mqDivided.contains(last)) {
removingQueue.add(last);
}
}

this.lastDivided.clear();
this.lastDivided.addAll(mqDivided);

needSync.set(true);
delegator.messageQueueChanged(topic, mqAll, mqDivided);

synchronized (this.mutex) {
this.mutex.notifyAll();
}
}

public Set<MessageQueue> getLastDivided() {
return Collections.unmodifiableSet(this.lastDivided);
}

public Set<MessageQueue> getRemovingQueue() {
return Collections.unmodifiableSet(this.removingQueue);
}


public boolean needSync() {
return this.needSync.get();
}

public void hasSynchronized() {
this.needSync.set(false);
}

public Object getMutex() {
return mutex;
}
}
Loading

0 comments on commit b654e66

Please sign in to comment.