Skip to content

Commit

Permalink
apache#74 add count(distinct) implementation based on rocksdb
Browse files Browse the repository at this point in the history
  • Loading branch information
speak2me committed Sep 29, 2021
1 parent 66fe367 commit 9fc1826
Show file tree
Hide file tree
Showing 17 changed files with 748 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public WindowStream count_distinct(String fieldName, String asName) {
return this;
}

public WindowStream count_distinct_2(String fieldName, String asName) {
String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
String prefix = distinctName + "=distinct2(" + fieldName + ",HIT_WINDOW_INSTANCE_ID,SHUFFLE_KEY)";
String suffix = asName + "=count(" + distinctName + ")";
window.getSelectMap().put(asName, prefix + ";" + suffix);
return this;
}

/**
* count_distinct算子(数据量大,容忍较少错误率)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ public void testCountDistinct() {
.window(TumblingWindow.of(Time.minutes(5), "time"))
.groupBy("user")
.setLocalStorageOnly(true)
.count_distinct("page", "pv")
.count_distinct_large("page", "pv_large")
.count_distinct("page", "uv")
.count_distinct_large("page", "uv_large")
.count_distinct_2("page","uv_2")
.toDataSteam()
.toFile(resultFile.getAbsolutePath()).start(true);

Expand All @@ -262,9 +263,11 @@ public void testCountDistinct() {
for (String line : sessionList) {
JSONObject object = JSONObject.parseObject(line);
String user = object.getString("user");
Integer userVisitCount = object.getInteger("pv");
Integer userVisitCountLarge = object.getInteger("pv_large");
Integer userVisitCount = object.getInteger("uv");
Integer userVisitCountBasedRocksDB = object.getInteger("uv_2");
Integer userVisitCountLarge = object.getInteger("uv_large");
Assert.assertEquals(userVisitCount, userVisitCountLarge);
Assert.assertEquals(userVisitCount, userVisitCountBasedRocksDB);
statisticMap.put(user, userVisitCount);
}
Assert.assertEquals(3, statisticMap.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ public boolean flushMessage(List<IMessage> messages) {
String queueId = message.getHeader().getQueueId();
MessageOffset messageOffset = message.getHeader().getMessageOffset();
ISource source = message.getHeader().getSource();
//TODO why null?
if (source == null) {
continue;
}
String pipelineName = message.getHeader().getPiplineName();
String sourceName = CheckPointManager.createSourceName(source, pipelineName);
SourceState sourceState = this.sourceName2State.get(sourceName);
Expand Down
5 changes: 5 additions & 0 deletions rocketmq-streams-script/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<name>ROCKETMQ STREAMS :: script</name>
<packaging>jar</packaging>
<dependencies>
<!--rocketmq state-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-state</artifactId>
</dependency>
<!--hyperLogLog used in dv computation-->
<dependency>
<groupId>net.agkn</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
package org.apache.rocketmq.streams.script.function.aggregation;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.UDAFFunction;
import org.apache.rocketmq.streams.script.service.IAccumulator;
import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;

/**
* @author arthur.liang
*/
@Function
@UDAFFunction("count")
public class CountAccumulator implements IAccumulator<Integer, CountAccumulator.CountAccum> {
Expand All @@ -48,10 +54,22 @@ public void accumulate(CountAccum accumulator, Object... parameters) {
if (CollectionUtil.isEmpty(parameters) || parameters[0] == null) {
return;
}
if (parameters[0] instanceof Set) {
//count(distinct(xx))
//FIXME a trick! use CountDistinctAccumulator instead of the following code
accumulator.count = ((Set)parameters[0]).size();
if (parameters[0] instanceof DistinctAccumulator2.DistinctAccum2) {
DistinctAccumulator2.DistinctAccum2 distinctAccum2 = (DistinctAccumulator2.DistinctAccum2) parameters[0];
String prefix = MapKeyUtil.createKey(DistinctAccumulator2.DISTINCT_STATE_PREFIX, distinctAccum2.windowInstanceId, distinctAccum2.groupByMd5);
RocksdbState state = new RocksdbState();
Iterator<Map.Entry<String, String>> iterator = state.entryIterator(prefix);
int sum = 0;
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
if (entry == null) {
break;
}
sum += 1;
}
accumulator.count = sum;
} else if (parameters[0] instanceof Set) {
accumulator.count = ((Set) parameters[0]).size();
} else {
accumulator.count += 1;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.script.function.aggregation;

import com.google.common.hash.HashFunction;
Expand All @@ -9,6 +25,8 @@
import org.apache.rocketmq.streams.script.service.IAccumulator;

/**
* count(distinct) implementation based on hyper-log-log algorithm
*
* @author arthur.liang
*/
@Function
Expand Down Expand Up @@ -49,7 +67,6 @@ public class CountDistinctAccumulator implements IAccumulator<Long, CountDistinc
}

public static class CountDistinctAccum {
//TODO make sure init parameters here
private final HLL hll = new HLL(30, 8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import org.apache.rocketmq.streams.script.annotation.UDAFFunction;
import org.apache.rocketmq.streams.script.service.IAccumulator;

/**
* distinct operator based memory
*
* @author arthur.liang
*/
@Function
@UDAFFunction("distinct")
public class DistinctAccumulator implements IAccumulator<Set, DistinctAccumulator.DistinctAccum> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.script.function.aggregation;

import java.util.Iterator;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.UDAFFunction;
import org.apache.rocketmq.streams.script.service.IAccumulator;
import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;

/**
* distinct operator based rocksdb state
*
* @author arthur.liang
*/
@Function
@UDAFFunction("distinct2")
public class DistinctAccumulator2 implements IAccumulator<DistinctAccumulator2.DistinctAccum2, DistinctAccumulator2.DistinctAccum2> {

public static final String DISTINCT_STATE_PREFIX = "__distinct__";
private static final Integer PARAMETER_SIZE = 3;
private static RocksdbState state = new RocksdbState();

public static class DistinctAccum2 {
public String windowInstanceId;
public String groupByMd5;
}

@Override
public DistinctAccum2 createAccumulator() {
return new DistinctAccum2();
}

@Override
public DistinctAccum2 getValue(DistinctAccum2 accumulator) {
return accumulator;
}

@Override
public void accumulate(DistinctAccum2 accumulator, Object... parameters) {
if (CollectionUtil.isEmpty(parameters) || parameters.length != PARAMETER_SIZE) {
return;
}
try {
String value = (String) parameters[0];
String valueMd5 = StringUtil.createMD5Str(value);
String windowInstanceId = (String) parameters[1];
if (accumulator.windowInstanceId == null && windowInstanceId != null) {
accumulator.windowInstanceId = windowInstanceId;
}
assert accumulator.windowInstanceId.equalsIgnoreCase(windowInstanceId);
String groupByValue = (String) parameters[2];
String groupByMd5 = StringUtil.createMD5Str(groupByValue);
if (accumulator.groupByMd5 == null && groupByMd5 != null) {
accumulator.groupByMd5 = groupByMd5;
}
assert accumulator.groupByMd5.equalsIgnoreCase(groupByMd5);
String storeKey = MapKeyUtil.createKey(DISTINCT_STATE_PREFIX, accumulator.windowInstanceId, accumulator.groupByMd5, valueMd5);
state.putIfAbsent(storeKey, value);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void merge(DistinctAccum2 accumulator, Iterable<DistinctAccum2> its) {
Iterator<DistinctAccum2> it = its.iterator();
while (it.hasNext()) {
DistinctAccum2 commonAccumulator = it.next();
if (commonAccumulator != null) {
if (accumulator.windowInstanceId == null || accumulator.groupByMd5 == null) {
accumulator.windowInstanceId = commonAccumulator.windowInstanceId;
accumulator.groupByMd5 = commonAccumulator.groupByMd5;
}
assert accumulator.windowInstanceId.equalsIgnoreCase(commonAccumulator.windowInstanceId);
assert accumulator.groupByMd5.equalsIgnoreCase(commonAccumulator.groupByMd5);
}
}
}

@Override
public void retract(DistinctAccum2 accumulator, String... parameters) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.streams.script.function.aggregation.CountAccumulator;
import org.apache.rocketmq.streams.script.function.aggregation.CountDistinctAccumulator;
import org.apache.rocketmq.streams.script.function.aggregation.DistinctAccumulator;
import org.apache.rocketmq.streams.script.function.aggregation.DistinctAccumulator2;
import org.apache.rocketmq.streams.script.function.aggregation.MaxAccumulator;
import org.apache.rocketmq.streams.script.function.aggregation.MinAccumulator;
import org.apache.rocketmq.streams.script.function.aggregation.SumAccumulator;
Expand All @@ -50,6 +51,7 @@ public class AggregationScript implements IStreamOperator<IMessage, List<IMessag
put("min", MinAccumulator.class);
put("count", CountAccumulator.class);
put("distinct", DistinctAccumulator.class);
put("distinct2", DistinctAccumulator2.class);
put("sum", SumAccumulator.class);
put("avg", AverageAccumulator.class);
put("concat_distinct", ConcatDistinctAccumulator.class);
Expand Down
10 changes: 9 additions & 1 deletion rocketmq-streams-state/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@
<packaging>jar</packaging>
<name>ROCKETMQ STREAMS :: state</name>
<dependencies>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-commons</artifactId>
</dependency>
<!--rocksdb-->
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.apache.rocketmq.streams.state;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* @param <K>
* @param <V>
* @author arthur.liang
*/
public interface IStreamState<K, V> {

/**
* Returns the value to which the specified key is mapped, or {@code null} if this map contains no mapping for the
* key.
*
* @param key
* @return value
*/
V get(K key);

/**
* Returns all values to which all specified keys is mapped
*
* @param key
* @return
*/
Map<K, V> getAll(List<K> key);

/**
* Associates the specified value with the specified key in this map (optional operation). If the map previously
* contained a mapping for the key, the old value is replaced by the specified value.
*
* @param key
* @param value
* @return
*/
V put(K key, V value);

/**
* Associates the specified value with the specified key in this map (optional operation). If the map previously
* contained a mapping for the key, the old value will not be replaced by the specified value.
*
* @param key
* @param value
* @return
*/
V putIfAbsent(K key, V value);

/**
* Removes the mapping for a key from this map if it is present (optional operation).
*
* @param key
* @return
*/
V remove(K key);

/**
* Removes the mapping for all keys from this map if it is present (optional operation).
*
* @param keys
*/
void removeAll(List<K> keys);

// Bulk Operations

/**
* Copies all of the mappings from the specified map to this map (optional operation).
*
* @param map
*/
void putAll(Map<? extends K, ? extends V> map);

/**
* Removes all of the mappings from this map (optional operation). The map will be empty after this call returns.
*/
void clear();

/**
* Returns a {@link Set} view of the keys contained in this map.
*
* @return a set view of the keys contained in this map
*/
Iterator<K> keyIterator();

/**
* Returns a {@link Set} view of the mappings contained in this map.
*
* @return a set view of the mappings contained in this map
*/
Iterator<Map.Entry<K, V>> entryIterator();

/**
* Returns a {@link Set} view of the specified prefix mappings contained in this map.
*
* @param prefix
* @returna set view of the mappings contained in this map
*/
Iterator<Map.Entry<K, V>> entryIterator(String prefix);

}
Loading

0 comments on commit 9fc1826

Please sign in to comment.