forked from apache/rocketmq-streams
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
apache#74 add count(distinct) implementation based on rocksdb
- Loading branch information
Showing
17 changed files
with
748 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
102 changes: 102 additions & 0 deletions
102
...in/java/org/apache/rocketmq/streams/script/function/aggregation/DistinctAccumulator2.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.rocketmq.streams.script.function.aggregation; | ||
|
||
import java.util.Iterator; | ||
import org.apache.rocketmq.streams.common.utils.CollectionUtil; | ||
import org.apache.rocketmq.streams.common.utils.MapKeyUtil; | ||
import org.apache.rocketmq.streams.common.utils.StringUtil; | ||
import org.apache.rocketmq.streams.script.annotation.Function; | ||
import org.apache.rocketmq.streams.script.annotation.UDAFFunction; | ||
import org.apache.rocketmq.streams.script.service.IAccumulator; | ||
import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState; | ||
|
||
/** | ||
* distinct operator based rocksdb state | ||
* | ||
* @author arthur.liang | ||
*/ | ||
@Function | ||
@UDAFFunction("distinct2") | ||
public class DistinctAccumulator2 implements IAccumulator<DistinctAccumulator2.DistinctAccum2, DistinctAccumulator2.DistinctAccum2> { | ||
|
||
public static final String DISTINCT_STATE_PREFIX = "__distinct__"; | ||
private static final Integer PARAMETER_SIZE = 3; | ||
private static RocksdbState state = new RocksdbState(); | ||
|
||
public static class DistinctAccum2 { | ||
public String windowInstanceId; | ||
public String groupByMd5; | ||
} | ||
|
||
@Override | ||
public DistinctAccum2 createAccumulator() { | ||
return new DistinctAccum2(); | ||
} | ||
|
||
@Override | ||
public DistinctAccum2 getValue(DistinctAccum2 accumulator) { | ||
return accumulator; | ||
} | ||
|
||
@Override | ||
public void accumulate(DistinctAccum2 accumulator, Object... parameters) { | ||
if (CollectionUtil.isEmpty(parameters) || parameters.length != PARAMETER_SIZE) { | ||
return; | ||
} | ||
try { | ||
String value = (String) parameters[0]; | ||
String valueMd5 = StringUtil.createMD5Str(value); | ||
String windowInstanceId = (String) parameters[1]; | ||
if (accumulator.windowInstanceId == null && windowInstanceId != null) { | ||
accumulator.windowInstanceId = windowInstanceId; | ||
} | ||
assert accumulator.windowInstanceId.equalsIgnoreCase(windowInstanceId); | ||
String groupByValue = (String) parameters[2]; | ||
String groupByMd5 = StringUtil.createMD5Str(groupByValue); | ||
if (accumulator.groupByMd5 == null && groupByMd5 != null) { | ||
accumulator.groupByMd5 = groupByMd5; | ||
} | ||
assert accumulator.groupByMd5.equalsIgnoreCase(groupByMd5); | ||
String storeKey = MapKeyUtil.createKey(DISTINCT_STATE_PREFIX, accumulator.windowInstanceId, accumulator.groupByMd5, valueMd5); | ||
state.putIfAbsent(storeKey, value); | ||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
|
||
@Override | ||
public void merge(DistinctAccum2 accumulator, Iterable<DistinctAccum2> its) { | ||
Iterator<DistinctAccum2> it = its.iterator(); | ||
while (it.hasNext()) { | ||
DistinctAccum2 commonAccumulator = it.next(); | ||
if (commonAccumulator != null) { | ||
if (accumulator.windowInstanceId == null || accumulator.groupByMd5 == null) { | ||
accumulator.windowInstanceId = commonAccumulator.windowInstanceId; | ||
accumulator.groupByMd5 = commonAccumulator.groupByMd5; | ||
} | ||
assert accumulator.windowInstanceId.equalsIgnoreCase(commonAccumulator.windowInstanceId); | ||
assert accumulator.groupByMd5.equalsIgnoreCase(commonAccumulator.groupByMd5); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void retract(DistinctAccum2 accumulator, String... parameters) { | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/IStreamState.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package org.apache.rocketmq.streams.state; | ||
|
||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* @param <K> | ||
* @param <V> | ||
* @author arthur.liang | ||
*/ | ||
public interface IStreamState<K, V> { | ||
|
||
/** | ||
* Returns the value to which the specified key is mapped, or {@code null} if this map contains no mapping for the | ||
* key. | ||
* | ||
* @param key | ||
* @return value | ||
*/ | ||
V get(K key); | ||
|
||
/** | ||
* Returns all values to which all specified keys is mapped | ||
* | ||
* @param key | ||
* @return | ||
*/ | ||
Map<K, V> getAll(List<K> key); | ||
|
||
/** | ||
* Associates the specified value with the specified key in this map (optional operation). If the map previously | ||
* contained a mapping for the key, the old value is replaced by the specified value. | ||
* | ||
* @param key | ||
* @param value | ||
* @return | ||
*/ | ||
V put(K key, V value); | ||
|
||
/** | ||
* Associates the specified value with the specified key in this map (optional operation). If the map previously | ||
* contained a mapping for the key, the old value will not be replaced by the specified value. | ||
* | ||
* @param key | ||
* @param value | ||
* @return | ||
*/ | ||
V putIfAbsent(K key, V value); | ||
|
||
/** | ||
* Removes the mapping for a key from this map if it is present (optional operation). | ||
* | ||
* @param key | ||
* @return | ||
*/ | ||
V remove(K key); | ||
|
||
/** | ||
* Removes the mapping for all keys from this map if it is present (optional operation). | ||
* | ||
* @param keys | ||
*/ | ||
void removeAll(List<K> keys); | ||
|
||
// Bulk Operations | ||
|
||
/** | ||
* Copies all of the mappings from the specified map to this map (optional operation). | ||
* | ||
* @param map | ||
*/ | ||
void putAll(Map<? extends K, ? extends V> map); | ||
|
||
/** | ||
* Removes all of the mappings from this map (optional operation). The map will be empty after this call returns. | ||
*/ | ||
void clear(); | ||
|
||
/** | ||
* Returns a {@link Set} view of the keys contained in this map. | ||
* | ||
* @return a set view of the keys contained in this map | ||
*/ | ||
Iterator<K> keyIterator(); | ||
|
||
/** | ||
* Returns a {@link Set} view of the mappings contained in this map. | ||
* | ||
* @return a set view of the mappings contained in this map | ||
*/ | ||
Iterator<Map.Entry<K, V>> entryIterator(); | ||
|
||
/** | ||
* Returns a {@link Set} view of the specified prefix mappings contained in this map. | ||
* | ||
* @param prefix | ||
* @returna set view of the mappings contained in this map | ||
*/ | ||
Iterator<Map.Entry<K, V>> entryIterator(String prefix); | ||
|
||
} |
Oops, something went wrong.