Skip to content

Commit

Permalink
Merge pull request #46 from yuanxiaodong/window
Browse files Browse the repository at this point in the history
[ISSUE #48]add bitset cache to optimize regex
  • Loading branch information
duhenglucky authored Sep 2, 2021
2 parents 1502952 + f261840 commit 2532777
Show file tree
Hide file tree
Showing 22 changed files with 160 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.rocketmq.streams.source.RocketMQSource;

@AutoService(IChannelBuilder.class)
@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource")
@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource",name="metaq")
public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder {
public static final String TYPE = "rocketmq";

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,37 @@
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;

public class CheckpointStrategy implements Strategy {
public class WindowStrategy implements Strategy {

private final Properties properties;

private CheckpointStrategy(Long pollingTime) {
private WindowStrategy() {
properties = new Properties();
properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.MEMORY_SERVICE_NAME);
properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
}

private CheckpointStrategy(String filePath, Long pollingTime) {
properties = new Properties();
properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.FILE_SERVICE_NAME);
properties.put(IConfigurableService.FILE_PATH_NAME, filePath);
properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
}

private CheckpointStrategy(String url, String username, String password, Long pollingTime) {
private WindowStrategy(String url, String username, String password) {
properties = new Properties();
properties.put(AbstractComponent.JDBC_DRIVER, AbstractComponent.DEFAULT_JDBC_DRIVER);
properties.put(AbstractComponent.JDBC_URL, url);
properties.put(AbstractComponent.JDBC_USERNAME, username);
properties.put(AbstractComponent.JDBC_PASSWORD, password);
properties.put(AbstractComponent.JDBC_TABLE_NAME, AbstractComponent.DEFAULT_JDBC_TABLE_NAME);
properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.DEFAULT_SERVICE_NAME);
}

@Override
public Properties getStrategyProperties() {
return this.properties;
}

public static Strategy db(String url, String username, String password, Long pollingTime) {
return new CheckpointStrategy(url, username, password, pollingTime);
public static Strategy exactlyOnce(String url, String username, String password) {
return new WindowStrategy(url, username, password);
}

public static Strategy file(String filePath, Long pollingTime) {
return new CheckpointStrategy(filePath, pollingTime);
}
public static Strategy highPerformance() {

public static Strategy mem(Long pollingTime) {
return new CheckpointStrategy(pollingTime);
return new WindowStrategy();
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.strategy.CheckpointStrategy;
import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
import org.apache.rocketmq.streams.common.functions.MapFunction;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void testDBCheckPoint() {
.fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
.map(message -> message + "--")
.toPrint(1)
.with(CheckpointStrategy.db("", "", "", 0L))
.with(WindowStrategy.exactlyOnce("", "", ""))
.start();
}

Expand All @@ -74,7 +74,7 @@ public void testFileCheckPoint() {
.fromFile("/Users/junjie.cheng/text.txt", false)
.map(message -> message + "--")
.toPrint(1)
.with(CheckpointStrategy.mem(0L))
.with(WindowStrategy.highPerformance())
.start();
}

Expand All @@ -98,7 +98,7 @@ public JSONObject map(String message) throws Exception {
.sum("score", "scoreValue")
.toDataSteam()
.toPrint(1)
.with(CheckpointStrategy.db("", "", "", 1000L))
.with(WindowStrategy.exactlyOnce("", "", ""))
.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.apache.rocketmq.streams.common.cache.compress;

import org.apache.rocketmq.streams.common.utils.NumberUtils;

public class BitSetCache {
protected ByteArrayValueKV cache;
protected int byteSetSize;
protected int capacity;

private class BitSet{
private byte[] bytes;

public BitSet(){
bytes=new byte[byteSetSize];
}
public BitSet(byte[] bytes){
this.bytes=bytes;
}
public void set(int index){
if(index>byteSetSize){
throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
}
int byteIndex=index/8;
int bitIndex=index%8;
byte byteElement=bytes[byteIndex];
byteElement = (byte) (byteElement|(1 << bitIndex));
bytes[byteIndex]=byteElement;
}
public boolean get(int index){
if(index>byteSetSize){
throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
}
int byteIndex=index/8;
int bitIndex=index%8;
byte byteElement=bytes[byteIndex];
boolean isTrue = ((byteElement & (1 << bitIndex)) != 0);
return isTrue;
}

public byte[] getBytes(){
return bytes;
}
}

public BitSet createBitSet(){
return new BitSet();
}


public BitSetCache(int bitSetSize, int capacity){
cache=new ByteArrayValueKV(capacity,true);
this.byteSetSize=bitSetSize/8+bitSetSize%8;
this.capacity=capacity;
}


public void put(String key,BitSet bitSet){
if(cache.size>cache.capacity){
synchronized (this){
if(cache.size>cache.capacity){
cache=new ByteArrayValueKV(capacity,true);
}
}
}
cache.put(key,bitSet.getBytes());

}

public static void main(String[] args) {
BitSetCache bitSetCache=new BitSetCache(150,30000);
BitSet bitSet=bitSetCache.createBitSet();
bitSet.set(13);
bitSetCache.put("fdsdf",bitSet);
BitSet bitSet1=bitSetCache.get("fdsdf");
System.out.println(bitSet1.get(13));
}

public BitSet get(String key){
byte[] bytes=cache.get(key);
if(bytes==null){
return null;
}
return new BitSet(bytes);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,16 @@
*/
public class OutputPrintChannel extends AbstractSink {

private static int counter = 1;
private transient boolean start = false;
private static long startTime = System.currentTimeMillis();
private static long begin = startTime;
private static int step = 40000;

@Override
protected boolean batchInsert(List<IMessage> messages) {
StringBuilder stringBuilder = new StringBuilder();
for (IMessage msg : messages) {
stringBuilder.append(msg.getMessageValue().toString() + PrintUtil.LINE);
System.out.println(msg.getMessageBody().toJSONString());
}
System.out.println(stringBuilder.toString());
return false;
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ protected void checkAndCreateTopic() {
if (!hasCreated) {
synchronized (this) {
if (!hasCreated) {
createTopicIfNotExist(splitNum);
try {
createTopicIfNotExist(splitNum);
}catch (Exception e){
e.printStackTrace();
}

hasCreated = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public Message deepCopy() {
jsonObject.put(key, message.get(key));
}
Message message = new Message(jsonObject);
message.setSystemMessage(getSystemMessage());
message.isJsonMessage=isJsonMessage;
message.header = getHeader().copy();
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public MessageHeader copy() {
header.msgRouteFromLable = msgRouteFromLable;
header.logFingerprintValue = logFingerprintValue;
header.messageQueue = messageQueue;
header.checkpointQueueIds=checkpointQueueIds;
return header;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@
String value() default "";

String aliasName() default "";



String name() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ protected boolean executeStage(AbstractStage stage, T t, AbstractContext context
} else if (systemMessage instanceof RemoveSplitMessage) {
stage.removeSplit(t, context, (RemoveSplitMessage)systemMessage);
} else {
if(systemMessage==null){
return true;
}
throw new RuntimeException("can not support this system message " + systemMessage.getClass().getName());
}
if (stage.isAsyncNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ public abstract class AbstractWindowStage<T extends IMessage> extends ChainStage

@Override
public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
if(window.getWindowCache()==null){//over window windowcache is null
return;
}
if(message.getHeader().isNeedFlush()){
if(message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
if(window.getWindowCache()!=null&&message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
window.getWindowCache().checkpoint(message.getHeader().getCheckpointQueueIds());
}else {
Set<String> queueIds=new HashSet<>();
queueIds.add(message.getHeader().getQueueId());
window.getWindowCache().checkpoint(queueIds);
if(window.getWindowCache()!=null){
Set<String> queueIds=new HashSet<>();
queueIds.add(message.getHeader().getQueueId());
window.getWindowCache().checkpoint(queueIds);
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected IMessage doProcess(IMessage message, AbstractContext context) {
*/
if(openMockChannel()){
if(mockSink!=null){
mockSink.batchAdd(message);
mockSink.batchAdd(message.deepCopy());
return message;
}
return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,12 @@ public static Object invokeGetMethod(String fieldName, Object obj) {
public static Object invoke(Object object, String methodName, Class[] classes, Object[] objects) {
try {
Class clazz = object.getClass();
Method method= clazz.getDeclaredMethod(methodName, classes);
method.setAccessible(true);
Method method=clazz.getMethod(methodName, classes);
if(method==null){
method= clazz.getDeclaredMethod(methodName, classes);
method.setAccessible(true);
}

return method.invoke(object, objects);
} catch (Exception e) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

@Function
public class ScriptFunction extends AbstractExpressionFunction {
ScriptComponent scriptComponent = ScriptComponent.getInstance();
public static final String SPLIT_SIGN = "######";//对参数进行分隔
public static final String QUOTATION_CONVERT = "^^^^";//单引号转换

Expand Down
Loading

0 comments on commit 2532777

Please sign in to comment.