From f08c39d614b0b4d09b42c8584c62a01a926f62e1 Mon Sep 17 00:00:00 2001 From: write2me Date: Wed, 29 Sep 2021 03:20:44 +0800 Subject: [PATCH] #74 add count(distinct) implementation based on rocksdb --- .../client/transform/WindowStream.java | 8 + .../rocketmq/streams/client/WindowTest.java | 11 +- .../common/channel/sink/AbstractSink.java | 4 + rocketmq-streams-script/pom.xml | 5 + .../aggregation/CountAccumulator.java | 26 +- .../aggregation/CountDistinctAccumulator.java | 19 +- .../aggregation/DistinctAccumulator.java | 5 + .../aggregation/DistinctAccumulator2.java | 102 +++++++ .../operator/impl/AggregationScript.java | 2 + rocketmq-streams-state/pom.xml | 10 +- .../rocketmq/streams/state/IStreamState.java | 103 ++++++++ .../state/kv/rocksdb/RocksDBOperator.java | 78 ++++++ .../state/kv/rocksdb/RocksdbState.java | 249 ++++++++++++++++++ .../streams/state/kv/TestRocksdbState.java | 91 +++++++ .../streams/window/shuffle/ShuffleCache.java | 29 +- .../storage/rocksdb/RocksdbStorage.java | 38 +-- .../rocketmq/streams/storage/RocksdbTest.java | 27 ++ 17 files changed, 748 insertions(+), 59 deletions(-) create mode 100644 rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/DistinctAccumulator2.java create mode 100644 rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/IStreamState.java create mode 100644 rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java create mode 100644 rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksdbState.java create mode 100644 rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java index 64ec9bb5..2df207a5 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java @@ -86,6 +86,14 @@ public WindowStream count_distinct(String fieldName, String asName) { return this; } + public WindowStream count_distinct_2(String fieldName, String asName) { + String distinctName = "__" + fieldName + "_distinct_" + asName + "__"; + String prefix = distinctName + "=distinct2(" + fieldName + ",HIT_WINDOW_INSTANCE_ID,SHUFFLE_KEY)"; + String suffix = asName + "=count(" + distinctName + ")"; + window.getSelectMap().put(asName, prefix + ";" + suffix); + return this; + } + /** * count_distinct算子(数据量大,容忍较少错误率) * diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java index 3de80a4c..164fa8ef 100644 --- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java @@ -250,8 +250,9 @@ public void testCountDistinct() { .window(TumblingWindow.of(Time.minutes(5), "time")) .groupBy("user") .setLocalStorageOnly(true) - .count_distinct("page", "pv") - .count_distinct_large("page", "pv_large") + .count_distinct("page", "uv") + .count_distinct_large("page", "uv_large") + .count_distinct_2("page","uv_2") .toDataSteam() .toFile(resultFile.getAbsolutePath()).start(true); @@ -262,9 +263,11 @@ public void testCountDistinct() { for (String line : sessionList) { JSONObject object = JSONObject.parseObject(line); String user = object.getString("user"); - Integer userVisitCount = object.getInteger("pv"); - Integer userVisitCountLarge = object.getInteger("pv_large"); + Integer userVisitCount = object.getInteger("uv"); + Integer userVisitCountBasedRocksDB = object.getInteger("uv_2"); + Integer userVisitCountLarge = object.getInteger("uv_large"); Assert.assertEquals(userVisitCount, userVisitCountLarge); + Assert.assertEquals(userVisitCount, userVisitCountBasedRocksDB); statisticMap.put(user, userVisitCount); } Assert.assertEquals(3, statisticMap.size()); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java index aa7c1bc0..a3fdee71 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java @@ -161,6 +161,10 @@ public boolean flushMessage(List messages) { String queueId = message.getHeader().getQueueId(); MessageOffset messageOffset = message.getHeader().getMessageOffset(); ISource source = message.getHeader().getSource(); + //TODO why null? + if (source == null) { + continue; + } String pipelineName = message.getHeader().getPiplineName(); String sourceName = CheckPointManager.createSourceName(source, pipelineName); SourceState sourceState = this.sourceName2State.get(sourceName); diff --git a/rocketmq-streams-script/pom.xml b/rocketmq-streams-script/pom.xml index a4e5a41d..a962be4d 100755 --- a/rocketmq-streams-script/pom.xml +++ b/rocketmq-streams-script/pom.xml @@ -28,6 +28,11 @@ ROCKETMQ STREAMS :: script jar + + + org.apache.rocketmq + rocketmq-streams-state + net.agkn diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountAccumulator.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountAccumulator.java index 2de0e634..b4eae602 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountAccumulator.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountAccumulator.java @@ -17,12 +17,18 @@ package org.apache.rocketmq.streams.script.function.aggregation; import java.util.Iterator; +import java.util.Map; import java.util.Set; import org.apache.rocketmq.streams.common.utils.CollectionUtil; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; import org.apache.rocketmq.streams.script.annotation.Function; import org.apache.rocketmq.streams.script.annotation.UDAFFunction; import org.apache.rocketmq.streams.script.service.IAccumulator; +import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState; +/** + * @author arthur.liang + */ @Function @UDAFFunction("count") public class CountAccumulator implements IAccumulator { @@ -48,10 +54,22 @@ public void accumulate(CountAccum accumulator, Object... parameters) { if (CollectionUtil.isEmpty(parameters) || parameters[0] == null) { return; } - if (parameters[0] instanceof Set) { - //count(distinct(xx)) - //FIXME a trick! use CountDistinctAccumulator instead of the following code - accumulator.count = ((Set)parameters[0]).size(); + if (parameters[0] instanceof DistinctAccumulator2.DistinctAccum2) { + DistinctAccumulator2.DistinctAccum2 distinctAccum2 = (DistinctAccumulator2.DistinctAccum2) parameters[0]; + String prefix = MapKeyUtil.createKey(DistinctAccumulator2.DISTINCT_STATE_PREFIX, distinctAccum2.windowInstanceId, distinctAccum2.groupByMd5); + RocksdbState state = new RocksdbState(); + Iterator> iterator = state.entryIterator(prefix); + int sum = 0; + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry == null) { + break; + } + sum += 1; + } + accumulator.count = sum; + } else if (parameters[0] instanceof Set) { + accumulator.count = ((Set) parameters[0]).size(); } else { accumulator.count += 1; } diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountDistinctAccumulator.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountDistinctAccumulator.java index 686e761f..1f2ee24d 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountDistinctAccumulator.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/CountDistinctAccumulator.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.streams.script.function.aggregation; import com.google.common.hash.HashFunction; @@ -9,6 +25,8 @@ import org.apache.rocketmq.streams.script.service.IAccumulator; /** + * count(distinct) implementation based on hyper-log-log algorithm + * * @author arthur.liang */ @Function @@ -49,7 +67,6 @@ public class CountDistinctAccumulator implements IAccumulator { diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/DistinctAccumulator2.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/DistinctAccumulator2.java new file mode 100644 index 00000000..a4ceecd3 --- /dev/null +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/DistinctAccumulator2.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.script.function.aggregation; + +import java.util.Iterator; +import org.apache.rocketmq.streams.common.utils.CollectionUtil; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.script.annotation.Function; +import org.apache.rocketmq.streams.script.annotation.UDAFFunction; +import org.apache.rocketmq.streams.script.service.IAccumulator; +import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState; + +/** + * distinct operator based rocksdb state + * + * @author arthur.liang + */ +@Function +@UDAFFunction("distinct2") +public class DistinctAccumulator2 implements IAccumulator { + + public static final String DISTINCT_STATE_PREFIX = "__distinct__"; + private static final Integer PARAMETER_SIZE = 3; + private static RocksdbState state = new RocksdbState(); + + public static class DistinctAccum2 { + public String windowInstanceId; + public String groupByMd5; + } + + @Override + public DistinctAccum2 createAccumulator() { + return new DistinctAccum2(); + } + + @Override + public DistinctAccum2 getValue(DistinctAccum2 accumulator) { + return accumulator; + } + + @Override + public void accumulate(DistinctAccum2 accumulator, Object... parameters) { + if (CollectionUtil.isEmpty(parameters) || parameters.length != PARAMETER_SIZE) { + return; + } + try { + String value = (String) parameters[0]; + String valueMd5 = StringUtil.createMD5Str(value); + String windowInstanceId = (String) parameters[1]; + if (accumulator.windowInstanceId == null && windowInstanceId != null) { + accumulator.windowInstanceId = windowInstanceId; + } + assert accumulator.windowInstanceId.equalsIgnoreCase(windowInstanceId); + String groupByValue = (String) parameters[2]; + String groupByMd5 = StringUtil.createMD5Str(groupByValue); + if (accumulator.groupByMd5 == null && groupByMd5 != null) { + accumulator.groupByMd5 = groupByMd5; + } + assert accumulator.groupByMd5.equalsIgnoreCase(groupByMd5); + String storeKey = MapKeyUtil.createKey(DISTINCT_STATE_PREFIX, accumulator.windowInstanceId, accumulator.groupByMd5, valueMd5); + state.putIfAbsent(storeKey, value); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void merge(DistinctAccum2 accumulator, Iterable its) { + Iterator it = its.iterator(); + while (it.hasNext()) { + DistinctAccum2 commonAccumulator = it.next(); + if (commonAccumulator != null) { + if (accumulator.windowInstanceId == null || accumulator.groupByMd5 == null) { + accumulator.windowInstanceId = commonAccumulator.windowInstanceId; + accumulator.groupByMd5 = commonAccumulator.groupByMd5; + } + assert accumulator.windowInstanceId.equalsIgnoreCase(commonAccumulator.windowInstanceId); + assert accumulator.groupByMd5.equalsIgnoreCase(commonAccumulator.groupByMd5); + } + } + } + + @Override + public void retract(DistinctAccum2 accumulator, String... parameters) { + } + +} \ No newline at end of file diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java index 46369240..d7b6553d 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.streams.script.function.aggregation.CountAccumulator; import org.apache.rocketmq.streams.script.function.aggregation.CountDistinctAccumulator; import org.apache.rocketmq.streams.script.function.aggregation.DistinctAccumulator; +import org.apache.rocketmq.streams.script.function.aggregation.DistinctAccumulator2; import org.apache.rocketmq.streams.script.function.aggregation.MaxAccumulator; import org.apache.rocketmq.streams.script.function.aggregation.MinAccumulator; import org.apache.rocketmq.streams.script.function.aggregation.SumAccumulator; @@ -50,6 +51,7 @@ public class AggregationScript implements IStreamOperatorjar ROCKETMQ STREAMS :: state - + + org.apache.rocketmq + rocketmq-streams-commons + + + + org.rocksdb + rocksdbjni + diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/IStreamState.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/IStreamState.java new file mode 100644 index 00000000..c07550dd --- /dev/null +++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/IStreamState.java @@ -0,0 +1,103 @@ +package org.apache.rocketmq.streams.state; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @param + * @param + * @author arthur.liang + */ +public interface IStreamState { + + /** + * Returns the value to which the specified key is mapped, or {@code null} if this map contains no mapping for the + * key. + * + * @param key + * @return value + */ + V get(K key); + + /** + * Returns all values to which all specified keys is mapped + * + * @param key + * @return + */ + Map getAll(List key); + + /** + * Associates the specified value with the specified key in this map (optional operation). If the map previously + * contained a mapping for the key, the old value is replaced by the specified value. + * + * @param key + * @param value + * @return + */ + V put(K key, V value); + + /** + * Associates the specified value with the specified key in this map (optional operation). If the map previously + * contained a mapping for the key, the old value will not be replaced by the specified value. + * + * @param key + * @param value + * @return + */ + V putIfAbsent(K key, V value); + + /** + * Removes the mapping for a key from this map if it is present (optional operation). + * + * @param key + * @return + */ + V remove(K key); + + /** + * Removes the mapping for all keys from this map if it is present (optional operation). + * + * @param keys + */ + void removeAll(List keys); + + // Bulk Operations + + /** + * Copies all of the mappings from the specified map to this map (optional operation). + * + * @param map + */ + void putAll(Map map); + + /** + * Removes all of the mappings from this map (optional operation). The map will be empty after this call returns. + */ + void clear(); + + /** + * Returns a {@link Set} view of the keys contained in this map. + * + * @return a set view of the keys contained in this map + */ + Iterator keyIterator(); + + /** + * Returns a {@link Set} view of the mappings contained in this map. + * + * @return a set view of the mappings contained in this map + */ + Iterator> entryIterator(); + + /** + * Returns a {@link Set} view of the specified prefix mappings contained in this map. + * + * @param prefix + * @returna set view of the mappings contained in this map + */ + Iterator> entryIterator(String prefix); + +} diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java new file mode 100644 index 00000000..bd6d26c5 --- /dev/null +++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java @@ -0,0 +1,78 @@ +package org.apache.rocketmq.streams.state.kv.rocksdb; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.common.utils.RuntimeUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.TtlDB; +import org.rocksdb.WriteOptions; + +public class RocksDBOperator { + + protected static String DB_PATH = "/tmp/rocksdb"; + + protected static String UTF8 = "UTF8"; + + protected static AtomicBoolean hasCreate = new AtomicBoolean(false); + + protected static RocksDB rocksDB; + + protected WriteOptions writeOptions = new WriteOptions(); + + static { + RocksDB.loadLibrary(); + } + + public RocksDBOperator() { + this(FileUtil.concatFilePath(StringUtil.isEmpty(FileUtil.getJarPath()) ? DB_PATH + File.separator + RuntimeUtil.getDipperInstanceId() : FileUtil.getJarPath() + File.separator + RuntimeUtil.getDipperInstanceId(), "rocksdb")); + } + + public RocksDBOperator(String rocksdbFilePath) { + if (hasCreate.compareAndSet(false, true)) { + synchronized (RocksDBOperator.class) { + if (RocksDBOperator.rocksDB == null) { + synchronized (RocksDBOperator.class) { + if (RocksDBOperator.rocksDB == null) { + try (final Options options = new Options().setCreateIfMissing(true)) { + + try { + File dir = new File(rocksdbFilePath); + if (dir.exists()) { + dir.delete(); + } + dir.mkdirs(); + final TtlDB db = TtlDB.open(options, rocksdbFilePath, 10800, false); + RocksDBOperator.rocksDB = db; + writeOptions.setSync(true); + } catch (RocksDBException e) { + throw new RuntimeException("create rocksdb error " + e.getMessage()); + } + } + } + } + } + } + } + } + + public RocksDB getInstance() { + if (rocksDB == null) { + synchronized (RocksDBOperator.class) { + if (rocksDB == null) { + RocksDBOperator operator = new RocksDBOperator(); + if (rocksDB != null) { + return rocksDB; + } else { + throw new RuntimeException("failed in creating rocksdb!"); + } + } + } + } + return rocksDB; + } + +} diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksdbState.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksdbState.java new file mode 100644 index 00000000..09eb99a0 --- /dev/null +++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksdbState.java @@ -0,0 +1,249 @@ +package org.apache.rocketmq.streams.state.kv.rocksdb; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.streams.common.utils.CollectionUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.state.IStreamState; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import static org.apache.rocketmq.streams.state.kv.rocksdb.RocksDBOperator.UTF8; + +/** + * kv state based rocksdb + * + * @author arthur.liang + */ +public class RocksdbState implements IStreamState { + + private static RocksDBOperator operator = new RocksDBOperator(); + + //TODO replace + private HashMap cache = new HashMap<>(); + private final static Byte SIGN = 1; + + @Override public String get(String key) { + try { + return getValueFromByte(operator.getInstance().get(getKeyBytes(key))); + } catch (Exception e) { + //throw new RuntimeException("failed in getting key from rocksdb! " + key, e); + return null; + } + } + + @Override public Map getAll(List keys) { + if (CollectionUtil.isEmpty(keys)) { + return new HashMap<>(4); + } + List keyByteList = new ArrayList<>(); + List keyStrList = new ArrayList<>(); + for (String key : keys) { + keyByteList.add(getKeyBytes(key)); + keyStrList.add(key); + } + try { + Map resultMap = new HashMap<>(keys.size()); + Map map = operator.getInstance().multiGet(keyByteList); + Iterator> it = map.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + String key = getValueFromByte(entry.getKey()); + String value = getValueFromByte(entry.getValue()); + resultMap.put(key, value); + } + return resultMap; + } catch (RocksDBException e) { + throw new RuntimeException("failed in getting all from rocksdb!", e); + } + } + + @Override public String put(String key, String value) { + Map map = new HashMap(4) {{ + put(key, value); + }}; + putAll(map); + return null; + } + + @Override public String putIfAbsent(String key, String value) { + //TODO use lru cache + if (cache.containsKey(key)) { + return null; + } + put(key, value); + cache.put(key, SIGN); + return null; + } + + @Override public void putAll(Map map) { + if (map == null) { + return; + } + try { + WriteBatch writeBatch = new WriteBatch(); + Iterator> it = map.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + String key = entry.getKey(); + String value = entry.getValue(); + writeBatch.put(key.getBytes(UTF8), value.getBytes(UTF8)); + } + + WriteOptions writeOptions = new WriteOptions(); + writeOptions.setSync(false); + writeOptions.setDisableWAL(true); + operator.getInstance().write(writeOptions, writeBatch); + writeBatch.close(); + writeOptions.close(); + } catch (Exception e) { + throw new RuntimeException("failed in putting all into rocksdb!", e); + } + } + + @Override public String remove(String key) { + try { + operator.getInstance().delete(getKeyBytes(key)); + } catch (RocksDBException e) { + throw new RuntimeException("failed in removing all from rocksdb! " + key, e); + } + return null; + } + + @Override public void removeAll(List keys) { + for (String key : keys) { + try { + operator.getInstance().delete(getKeyBytes(key)); + } catch (RocksDBException e) { + throw new RuntimeException("failed in removing all from rocksdb! " + key, e); + } + } + } + + @Override public void clear() { + } + + @Override public Iterator keyIterator() { + return null; + } + + @Override public Iterator> entryIterator() { + return null; + } + + @Override public Iterator> entryIterator(String prefix) { + return new RocksDBIterator(prefix); + } + + /** + * 把key转化成byte + * + * @param key + * @return + */ + protected byte[] getKeyBytes(String key) { + try { + if (StringUtil.isEmpty(key)) { + return null; + } + return key.getBytes(UTF8); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("failed in getting byte[] from key! " + key, e); + } + } + + /** + * 把byte转化成值 + * + * @param bytes + * @return + */ + protected static String getValueFromByte(byte[] bytes) { + try { + return new String(bytes, UTF8); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + public static class RocksDBIterator implements Iterator> { + + protected AtomicBoolean hasInit = new AtomicBoolean(false); + + private ReadOptions readOptions = new ReadOptions(); + + private RocksIterator iter; + + protected String keyPrefix; + + public RocksDBIterator(String keyPrefix) { + readOptions.setPrefixSameAsStart(true).setTotalOrderSeek(true); + iter = operator.getInstance().newIterator(readOptions); + this.keyPrefix = keyPrefix; + } + + @Override public boolean hasNext() { + if (hasInit.compareAndSet(false, true)) { + iter.seek(keyPrefix.getBytes()); + } + return iter.isValid(); + } + + @Override public Map.Entry next() { + String key = new String(iter.key()); + if (!key.startsWith(keyPrefix)) { + return null; + } + String value = getValueFromByte(iter.value()); + iter.next(); + return new Element(key, value); + } + + } + + private static class Element implements Map.Entry { + + private Pair pair; + + private Element() { + + } + + public Element(String key, String value) { + pair = Pair.of(key, value); + } + + @Override public String getKey() { + if (pair != null) { + return pair.getKey(); + } + return null; + } + + @Override public String getValue() { + if (pair != null) { + return pair.getRight(); + } + return null; + } + + @Override public String setValue(String value) { + if (pair != null) { + String old = pair.getRight(); + pair.setValue(value); + return old; + } + return null; + } + } + +} diff --git a/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java new file mode 100644 index 00000000..cf1a8b7b --- /dev/null +++ b/rocketmq-streams-state/src/test/java/org/apache/rocketmq/streams/state/kv/TestRocksdbState.java @@ -0,0 +1,91 @@ +package org.apache.rocketmq.streams.state.kv; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * @author arthur.liang + */ +public class TestRocksdbState { + + private RocksdbState rocksdbState = new RocksdbState(); + + private String key = "window_id"; + + private String value = "window_value"; + + private int sampleSize = 8; + + @Before + public void testAddAll() { + rocksdbState.put(key, value); + Map stateMap = new HashMap<>(8); + for (int i = 0; i < sampleSize; i++) { + stateMap.put(key + "_" + i, value + "_" + i); + } + rocksdbState.putAll(stateMap); + } + + @Test + public void testGetAll() { + String singleValue = rocksdbState.get(key); + Assert.assertEquals(value, singleValue); + List keys = new ArrayList(sampleSize); + for (int i = 0; i < sampleSize; i++) { + keys.add(key + "_" + i); + } + Map valueMap = rocksdbState.getAll(keys); + Assert.assertEquals(8, valueMap.size()); + singleValue = rocksdbState.get("any_key"); + Assert.assertEquals(null, singleValue); + } + + @Test + public void testIterator() { + Iterator> iterator = rocksdbState.entryIterator(key); + Map valueMap = new HashMap<>(sampleSize); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry == null) { + break; + } + valueMap.put(entry.getKey(), entry.getValue()); + } + Assert.assertEquals(9, valueMap.size()); + } + + @Test + public void testDelete() { + rocksdbState.remove(key); + String singleValue = rocksdbState.get(key); + Assert.assertEquals(null, singleValue); + List keys = new ArrayList<>(sampleSize); + for (int i = 0; i < sampleSize; i++) { + keys.add(key + "_" + i); + } + rocksdbState.removeAll(keys); + Map valueMap = rocksdbState.getAll(keys); + Assert.assertEquals(0, valueMap.size()); + } + + @Test + public void testOverWrite() { + String replaceValue = value + "_new"; + rocksdbState.put(key, replaceValue); + String replaceResult = rocksdbState.get(key); + Assert.assertEquals(replaceResult, replaceValue); + } + + @Test + public void testNotOverWrite() { + + } + +} diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java index 421c669b..57c07f1c 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java @@ -107,13 +107,20 @@ protected String generateShuffleKey(IMessage message) { * @param instance2Messages * @param windowInstanceMap */ - protected void groupByWindowInstanceAndQueueId(List messageList, Map, List> instance2Messages, + protected void groupByWindowInstanceAndQueueId(List messageList, + Map, List> instance2Messages, Map windowInstanceMap) { for (IMessage message : messageList) { - - List windowInstances = (List)message.getMessageBody().get(WindowInstance.class.getSimpleName()); + //the queueId will be replace below, so get first here! String queueId = message.getHeader().getQueueId(); - for(WindowInstance windowInstance:windowInstances){ + String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID); + String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET); + Boolean isLong = message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG); + message.getHeader().setQueueId(oriQueueId); + message.getHeader().setOffset(oriOffset); + message.getHeader().setOffsetIsLong(isLong); + List windowInstances = (List) message.getMessageBody().get(WindowInstance.class.getSimpleName()); + for (WindowInstance windowInstance : windowInstances) { String windowInstanceId = windowInstance.createWindowInstanceId(); Pair queueIdAndInstanceKey = Pair.of(queueId, windowInstanceId); List messages = instance2Messages.get(queueIdAndInstanceKey); @@ -121,17 +128,13 @@ protected void groupByWindowInstanceAndQueueId(List messageList, Map

