diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 14aa2d2f1063..80483b26404a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -80,6 +80,7 @@ import org.apache.pinot.core.query.utils.idset.IdSets; import org.apache.pinot.segment.local.customobject.AvgPair; import org.apache.pinot.segment.local.customobject.CovarianceTuple; +import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator; import org.apache.pinot.segment.local.customobject.DoubleLongPair; import org.apache.pinot.segment.local.customobject.FloatLongPair; import org.apache.pinot.segment.local.customobject.IntLongPair; @@ -89,6 +90,7 @@ import org.apache.pinot.segment.local.customobject.QuantileDigest; import org.apache.pinot.segment.local.customobject.StringLongPair; import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator; +import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.local.customobject.VarianceTuple; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -156,7 +158,9 @@ public enum ObjectType { FloatArrayList(44), StringArrayList(45), UltraLogLog(46), - ThetaSketchAccumulator(47); + ThetaSketchAccumulator(47), + TupleIntSketchAccumulator(48), + CpcSketchAccumulator(49); private final int _value; @@ -277,6 +281,10 @@ public static ObjectType getObjectType(Object value) { return ObjectType.UltraLogLog; } else if (value instanceof ThetaSketchAccumulator) { return ObjectType.ThetaSketchAccumulator; + } else if (value instanceof TupleIntSketchAccumulator) { + return ObjectType.TupleIntSketchAccumulator; + } else if (value instanceof CpcSketchAccumulator) { + return ObjectType.CpcSketchAccumulator; } else { throw 
new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -1587,7 +1595,7 @@ public UltraLogLog deserialize(ByteBuffer byteBuffer) { } }; - public static final ObjectSerDe DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE = + public static final ObjectSerDe DATA_SKETCH_THETA_ACCUMULATOR_SER_DE = new ObjectSerDe() { @Override @@ -1614,6 +1622,62 @@ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) { } }; + public static final ObjectSerDe DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE = + new ObjectSerDe() { + + @Override + public byte[] serialize(TupleIntSketchAccumulator tupleIntSketchBuffer) { + org.apache.datasketches.tuple.Sketch sketch = tupleIntSketchBuffer.getResult(); + return sketch.toByteArray(); + } + + @Override + public TupleIntSketchAccumulator deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + // Note: The accumulator is designed to serialize as a sketch and should + // not be deserialized in practice. + @Override + public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) { + TupleIntSketchAccumulator tupleIntSketchAccumulator = new TupleIntSketchAccumulator(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + org.apache.datasketches.tuple.Sketch sketch = + org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes), + new IntegerSummaryDeserializer()); + tupleIntSketchAccumulator.apply(sketch); + return tupleIntSketchAccumulator; + } + }; + + public static final ObjectSerDe DATA_SKETCH_CPC_ACCUMULATOR_SER_DE = + new ObjectSerDe() { + + @Override + public byte[] serialize(CpcSketchAccumulator cpcSketchBuffer) { + CpcSketch sketch = cpcSketchBuffer.getResult(); + return sketch.toByteArray(); + } + + @Override + public CpcSketchAccumulator deserialize(byte[] bytes) { + return deserialize(ByteBuffer.wrap(bytes)); + } + + // Note: The accumulator is designed to serialize as a sketch and should + // not be deserialized in practice. 
+ @Override + public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) { + CpcSketchAccumulator cpcSketchAccumulator = new CpcSketchAccumulator(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes)); + cpcSketchAccumulator.apply(sketch); + return cpcSketchAccumulator; + } + }; + // NOTE: DO NOT change the order, it has to be the same order as the ObjectType //@formatter:off private static final ObjectSerDe[] SER_DES = { @@ -1664,7 +1728,9 @@ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) { FLOAT_ARRAY_LIST_SER_DE, STRING_ARRAY_LIST_SER_DE, ULTRA_LOG_LOG_OBJECT_SER_DE, - DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE, + DATA_SKETCH_THETA_ACCUMULATOR_SER_DE, + DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE, + DATA_SKETCH_CPC_ACCUMULATOR_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java index 3b3718dba2b8..16fd6751b2d9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java @@ -19,12 +19,12 @@ package org.apache.pinot.core.query.aggregation.function; import java.util.List; -import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.Sketch; import org.apache.datasketches.tuple.TupleSketchIterator; -import org.apache.datasketches.tuple.Union; import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import 
org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.spi.AggregationFunctionType; @@ -48,22 +48,20 @@ public ColumnDataType getFinalResultColumnType() { } @Override - public Comparable extractFinalResult(List> integerSummarySketches) { - if (integerSummarySketches == null) { + public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) { + accumulator.setNominalEntries(_nominalEntries); + accumulator.setSetOperations(_setOps); + accumulator.setThreshold(_accumulatorThreshold); + Sketch result = accumulator.getResult(); + if (result.isEmpty() || result.getRetainedEntries() == 0) { + // there is nothing to average, return null return null; } - Union union = new Union<>(_entries, _setOps); - integerSummarySketches.forEach(union::union); - double retainedTotal = 0L; - CompactSketch result = union.getResult(); TupleSketchIterator summaries = result.iterator(); + double retainedTotal = 0L; while (summaries.next()) { retainedTotal += summaries.getSummary().getValue(); } - if (result.getRetainedEntries() == 0) { - // there is nothing to average, return null - return null; - } double estimate = retainedTotal / result.getRetainedEntries(); return Math.round(estimate); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java index 1946200842aa..4a33086bb881 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java @@ -21,16 +21,17 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.datasketches.cpc.CpcSketch; 
-import org.apache.datasketches.cpc.CpcUnion; +import org.apache.datasketches.memory.Memory; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.spi.data.FieldSpec; @@ -80,8 +81,10 @@ */ @SuppressWarnings({"rawtypes"}) public class DistinctCountCPCSketchAggregationFunction - extends BaseSingleInputAggregationFunction { - protected final int _lgK; + extends BaseSingleInputAggregationFunction { + private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2; + protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD; + protected int _lgNominalEntries; public DistinctCountCPCSketchAggregationFunction(List arguments) { super(arguments.get(0)); @@ -92,9 +95,22 @@ public DistinctCountCPCSketchAggregationFunction(List argumen Preconditions.checkArgument(numExpressions <= 2, "DistinctCountCPC expects 1 or 2 arguments, got: %s", numExpressions); if (arguments.size() == 2) { - _lgK = arguments.get(1).getLiteral().getIntValue(); + ExpressionContext secondArgument = arguments.get(1); + Preconditions.checkArgument(secondArgument.getType() == ExpressionContext.Type.LITERAL, + "CPC Sketch Aggregation Function expects the second argument to be a literal (parameters)," + " but got: ", + secondArgument.getType()); + + if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) { + 
Parameters parameters = new Parameters(secondArgument.getLiteral().getStringValue()); + // Allows the user to trade-off memory usage for merge CPU; higher values use more memory + _accumulatorThreshold = parameters.getAccumulatorThreshold(); + // Nominal entries controls sketch accuracy and size + _lgNominalEntries = parameters.getLgNominalEntries(); + } else { + _lgNominalEntries = secondArgument.getLiteral().getIntValue(); + } } else { - _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK; + _lgNominalEntries = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK; } } @@ -123,15 +139,11 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde if (storedType == DataType.BYTES) { byte[][] bytesValues = blockValSet.getBytesValuesSV(); try { - CpcSketch cpcSketch = aggregationResultHolder.getResult(); - CpcUnion union = new CpcUnion(_lgK); - if (cpcSketch != null) { - union.update(cpcSketch); - } - for (int i = 0; i < length; i++) { - union.update(ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i])); + CpcSketchAccumulator cpcSketchAccumulator = getAccumulator(aggregationResultHolder); + CpcSketch[] sketches = deserializeSketches(bytesValues, length); + for (CpcSketch sketch : sketches) { + cpcSketchAccumulator.apply(sketch); } - aggregationResultHolder.setValue(union.getResult()); } catch (Exception e) { throw new RuntimeException("Caught exception while merging CPC sketches", e); } @@ -182,6 +194,8 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde default: throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_CPC aggregation function: " + storedType); } + CpcSketchAccumulator cpcSketchAccumulator = getAccumulator(aggregationResultHolder); + cpcSketchAccumulator.apply(cpcSketch); } @Override @@ -191,24 +205,17 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol // Treat BYTES value as serialized CPC Sketch DataType storedType = 
blockValSet.getValueType().getStoredType(); - if (storedType == DataType.BYTES) { + if (storedType == FieldSpec.DataType.BYTES) { byte[][] bytesValues = blockValSet.getBytesValuesSV(); try { + CpcSketch[] sketches = deserializeSketches(bytesValues, length); for (int i = 0; i < length; i++) { - CpcSketch value = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]); - int groupKey = groupKeyArray[i]; - CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey); - if (cpcSketch != null) { - CpcUnion union = new CpcUnion(_lgK); - union.update(cpcSketch); - union.update(value); - groupByResultHolder.setValueForKey(groupKey, union.getResult()); - } else { - groupByResultHolder.setValueForKey(groupKey, value); - } + CpcSketchAccumulator cpcSketchAccumulator = getAccumulator(groupByResultHolder, groupKeyArray[i]); + CpcSketch sketch = sketches[i]; + cpcSketchAccumulator.apply(sketch); } } catch (Exception e) { - throw new RuntimeException("Caught exception while merging CPC sketches", e); + throw new RuntimeException("Caught exception while aggregating CPC Sketches", e); } return; } @@ -267,25 +274,19 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult // Treat BYTES value as serialized CPC Sketch DataType storedType = blockValSet.getValueType().getStoredType(); - if (storedType == DataType.BYTES) { + boolean singleValue = blockValSet.isSingleValue(); + + if (singleValue && storedType == DataType.BYTES) { byte[][] bytesValues = blockValSet.getBytesValuesSV(); try { + CpcSketch[] sketches = deserializeSketches(bytesValues, length); for (int i = 0; i < length; i++) { - CpcSketch value = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]); for (int groupKey : groupKeysArray[i]) { - CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey); - if (cpcSketch != null) { - CpcUnion union = new CpcUnion(_lgK); - union.update(cpcSketch); - union.update(value); - groupByResultHolder.setValueForKey(groupKey, 
union.getResult()); - } else { - groupByResultHolder.setValueForKey(groupKey, value); - } + getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]); } } } catch (Exception e) { - throw new RuntimeException("Caught exception while merging CPC sketches", e); + throw new RuntimeException("Caught exception while aggregating CPC sketches", e); } return; } @@ -348,51 +349,50 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult } @Override - public CpcSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + public CpcSketchAccumulator extractAggregationResult(AggregationResultHolder aggregationResultHolder) { Object result = aggregationResultHolder.getResult(); if (result == null) { - return new CpcSketch(_lgK); + return new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold); } - if (result instanceof DictIdsWrapper) { + if (result instanceof CpcSketch) { + return convertSketchAccumulator(result); + } else if (result instanceof DictIdsWrapper) { // For dictionary-encoded expression, convert dictionary ids to CpcSketch - return convertToCpcSketch((DictIdsWrapper) result); + return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper) result)); } else { - // For non-dictionary-encoded expression, directly return the CpcSketch - return (CpcSketch) result; + return (CpcSketchAccumulator) result; } } @Override - public CpcSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + public CpcSketchAccumulator extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { Object result = groupByResultHolder.getResult(groupKey); if (result == null) { - return new CpcSketch(_lgK); + return new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold); } - if (result instanceof DictIdsWrapper) { + if (result instanceof CpcSketch) { + return convertSketchAccumulator(result); + } else if (result instanceof DictIdsWrapper) { // For dictionary-encoded 
expression, convert dictionary ids to CpcSketch - return convertToCpcSketch((DictIdsWrapper) result); + return convertSketchAccumulator(dictionaryToCpcSketch((DictIdsWrapper) result)); } else { - // For non-dictionary-encoded expression, directly return the CpcSketch - return (CpcSketch) result; + return (CpcSketchAccumulator) result; } } @Override - public CpcSketch merge(CpcSketch intermediateResult1, CpcSketch intermediateResult2) { - if (intermediateResult1 == null && intermediateResult2 != null) { + public CpcSketchAccumulator merge(CpcSketchAccumulator intermediateResult1, + CpcSketchAccumulator intermediateResult2) { + if (intermediateResult1 == null || intermediateResult1.isEmpty()) { return intermediateResult2; - } else if (intermediateResult1 != null && intermediateResult2 == null) { + } + if (intermediateResult2 == null || intermediateResult2.isEmpty()) { return intermediateResult1; - } else if (intermediateResult1 == null) { - return new CpcSketch(_lgK); } - - CpcUnion union = new CpcUnion(_lgK); - union.update(intermediateResult1); - union.update(intermediateResult2); - return union.getResult(); + intermediateResult1.merge(intermediateResult2); + return intermediateResult1; } @Override @@ -406,8 +406,22 @@ public DataSchema.ColumnDataType getFinalResultColumnType() { } @Override - public Comparable extractFinalResult(CpcSketch intermediateResult) { - return Math.round(intermediateResult.getEstimate()); + public Comparable extractFinalResult(CpcSketchAccumulator intermediateResult) { + intermediateResult.setLgNominalEntries(_lgNominalEntries); + intermediateResult.setThreshold(_accumulatorThreshold); + return Math.round(intermediateResult.getResult().getEstimate()); + } + + /** + * Returns the CpcSketch from the result holder or creates a new one if it does not exist. 
+ */ + protected CpcSketch getCpcSketch(AggregationResultHolder aggregationResultHolder) { + CpcSketch cpcSketch = aggregationResultHolder.getResult(); + if (cpcSketch == null) { + cpcSketch = new CpcSketch(_lgNominalEntries); + aggregationResultHolder.setValue(cpcSketch); + } + return cpcSketch; } /** @@ -423,18 +437,6 @@ protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregati return dictIdsWrapper._dictIdBitmap; } - /** - * Returns the CpcSketch from the result holder or creates a new one if it does not exist. - */ - protected CpcSketch getCpcSketch(AggregationResultHolder aggregationResultHolder) { - CpcSketch cpcSketch = aggregationResultHolder.getResult(); - if (cpcSketch == null) { - cpcSketch = new CpcSketch(_lgK); - aggregationResultHolder.setValue(cpcSketch); - } - return cpcSketch; - } - /** * Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist. */ @@ -454,7 +456,7 @@ protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder groupByResult protected CpcSketch getCpcSketch(GroupByResultHolder groupByResultHolder, int groupKey) { CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey); if (cpcSketch == null) { - cpcSketch = new CpcSketch(_lgK); + cpcSketch = new CpcSketch(_lgNominalEntries); groupByResultHolder.setValueForKey(groupKey, cpcSketch); } return cpcSketch; @@ -470,8 +472,8 @@ private static void setDictIdForGroupKeys(GroupByResultHolder groupByResultHolde } } - private CpcSketch convertToCpcSketch(DictIdsWrapper dictIdsWrapper) { - CpcSketch cpcSketch = new CpcSketch(_lgK); + private CpcSketch dictionaryToCpcSketch(DictIdsWrapper dictIdsWrapper) { + CpcSketch cpcSketch = new CpcSketch(_lgNominalEntries); Dictionary dictionary = dictIdsWrapper._dictionary; RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap; PeekableIntIterator iterator = dictIdBitmap.getIntIterator(); @@ -528,6 +530,56 @@ private void addObjectsToSketch(Object[] rawValues, CpcSketch 
sketch) { } } + /** + * Returns the accumulator from the result holder or creates a new one if it does not exist. + */ + private CpcSketchAccumulator getAccumulator(AggregationResultHolder aggregationResultHolder) { + CpcSketchAccumulator accumulator = aggregationResultHolder.getResult(); + if (accumulator == null) { + accumulator = new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold); + aggregationResultHolder.setValue(accumulator); + } + return accumulator; + } + + /** + * Returns the accumulator for the given group key or creates a new one if it does not exist. + */ + private CpcSketchAccumulator getAccumulator(GroupByResultHolder groupByResultHolder, int groupKey) { + CpcSketchAccumulator accumulator = groupByResultHolder.getResult(groupKey); + if (accumulator == null) { + accumulator = new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold); + groupByResultHolder.setValueForKey(groupKey, accumulator); + } + return accumulator; + } + + /** + * Deserializes the sketches from the bytes. + */ + @SuppressWarnings({"unchecked"}) + private CpcSketch[] deserializeSketches(byte[][] serializedSketches, int length) { + CpcSketch[] sketches = new CpcSketch[length]; + for (int i = 0; i < length; i++) { + sketches[i] = CpcSketch.heapify(Memory.wrap(serializedSketches[i])); + } + return sketches; + } + + // This ensures backward compatibility with servers that still return sketches directly. + // The AggregationDataTableReducer casts intermediate results to Objects and although the code compiles, + // types might still be incompatible at runtime due to type erasure. + // Due to performance overheads of redundant casts, this should be removed at some future point. 
+ protected CpcSketchAccumulator convertSketchAccumulator(Object result) { + if (result instanceof CpcSketch) { + CpcSketch sketch = (CpcSketch) result; + CpcSketchAccumulator accumulator = new CpcSketchAccumulator(_lgNominalEntries, _accumulatorThreshold); + accumulator.apply(sketch); + return accumulator; + } + return (CpcSketchAccumulator) result; + } + private static final class DictIdsWrapper { final Dictionary _dictionary; final RoaringBitmap _dictIdBitmap; @@ -537,4 +589,44 @@ private DictIdsWrapper(Dictionary dictionary) { _dictIdBitmap = new RoaringBitmap(); } } + + /** + * Helper class to wrap the CpcSketch parameters, parsed from a {@code key=value;key=value} string. The initial + * values for the parameters are set to the same defaults in the Apache Datasketches library. + */ + private static class Parameters { + private static final char PARAMETER_DELIMITER = ';'; + private static final char PARAMETER_KEY_VALUE_SEPARATOR = '='; + private static final String NOMINAL_ENTRIES_KEY = "nominalEntries"; + private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold"; + + private int _nominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK); + private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD; + + Parameters(String parametersString) { + parametersString = StringUtils.deleteWhitespace(parametersString); // keep the stripped copy + String[] keyValuePairs = StringUtils.split(parametersString, PARAMETER_DELIMITER); + for (String keyValuePair : keyValuePairs) { + String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR); + Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair); + String key = keyAndValue[0]; + String value = keyAndValue[1]; + if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) { + _nominalEntries = Integer.parseInt(value); + } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) { + _accumulatorThreshold = Integer.parseInt(value); + } else { + throw new IllegalArgumentException("Invalid parameter key: " + key); + } + } +
} + + int getLgNominalEntries() { + return org.apache.datasketches.common.Util.exactLog2OfInt(_nominalEntries); + } + + int getAccumulatorThreshold() { + return _accumulatorThreshold; + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java index 087337472d6a..68ec18e4011a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java @@ -19,11 +19,10 @@ package org.apache.pinot.core.query.aggregation.function; import java.util.List; -import org.apache.datasketches.tuple.CompactSketch; -import org.apache.datasketches.tuple.Union; import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.spi.AggregationFunctionType; @@ -46,9 +45,10 @@ public ColumnDataType getFinalResultColumnType() { } @Override - public Comparable extractFinalResult(List> integerSummarySketches) { - Union union = new Union<>(_entries, _setOps); - integerSummarySketches.forEach(union::union); - return Double.valueOf(union.getResult().getEstimate()).longValue(); + public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) { + accumulator.setNominalEntries(_nominalEntries); + accumulator.setSetOperations(_setOps); + accumulator.setThreshold(_accumulatorThreshold); + return Double.valueOf(accumulator.getResult().getEstimate()).longValue(); } } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java index ab153c8835ff..ff3a587881ca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java @@ -19,9 +19,9 @@ package org.apache.pinot.core.query.aggregation.function; import java.util.List; -import org.apache.datasketches.cpc.CpcSketch; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator; import org.apache.pinot.segment.local.customobject.SerializedCPCSketch; import org.apache.pinot.segment.spi.AggregationFunctionType; @@ -47,7 +47,9 @@ public ColumnDataType getFinalResultColumnType() { } @Override - public SerializedCPCSketch extractFinalResult(CpcSketch sketch) { - return new SerializedCPCSketch(sketch); + public SerializedCPCSketch extractFinalResult(CpcSketchAccumulator intermediateResult) { + intermediateResult.setLgNominalEntries(_lgNominalEntries); + intermediateResult.setThreshold(_accumulatorThreshold); + return new SerializedCPCSketch(intermediateResult.getResult()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java index 1fdace955c01..992ef5d7a16f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java @@ -19,41 +19,93 @@ package org.apache.pinot.core.query.aggregation.function; import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.datasketches.tuple.CompactSketch; +import org.apache.commons.lang3.StringUtils; +import org.apache.datasketches.memory.Memory; import org.apache.datasketches.tuple.Sketch; -import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.Sketches; import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer; import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.CommonConstants; -/*** - * This is the base class for all Integer Tuple Sketch aggregations +/** + * The {@code IntegerTupleSketchAggregationFunction} is the base class for all integer-based Tuple Sketch aggregations. 
+ * Apache Datasketches Tuple Sketches are an extension of the Apache Datasketches Theta Sketch. Tuple sketches store an + * additional summary value with each retained entry which makes the sketch ideal for summarizing attributes + * such as impressions or clicks. + * + * Tuple sketches are interoperable with the Theta Sketch and enable set operations over a stream of data, and can + * also be used for cardinality estimation. + * + * Note: The current implementation of this aggregation function is limited to binary columns that contain sketches + * built outside of Pinot. * - * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more + * Usage examples: + *
    + *
  • + * Simple union (1 or 2 arguments): main expression to aggregate on, followed by an optional Tuple sketch size + * argument: the nominal entries as an integer (default 2^DEFAULT_TUPLE_SKETCH_LGK), or a parameter string. + * The "raw" equivalents return serialized sketches in base64-encoded strings. + *

    DISTINCT_COUNT_TUPLE_SKETCH(col)

    + *

    DISTINCT_COUNT_TUPLE_SKETCH(col, 12)

    + *

    DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col)

    + *

    DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col, 12)

    + *
  • + * Extracting a cardinality estimate from a Tuple sketch: + *

    GET_INT_TUPLE_SKETCH_ESTIMATE(sketch_bytes)

    + *

    GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col))

    + *
  • + *
  • + * Union between two sketches; summaries are merged using addition for hash keys in common: + *

    + * INT_SUM_TUPLE_SKETCH_UNION( + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1), + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2) + * ) + *

    + *
  • + *
  • + * Union between two sketches; summaries are merged using maximum for hash keys in common: + *

    + * INT_MAX_TUPLE_SKETCH_UNION( + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1), + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2) + * ) + *

    + *
  • + *
  • + * Union between two sketches; summaries are merged using minimum for hash keys in common: + *

    + * INT_MIN_TUPLE_SKETCH_UNION( + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col1), + * DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(col2) + * ) + *

    + *
  • + *
*/ +@SuppressWarnings({"rawtypes"}) public class IntegerTupleSketchAggregationFunction - extends BaseSingleInputAggregationFunction>, Comparable> { + extends BaseSingleInputAggregationFunction { + private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2; final ExpressionContext _expressionContext; final IntegerSummarySetOperations _setOps; - final int _entries; + protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD; + protected int _nominalEntries; public IntegerTupleSketchAggregationFunction(List arguments, IntegerSummary.Mode mode) { super(arguments.get(0)); @@ -65,11 +117,20 @@ public IntegerTupleSketchAggregationFunction(List arguments, if (arguments.size() == 2) { ExpressionContext secondArgument = arguments.get(1); Preconditions.checkArgument(secondArgument.getType() == ExpressionContext.Type.LITERAL, - "Tuple Sketch Aggregation Function expects the second argument to be a literal (number of entries to keep)," - + " but got: ", secondArgument.getType()); - _entries = secondArgument.getLiteral().getIntValue(); + "Tuple Sketch Aggregation Function expects the second argument to be a literal (parameters)," + " but got: ", + secondArgument.getType()); + + if (secondArgument.getLiteral().getType() == FieldSpec.DataType.STRING) { + Parameters parameters = new Parameters(secondArgument.getLiteral().getStringValue()); + // Allows the user to trade-off memory usage for merge CPU; higher values use more memory + _accumulatorThreshold = parameters.getAccumulatorThreshold(); + // Nominal entries controls sketch accuracy and size + _nominalEntries = parameters.getNominalEntries(); + } else { + _nominalEntries = secondArgument.getLiteral().getIntValue(); + } } else { - _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + _nominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); } } @@ -99,20 +160,13 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde if (storedType == 
FieldSpec.DataType.BYTES) { byte[][] bytesValues = blockValSet.getBytesValuesSV(); try { - List> integerSketch = aggregationResultHolder.getResult(); - if (integerSketch != null) { - List> sketches = - Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize) - .map(Sketch::compact).collect(Collectors.toList()); - aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches)); - } else { - List> sketches = - Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize) - .map(Sketch::compact).collect(Collectors.toList()); - aggregationResultHolder.setValue(sketches); + TupleIntSketchAccumulator tupleIntSketchAccumulator = getAccumulator(aggregationResultHolder); + Sketch[] sketches = deserializeSketches(bytesValues, length); + for (Sketch sketch : sketches) { + tupleIntSketchAccumulator.apply(sketch); } } catch (Exception e) { - throw new RuntimeException("Caught exception while merging Tuple Sketches", e); + throw new RuntimeException("Caught exception while aggregating Tuple Sketches", e); } } else { throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType); @@ -131,21 +185,14 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol if (storedType == FieldSpec.DataType.BYTES) { byte[][] bytesValues = blockValSet.getBytesValuesSV(); try { + Sketch[] sketches = deserializeSketches(bytesValues, length); for (int i = 0; i < length; i++) { - byte[] value = bytesValues[i]; - int groupKey = groupKeyArray[i]; - CompactSketch newSketch = - ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact(); - if (groupByResultHolder.getResult(groupKey) == null) { - ArrayList> newList = new ArrayList<>(); - newList.add(newSketch); - groupByResultHolder.setValueForKey(groupKey, newList); - } else { - groupByResultHolder.>>getResult(groupKey).add(newSketch); - } + TupleIntSketchAccumulator 
tupleIntSketchAccumulator = getAccumulator(groupByResultHolder, groupKeyArray[i]); + Sketch sketch = sketches[i]; + tupleIntSketchAccumulator.apply(sketch); } } catch (Exception e) { - throw new RuntimeException("Caught exception while merging Tuple Sketches", e); + throw new RuntimeException("Caught exception while aggregating Tuple Sketches", e); } } else { throw new IllegalStateException( @@ -156,47 +203,55 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol @Override public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map blockValSetMap) { - byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV(); - for (int i = 0; i < length; i++) { - byte[] value = valueArray[i]; - CompactSketch newSketch = - ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact(); - for (int groupKey : groupKeysArray[i]) { - if (groupByResultHolder.getResult(groupKey) == null) { - groupByResultHolder.setValueForKey(groupKey, Collections.singletonList(newSketch)); - } else { - groupByResultHolder.>>getResult(groupKey).add(newSketch); + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // Treat BYTES value as serialized Integer Tuple Sketch + FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType(); + boolean singleValue = blockValSet.isSingleValue(); + + if (singleValue && storedType == FieldSpec.DataType.BYTES) { + byte[][] bytesValues = blockValSetMap.get(_expression).getBytesValuesSV(); + try { + Sketch[] sketches = deserializeSketches(bytesValues, length); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + getAccumulator(groupByResultHolder, groupKey).apply(sketches[i]); + } } + } catch (Exception e) { + throw new RuntimeException("Caught exception while aggregating Tuple Sketches", e); } + } else { + throw new IllegalStateException( + "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: 
" + storedType); } } @Override - public List> extractAggregationResult(AggregationResultHolder aggregationResultHolder) { - return aggregationResultHolder.getResult(); + public TupleIntSketchAccumulator extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + TupleIntSketchAccumulator result = aggregationResultHolder.getResult(); + if (result == null) { + return new TupleIntSketchAccumulator(_setOps, _nominalEntries, _accumulatorThreshold); + } + return result; } @Override - public List> extractGroupByResult(GroupByResultHolder groupByResultHolder, - int groupKey) { + public TupleIntSketchAccumulator extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { return groupByResultHolder.getResult(groupKey); } @Override - public List> merge(List> intermediateResult1, - List> intermediateResult2) { - if (intermediateResult1 == null && intermediateResult2 != null) { + public TupleIntSketchAccumulator merge(TupleIntSketchAccumulator intermediateResult1, + TupleIntSketchAccumulator intermediateResult2) { + if (intermediateResult1 == null || intermediateResult1.isEmpty()) { return intermediateResult2; - } else if (intermediateResult1 != null && intermediateResult2 == null) { + } + if (intermediateResult2 == null || intermediateResult2.isEmpty()) { return intermediateResult1; - } else if (intermediateResult1 == null && intermediateResult2 == null) { - return new ArrayList<>(0); } - ArrayList> merged = - new ArrayList<>(intermediateResult1.size() + intermediateResult2.size()); - merged.addAll(intermediateResult1); - merged.addAll(intermediateResult2); - return merged; + intermediateResult1.merge(intermediateResult2); + return intermediateResult1; } @Override @@ -210,12 +265,86 @@ public ColumnDataType getFinalResultColumnType() { } @Override - public Comparable extractFinalResult(List> integerSummarySketches) { - if (integerSummarySketches == null) { - return null; + public Comparable extractFinalResult(TupleIntSketchAccumulator 
accumulator) { + accumulator.setNominalEntries(_nominalEntries); + accumulator.setSetOperations(_setOps); + accumulator.setThreshold(_accumulatorThreshold); + return Base64.getEncoder().encodeToString(accumulator.getResult().toByteArray()); + } + + /** + * Returns the accumulator from the result holder or creates a new one if it does not exist. + */ + private TupleIntSketchAccumulator getAccumulator(AggregationResultHolder aggregationResultHolder) { + TupleIntSketchAccumulator accumulator = aggregationResultHolder.getResult(); + if (accumulator == null) { + accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, _accumulatorThreshold); + aggregationResultHolder.setValue(accumulator); + } + return accumulator; + } + + /** + * Returns the accumulator for the given group key or creates a new one if it does not exist. + */ + private TupleIntSketchAccumulator getAccumulator(GroupByResultHolder groupByResultHolder, int groupKey) { + TupleIntSketchAccumulator accumulator = groupByResultHolder.getResult(groupKey); + if (accumulator == null) { + accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, _accumulatorThreshold); + groupByResultHolder.setValueForKey(groupKey, accumulator); + } + return accumulator; + } + + /** + * Deserializes the sketches from the bytes. + */ + @SuppressWarnings({"unchecked"}) + private Sketch[] deserializeSketches(byte[][] serializedSketches, int length) { + Sketch[] sketches = new Sketch[length]; + for (int i = 0; i < length; i++) { + sketches[i] = Sketches.heapifySketch(Memory.wrap(serializedSketches[i]), new IntegerSummaryDeserializer()); + } + return sketches; + } + + /** + * Helper class to wrap the tuple-sketch parameters. The initial values for the parameters are set to the + * same defaults in the Apache Datasketches library. 
+ */ + private static class Parameters { + private static final char PARAMETER_DELIMITER = ';'; + private static final char PARAMETER_KEY_VALUE_SEPARATOR = '='; + private static final String NOMINAL_ENTRIES_KEY = "nominalEntries"; + private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold"; + + private int _nominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD; + + Parameters(String parametersString) { + parametersString = StringUtils.deleteWhitespace(parametersString); // Strings are immutable; keep the result + String[] keyValuePairs = StringUtils.split(parametersString, PARAMETER_DELIMITER); + for (String keyValuePair : keyValuePairs) { + String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR); + Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair); + String key = keyAndValue[0]; + String value = keyAndValue[1]; + if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) { + _nominalEntries = Integer.parseInt(value); + } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) { + _accumulatorThreshold = Integer.parseInt(value); + } else { + throw new IllegalArgumentException("Invalid parameter key: " + key); + } + } + } + + int getNominalEntries() { + return _nominalEntries; + } + + int getAccumulatorThreshold() { + return _accumulatorThreshold; } - Union union = new Union<>(_entries, _setOps); - integerSummarySketches.forEach(union::union); - return Base64.getEncoder().encodeToString(union.getResult().toByteArray()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java index 33f746a1da22..d37854b1b0af 100644 ---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java @@ -19,12 +19,12 @@ package org.apache.pinot.core.query.aggregation.function; import java.util.List; -import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.Sketch; import org.apache.datasketches.tuple.TupleSketchIterator; -import org.apache.datasketches.tuple.Union; import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.spi.AggregationFunctionType; @@ -46,14 +46,12 @@ public ColumnDataType getFinalResultColumnType() { } @Override - public Comparable extractFinalResult(List> integerSummarySketches) { - if (integerSummarySketches == null) { - return null; - } - Union union = new Union<>(_entries, _setOps); - integerSummarySketches.forEach(union::union); + public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) { double retainedTotal = 0L; - CompactSketch result = union.getResult(); + accumulator.setNominalEntries(_nominalEntries); + accumulator.setSetOperations(_setOps); + accumulator.setThreshold(_accumulatorThreshold); + Sketch result = accumulator.getResult(); TupleSketchIterator summaries = result.iterator(); while (summaries.next()) { retainedTotal += summaries.getSummary().getValue(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java index 01c39b0105f7..b397bd151cf2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java @@ -43,9 +43,13 @@ import org.apache.datasketches.theta.Sketch; import org.apache.datasketches.theta.Sketches; import org.apache.datasketches.theta.UpdateSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction; import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction; import org.apache.pinot.segment.local.customobject.AvgPair; +import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator; import org.apache.pinot.segment.local.customobject.DoubleLongPair; import org.apache.pinot.segment.local.customobject.FloatLongPair; import org.apache.pinot.segment.local.customobject.IntLongPair; @@ -54,6 +58,7 @@ import org.apache.pinot.segment.local.customobject.QuantileDigest; import org.apache.pinot.segment.local.customobject.StringLongPair; import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator; +import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator; import org.apache.pinot.segment.local.customobject.ValueLongPair; import org.apache.pinot.segment.local.utils.UltraLogLogUtils; import org.testng.annotations.Test; @@ -522,4 +527,53 @@ public void testThetaSketchAccumulator() { assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), ERROR_MESSAGE); } } + + @Test + public void testTupleIntSketchAccumulator() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + int lgK = 4; + int size = RANDOM.nextInt(100) + 10; + IntegerSketch input = new IntegerSketch(lgK, IntegerSummary.Mode.Sum); + + for (int j = 0; j < size; j++) { + input.update(j, RANDOM.nextInt(100)); + } + + IntegerSummarySetOperations setOps = + new IntegerSummarySetOperations(IntegerSummary.Mode.Sum, 
IntegerSummary.Mode.Sum); + TupleIntSketchAccumulator accumulator = new TupleIntSketchAccumulator(setOps, (int) Math.pow(2, lgK), 2); + org.apache.datasketches.tuple.Sketch sketch = input.compact(); + accumulator.apply(sketch); + + byte[] bytes = ObjectSerDeUtils.serialize(accumulator); + TupleIntSketchAccumulator actual = + ObjectSerDeUtils.deserialize(bytes, ObjectSerDeUtils.ObjectType.TupleIntSketchAccumulator); + + assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), ERROR_MESSAGE); + assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), ERROR_MESSAGE); + } + } + + @Test + public void testCpcSketchAccumulator() { + int lgK = 4; + for (int i = 0; i < NUM_ITERATIONS; i++) { + int size = RANDOM.nextInt(100) + 10; + CpcSketch sketch = new CpcSketch(lgK); + + for (int j = 0; j < size; j++) { + sketch.update(j); + } + + CpcSketchAccumulator accumulator = new CpcSketchAccumulator(lgK, 2); + accumulator.apply(sketch); + + byte[] bytes = ObjectSerDeUtils.serialize(accumulator); + CpcSketchAccumulator actual = + ObjectSerDeUtils.deserialize(bytes, ObjectSerDeUtils.ObjectType.CpcSketchAccumulator); + + assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), ERROR_MESSAGE); + assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), ERROR_MESSAGE); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java new file mode 100644 index 000000000000..7d24da87cd2c --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulator.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import javax.annotation.Nonnull; +import org.apache.datasketches.cpc.CpcSketch; +import org.apache.datasketches.cpc.CpcUnion; + + +/** + * Intermediate state used by {@code DistinctCountCPCSketchAggregationFunction} which gives + * the end user more control over how sketches are merged for performance. + * The end user can set parameters that trade-off more memory usage for more pre-aggregation. + */ +public class CpcSketchAccumulator extends CustomObjectAccumulator { + private int _lgNominalEntries = 4; + private CpcUnion _union; + + public CpcSketchAccumulator() { + } + + // Note: The accumulator is serialized as a sketch. This means that the majority of the processing + // happens on serialization. Therefore, when deserialized, the values may be null and will + // require re-initialisation. Since the primary use case is at query time for the Broker + // and Server, these properties are already in memory and are re-set. 
+ public CpcSketchAccumulator(int lgNominalEntries, int threshold) { + super(threshold); + _lgNominalEntries = lgNominalEntries; + } + + public void setLgNominalEntries(int lgNominalEntries) { + _lgNominalEntries = lgNominalEntries; + } + + @Nonnull + @Override + public CpcSketch getResult() { + return unionAll(); + } + + private CpcSketch unionAll() { + if (_union == null) { + _union = new CpcUnion(_lgNominalEntries); + } + // Return the default update "gadget" sketch as a compact sketch + if (isEmpty()) { + return _union.getResult(); + } + // Corner-case: the parameters are not strictly respected when there is a single sketch. + // This single sketch might have been the result of a previously accumulated union and + // would already have the parameters set. The sketch is returned as-is without adjusting + // nominal entries which requires an additional union operation. + if (getNumInputs() == 1) { + return _accumulator.get(0); + } + + for (CpcSketch accumulatedSketch : _accumulator) { + _union.update(accumulatedSketch); + } + + return _union.getResult(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java new file mode 100644 index 000000000000..3e90695bec2a --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/CustomObjectAccumulator.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import javax.annotation.Nonnull; + + +/** + * Intermediate state used by some aggregation functions which gives the end user more control over how custom objects + * are merged for performance reasons. Some custom objects such as DataSketches have better merge performance when + * more than two items are merged at once through the elimination of intermediate bookkeeping overheads. + * + * The end user can set a value for the "threshold" parameter that defers the merge operation until at least as many + * items are ready to be merged, or the caller forces the merge directly via "getResult" - e.g. on serialization. + * This data structure trades off more memory usage for a greater degree of pre-aggregation in the accumulator state. + */ +public abstract class CustomObjectAccumulator<T> { + protected ArrayList<T> _accumulator; + private int _threshold; + private int _numInputs = 0; + + public CustomObjectAccumulator() { + this(2); + } + + public CustomObjectAccumulator(int threshold) { + setThreshold(threshold); + } + + /** + * Sets the threshold that determines how much memory to use for the internal accumulator state before + * the intermediate state is merged. + * @param threshold the threshold [> 0].
+ */ + public void setThreshold(int threshold) { + Preconditions.checkArgument(threshold > 0, "Invalid threshold: %s, must be positive", threshold); + _threshold = threshold; + } + + /** + * Returns the configured threshold for this accumulator. + */ + public int getThreshold() { + return _threshold; + } + + /** + * Returns the number of inputs that have been added to the accumulator state. + */ + public int getNumInputs() { + return _numInputs; + } + + /** + * Returns true if no inputs have been added to the accumulator state. + */ + public boolean isEmpty() { + return _numInputs == 0; + } + + /** + * Forces the item T in internal state to be merged with all pending items in the accumulator state + * and returns the result. This should not result in the accumulator state being updated or cleared. + * @return T result of the merge. + */ + @Nonnull + public abstract T getResult(); + + /** + * Merges another accumulator with this one, by extracting the result from "other". + * @param other the custom object accumulator to merge. + */ + public void merge(CustomObjectAccumulator<T> other) { + if (other.isEmpty()) { + return; + } + T result = other.getResult(); + applyInternal(result); + } + + /** + * Adds a new item to the accumulator state. If the accumulator state is equal to the threshold value, + * the internal state is updated and the accumulator state is cleared. + * @param item the item to add to the accumulator state, cannot be null.
+ */ + public void apply(T item) { + Preconditions.checkNotNull(item); + applyInternal(item); + } + + private void applyInternal(T item) { + if (_accumulator == null) { + _accumulator = new ArrayList<>(_threshold); + } + _accumulator.add(item); + _numInputs += 1; + + if (_accumulator.size() >= _threshold && _numInputs > 1) { // a lone input must stay buffered for getResult() + getResult(); + _accumulator.clear(); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java index c9554ce9bfb9..5e2219f12da4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.segment.local.customobject; -import java.util.ArrayList; import java.util.Comparator; import javax.annotation.Nonnull; import org.apache.datasketches.theta.SetOperationBuilder; @@ -29,19 +28,15 @@ /** * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives * the end user more control over how sketches are merged for performance. - * The end user can set parameters that trade-off more memory usage for more pre-aggregation. - * This permits use of the Union "early-stop" optimisation where ordered sketches require no further + * In particular, the Theta Sketch Union "early-stop" optimisation can be used - ordered sketches require no further * processing beyond the minimum Theta value. * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries - * that fall below the minimum Theta value for all input sketches ("Broder Rule"). When the initial - * Theta value is set to the minimum immediately, further gains can be realised. + * that fall below the minimum Theta value for all input sketches ("Broder Rule").
When the initial Theta value is + * set to the minimum immediately, further gains can be realised. */ -public class ThetaSketchAccumulator { - private ArrayList _accumulator; +public class ThetaSketchAccumulator extends CustomObjectAccumulator { private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder(); private Union _union; - private int _threshold; - private int _numInputs = 0; public ThetaSketchAccumulator() { } @@ -51,54 +46,20 @@ public ThetaSketchAccumulator() { // require re-initialisation. Since the primary use case is at query time for the Broker // and Server, these properties are already in memory and are re-set. public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int threshold) { + super(threshold); _setOperationBuilder = setOperationBuilder; - _threshold = threshold; } public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) { _setOperationBuilder = setOperationBuilder; } - public void setThreshold(int threshold) { - _threshold = threshold; - } - - public boolean isEmpty() { - return _numInputs == 0; - } - @Nonnull + @Override public Sketch getResult() { return unionAll(); } - public void apply(Sketch sketch) { - internalAdd(sketch); - } - - public void merge(ThetaSketchAccumulator thetaUnion) { - if (thetaUnion.isEmpty()) { - return; - } - Sketch sketch = thetaUnion.getResult(); - internalAdd(sketch); - } - - private void internalAdd(Sketch sketch) { - if (sketch.isEmpty()) { - return; - } - if (_accumulator == null) { - _accumulator = new ArrayList<>(_threshold); - } - _accumulator.add(sketch); - _numInputs += 1; - - if (_accumulator.size() >= _threshold) { - unionAll(); - } - } - private Sketch unionAll() { if (_union == null) { _union = _setOperationBuilder.buildUnion(); @@ -111,7 +72,7 @@ private Sketch unionAll() { // This single sketch might have been the result of a previously accumulated union and // would already have the parameters set. 
The sketch is returned as-is without adjusting // nominal entries which requires an additional union operation. - if (_numInputs == 1) { + if (getNumInputs() == 1) { return _accumulator.get(0); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java new file mode 100644 index 000000000000..5a24324913ab --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulator.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import java.util.Comparator; +import javax.annotation.Nonnull; +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; + + +/** + * Intermediate state used by {@code IntegerTupleSketchAggregationFunction} which gives + * the end user more control over how sketches are merged for performance. 
+ * In particular, the Theta Sketch Union "early-stop" optimisation can be used - ordered sketches require no further + * processing beyond the minimum Theta value. This applies to Tuple sketches because they are an extension of the + * Theta sketch. + * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries + * that fall below the minimum Theta value for all input sketches ("Broder Rule"). When the initial Theta value is + * set to the minimum immediately, further gains can be realised. + */ +public class TupleIntSketchAccumulator extends CustomObjectAccumulator> { + private IntegerSummarySetOperations _setOperations; + private int _nominalEntries; + private Union _union; + + public TupleIntSketchAccumulator() { + } + + // Note: The accumulator is serialized as a sketch. This means that the majority of the processing + // happens on serialization. Therefore, when deserialized, the values may be null and will + // require re-initialisation. Since the primary use case is at query time for the Broker + // and Server, these properties are already in memory and are re-set. + public TupleIntSketchAccumulator(IntegerSummarySetOperations setOperations, int nominalEntries, int threshold) { + super(threshold); + _nominalEntries = nominalEntries; + _setOperations = setOperations; + } + + public void setSetOperations(IntegerSummarySetOperations setOperations) { + _setOperations = setOperations; + } + + public void setNominalEntries(int nominalEntries) { + _nominalEntries = nominalEntries; + } + + @Nonnull + @Override + public Sketch getResult() { + return unionAll(); + } + + private Sketch unionAll() { + if (_union == null) { + _union = new Union<>(_nominalEntries, _setOperations); + } + // Return the default update "gadget" sketch as a compact sketch + if (isEmpty()) { + return _union.getResult(); + } + // Corner-case: the parameters are not strictly respected when there is a single sketch. 
+ // This single sketch might have been the result of a previously accumulated union and + // would already have the parameters set. The sketch is returned as-is without adjusting + // nominal entries which requires an additional union operation. + if (getNumInputs() == 1) { + return _accumulator.get(0); + } + + // Performance optimization: ensure that the minimum Theta is used for "early stop". + // The "early stop" optimization is implemented in the Apache Datasketches Union operation for + // ordered and compact Theta sketches. Internally, a compact and ordered Theta sketch can be + // compared to a sorted array of K items. When performing a union, only those items from + // the input sketch less than Theta need to be processed. The loop terminates as soon as a hash + // is seen that is > Theta. + // The following "sort" improves on this further by selecting the minimal Theta value up-front, + // which results in fewer redundant entries being retained and subsequently discarded during the + // union operation. + _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta)); + for (Sketch accumulatedSketch : _accumulator) { + _union.union(accumulatedSketch); + } + _accumulator.clear(); + + return _union.getResult(); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java new file mode 100644 index 000000000000..a86144ed036c --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/CpcSketchAccumulatorTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link CpcSketchAccumulator}: empty, single-sketch, merged and multi-sketch accumulation.
+ */
+public class CpcSketchAccumulatorTest {
+  private final int _lgNominalEntries = 20;
+  private final double _epsilon = 0.5;
+
+  /** An accumulator that never received a sketch reports empty and estimates zero. */
+  @Test
+  public void testEmptyAccumulator() {
+    CpcSketchAccumulator emptyAcc = new CpcSketchAccumulator(_lgNominalEntries, 2);
+    Assert.assertTrue(emptyAcc.isEmpty());
+    Assert.assertEquals(emptyAcc.getResult().getEstimate(), 0.0);
+  }
+
+  /** A single applied sketch makes the accumulator non-empty and its estimate is reproduced exactly. */
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    CpcSketch input = sketchOf(0, 1000);
+    CpcSketchAccumulator acc = new CpcSketchAccumulator(_lgNominalEntries, 2);
+    acc.apply(input);
+    Assert.assertFalse(acc.isEmpty());
+    Assert.assertEquals(acc.getResult().getEstimate(), input.getEstimate());
+  }
+
+  /** Merging two accumulators built over disjoint ranges estimates (within epsilon) the sum of both inputs. */
+  @Test
+  public void testAccumulatorMerge() {
+    CpcSketch first = sketchOf(0, 1000);
+    CpcSketch second = sketchOf(1000, 2000);
+    CpcSketchAccumulator target = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    target.apply(first);
+    CpcSketchAccumulator source = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    source.apply(second);
+    target.merge(source);
+    Assert.assertEquals(target.getResult().getEstimate(), first.getEstimate() + second.getEstimate(), _epsilon);
+  }
+
+  /** Two disjoint sketches applied to one accumulator with threshold 3 estimate (within epsilon) their sum. */
+  @Test
+  public void testThresholdBehavior() {
+    CpcSketch first = sketchOf(0, 1000);
+    CpcSketch second = sketchOf(1000, 2000);
+    CpcSketchAccumulator acc = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    acc.apply(first);
+    acc.apply(second);
+    Assert.assertEquals(acc.getResult().getEstimate(), first.getEstimate() + second.getEstimate(), _epsilon);
+  }
+
+  /** Merging an empty accumulator into another empty accumulator leaves it empty with a zero estimate. */
+  @Test
+  public void testUnionWithEmptyInput() {
+    CpcSketchAccumulator acc = new CpcSketchAccumulator(_lgNominalEntries, 3);
+    acc.merge(new CpcSketchAccumulator(_lgNominalEntries, 3));
+    Assert.assertTrue(acc.isEmpty());
+    Assert.assertEquals(acc.getResult().getEstimate(), 0.0);
+  }
+
+  /** Builds a sketch updated with every integer in {@code [startInclusive, endExclusive)}. */
+  private CpcSketch sketchOf(int startInclusive, int endExclusive) {
+    CpcSketch sketch = new CpcSketch(_lgNominalEntries);
+    IntStream.range(startInclusive, endExclusive).forEach(sketch::update);
+    return sketch;
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
new file mode 100644
index 000000000000..775541621158
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/TupleIntSketchAccumulatorTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/** Unit tests for {@link TupleIntSketchAccumulator}; set operations are re-created before every test method. */
+public class TupleIntSketchAccumulatorTest {
+  private IntegerSummarySetOperations _setOps;
+  private final int _lgK = 12;
+  private final int _nominalEntries = 1 << _lgK;
+
+  @BeforeMethod
+  public void setUp() {
+    _setOps = new IntegerSummarySetOperations(IntegerSummary.Mode.Sum, IntegerSummary.Mode.Sum);
+  }
+
+  /** An accumulator that never received a sketch reports empty and estimates zero. */
+  @Test
+  public void testEmptyAccumulator() {
+    TupleIntSketchAccumulator accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+
+  /** A single applied sketch makes the accumulator non-empty and its estimate is reproduced exactly. */
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    CompactSketch<IntegerSummary> sketch = sketchOf(0, 1000);
+    TupleIntSketchAccumulator accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 2);
+    accumulator.apply(sketch);
+
+    Assert.assertFalse(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), sketch.getEstimate());
+  }
+
+  /** Merging two accumulators built over disjoint key ranges estimates the sum of both inputs. */
+  @Test
+  public void testAccumulatorMerge() {
+    CompactSketch<IntegerSummary> sketch1 = sketchOf(0, 1000);
+    CompactSketch<IntegerSummary> sketch2 = sketchOf(1000, 2000);
+
+    TupleIntSketchAccumulator accumulator1 = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    accumulator1.apply(sketch1);
+    TupleIntSketchAccumulator accumulator2 = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    accumulator2.apply(sketch2);
+    accumulator1.merge(accumulator2);
+
+    Assert.assertEquals(accumulator1.getResult().getEstimate(), sketch1.getEstimate() + sketch2.getEstimate());
+  }
+
+  /** Two disjoint sketches applied to one accumulator with threshold 3 estimate the sum of both inputs. */
+  @Test
+  public void testThresholdBehavior() {
+    CompactSketch<IntegerSummary> sketch1 = sketchOf(0, 1000);
+    CompactSketch<IntegerSummary> sketch2 = sketchOf(1000, 2000);
+
+    TupleIntSketchAccumulator accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    accumulator.apply(sketch1);
+    accumulator.apply(sketch2);
+
+    Assert.assertEquals(accumulator.getResult().getEstimate(), sketch1.getEstimate() + sketch2.getEstimate());
+  }
+
+  /** Merging an empty accumulator into another empty accumulator leaves it empty with a zero estimate. */
+  @Test
+  public void testUnionWithEmptyInput() {
+    TupleIntSketchAccumulator accumulator = new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3);
+    accumulator.merge(new TupleIntSketchAccumulator(_setOps, _nominalEntries, 3));
+
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+
+  /** Builds a compact Sum-mode sketch updated with value 1 for every key in {@code [startInclusive, endExclusive)}. */
+  private CompactSketch<IntegerSummary> sketchOf(int startInclusive, int endExclusive) {
+    IntegerSketch input = new IntegerSketch(_lgK, IntegerSummary.Mode.Sum);
+    IntStream.range(startInclusive, endExclusive).forEach(i -> input.update(i, 1));
+    return input.compact();
+  }
+}