Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add base class for merging and accumulating custom objects #12685

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.customobject.CovarianceTuple;
import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.IntLongPair;
Expand All @@ -89,6 +90,7 @@
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -156,7 +158,9 @@ public enum ObjectType {
FloatArrayList(44),
StringArrayList(45),
UltraLogLog(46),
ThetaSketchAccumulator(47);
ThetaSketchAccumulator(47),
TupleIntSketchAccumulator(48),
CpcSketchAccumulator(49);

private final int _value;

Expand Down Expand Up @@ -277,6 +281,10 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.UltraLogLog;
} else if (value instanceof ThetaSketchAccumulator) {
return ObjectType.ThetaSketchAccumulator;
} else if (value instanceof TupleIntSketchAccumulator) {
return ObjectType.TupleIntSketchAccumulator;
} else if (value instanceof CpcSketchAccumulator) {
return ObjectType.CpcSketchAccumulator;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -1587,7 +1595,7 @@ public UltraLogLog deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<ThetaSketchAccumulator> DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
public static final ObjectSerDe<ThetaSketchAccumulator> DATA_SKETCH_THETA_ACCUMULATOR_SER_DE =
new ObjectSerDe<ThetaSketchAccumulator>() {

@Override
Expand All @@ -1614,6 +1622,62 @@ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<TupleIntSketchAccumulator> DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE =
new ObjectSerDe<TupleIntSketchAccumulator>() {

@Override
public byte[] serialize(TupleIntSketchAccumulator tupleIntSketchBuffer) {
org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch = tupleIntSketchBuffer.getResult();
return sketch.toByteArray();
}

@Override
public TupleIntSketchAccumulator deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
@Override
public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) {
TupleIntSketchAccumulator tupleIntSketchAccumulator = new TupleIntSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
new IntegerSummaryDeserializer());
tupleIntSketchAccumulator.apply(sketch);
return tupleIntSketchAccumulator;
}
};

public static final ObjectSerDe<CpcSketchAccumulator> DATA_SKETCH_CPC_ACCUMULATOR_SER_DE =
new ObjectSerDe<CpcSketchAccumulator>() {

@Override
public byte[] serialize(CpcSketchAccumulator cpcSketchBuffer) {
CpcSketch sketch = cpcSketchBuffer.getResult();
return sketch.toByteArray();
}

@Override
public CpcSketchAccumulator deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
@Override
public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) {
CpcSketchAccumulator cpcSketchAccumulator = new CpcSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
cpcSketchAccumulator.apply(sketch);
return cpcSketchAccumulator;
}
};

// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
Expand Down Expand Up @@ -1664,7 +1728,9 @@ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
ULTRA_LOG_LOG_OBJECT_SER_DE,
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
DATA_SKETCH_THETA_ACCUMULATOR_SER_DE,
DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE,
DATA_SKETCH_CPC_ACCUMULATOR_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.pinot.core.query.aggregation.function;

import java.util.List;
import org.apache.datasketches.tuple.CompactSketch;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.TupleSketchIterator;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;


Expand All @@ -48,22 +48,20 @@ public ColumnDataType getFinalResultColumnType() {
}

@Override
public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
if (integerSummarySketches == null) {
public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
accumulator.setNominalEntries(_nominalEntries);
accumulator.setSetOperations(_setOps);
accumulator.setThreshold(_accumulatorThreshold);
Sketch<IntegerSummary> result = accumulator.getResult();
if (result.isEmpty() || result.getRetainedEntries() == 0) {
// there is nothing to average, return null
return null;
}
Union<IntegerSummary> union = new Union<>(_entries, _setOps);
integerSummarySketches.forEach(union::union);
double retainedTotal = 0L;
CompactSketch<IntegerSummary> result = union.getResult();
TupleSketchIterator<IntegerSummary> summaries = result.iterator();
double retainedTotal = 0L;
while (summaries.next()) {
retainedTotal += summaries.getSummary().getValue();
}
if (result.getRetainedEntries() == 0) {
// there is nothing to average, return null
return null;
}
double estimate = retainedTotal / result.getRetainedEntries();
return Math.round(estimate);
}
Expand Down
Loading
Loading