(); instance2Messages.put(queueIdAndInstanceKey, messages); } - messages.add(message); + //in case of changing message concurrently in hop window + IMessage cloneMessage = message.deepCopy(); + //bring window instance id into accumulator computation + cloneMessage.getMessageBody().put("HIT_WINDOW_INSTANCE_ID", windowInstance.createWindowInstanceId()); + messages.add(cloneMessage); windowInstanceMap.put(windowInstanceId, windowInstance); } - - String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID); - String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET); - Boolean isLong = message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG); - message.getHeader().setQueueId(oriQueueId); - message.getHeader().setOffset(oriOffset); - message.getHeader().setOffsetIsLong(isLong); - } } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java index de7b57c1..3bb98fbe 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.streams.common.utils.ReflectUtil; import org.apache.rocketmq.streams.common.utils.RuntimeUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.state.kv.rocksdb.RocksDBOperator; import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.state.WindowBaseValue; import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage; @@ -53,45 +54,10 @@ public class RocksdbStorage extends AbstractWindowSto protected static String DB_PATH = "/tmp/rocksdb"; protected static String UTF8 = "UTF8"; protected static AtomicBoolean hasCreate = new AtomicBoolean(false); - protected static RocksDB rocksDB; + protected static RocksDB rocksDB = new RocksDBOperator().getInstance(); protected WriteOptions writeOptions = new WriteOptions(); - static { - RocksDB.loadLibrary(); - } - - public RocksdbStorage() { - this(FileUtil.concatFilePath(StringUtil.isEmpty(FileUtil.getJarPath()) ? DB_PATH + File.separator + RuntimeUtil.getDipperInstanceId() : FileUtil.getJarPath() + File.separator + RuntimeUtil.getDipperInstanceId(), "rocksdb")); - } - public RocksdbStorage(String rocksdbFilePath) { - if (hasCreate.compareAndSet(false, true)) { - synchronized (RocksdbStorage.class) { - if (RocksdbStorage.rocksDB == null) { - synchronized (RocksdbStorage.class) { - if (RocksdbStorage.rocksDB == null) { - try (final Options options = new Options().setCreateIfMissing(true)) { - - try { - File dir = new File(rocksdbFilePath); - if (dir.exists()) { - dir.delete(); - } - dir.mkdirs(); - final TtlDB db = TtlDB.open(options, rocksdbFilePath, 10800, false); - RocksdbStorage.rocksDB = db; - writeOptions.setSync(true); - } catch (RocksDBException e) { - throw new RuntimeException("create rocksdb error " + e.getMessage()); - } - } - } - } - } - } - } - - } @Override public void removeKeys(Collection keys) { diff --git a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java index b2f0c708..c708fdd3 100644 --- a/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java +++ b/rocketmq-streams-window/src/test/java/org/apache/rocketmq/streams/storage/RocksdbTest.java @@ -20,6 +20,33 @@ public class RocksdbTest { private static RocksdbStorage storage = new RocksdbStorage<>(); + @Test + public void testMultiProcess() { + // + RocksdbStorage storage1 = new RocksdbStorage(); + RocksdbStorage storage2 = new RocksdbStorage(); + // + // + WindowBaseValue value1 = new WindowBaseValue(); + value1.setStartTime("2021-09-07 11:00:00"); + value1.setEndTime("2021-09-07 11:10:00"); + value1.setFireTime("2021-09-07 11:11:00"); + WindowBaseValue value2 = new WindowBaseValue(); + value2.setStartTime("2021-09-07 12:00:00"); + value2.setEndTime("2021-09-07 12:10:00"); + value2.setFireTime("2021-09-07 12:11:00"); + // + storage1.put("storage_1", value1); + storage2.put("storage_2", value2); + // + RocksdbStorage storage3 = new RocksdbStorage(); + Map valueMap = storage3.multiGet(WindowBaseValue.class, new ArrayList() {{ + add("storage_1"); + add("storage_2"); + }}); + Assert.assertEquals(2, valueMap.size()); + } + @Test public void testMultiValues() { //