Skip to content

Commit

Permalink
Extend Merge Rollup Capabilities for Datasketches (#14625)
Browse files Browse the repository at this point in the history
  • Loading branch information
davecromberge authored Jan 4, 2025
1 parent 26ad816 commit e12aab4
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.utils.CommonConstants;


Expand All @@ -34,19 +35,18 @@ public DistinctCountCPCSketchAggregator() {
public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
CpcSketch first = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
CpcSketch second = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
CpcSketch result;
if (first == null && second == null) {
result = new CpcSketch(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
} else if (second == null) {
result = first;
} else if (first == null) {
result = second;
CpcUnion union;

String lgKParam = functionParameters.get(Constants.CPCSKETCH_LGK_KEY);
if (lgKParam != null) {
union = new CpcUnion(Integer.parseInt(lgKParam));
} else {
CpcUnion union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
union.update(first);
union.update(second);
result = union.getResult();
// If the functionParameters don't have an explicit lgK value set,
// use the default value for nominal entries
union = new CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
}
return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(result);
union.update(first);
union.update(second);
return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(union.getResult());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.processing.aggregator;

import java.util.Map;
import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.pinot.core.common.ObjectSerDeUtils;
Expand All @@ -33,20 +34,26 @@ public DistinctCountThetaSketchAggregator() {

@Override
public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
SetOperationBuilder unionBuilder = Union.builder();

int sketchNominalEntries;
String samplingProbabilityParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY);
String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);

// Check if nominal entries values match
// Check if nominal entries is set
if (nominalEntriesParam != null) {
sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
unionBuilder.setNominalEntries(Integer.parseInt(nominalEntriesParam));
} else {
// If the functionParameters don't have an explicit nominal entries value set,
// use the default value for nominal entries
sketchNominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
unionBuilder.setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES);
}

// Check if sampling probability is set
if (samplingProbabilityParam != null) {
unionBuilder.setP(Float.parseFloat(samplingProbabilityParam));
}

Union union = Union.builder().setNominalEntries(sketchNominalEntries).buildUnion();
Union union = unionBuilder.buildUnion();
Sketch first = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
Sketch second = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
Sketch result = union.union(first, second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,22 @@ public IntegerTupleSketchAggregator(IntegerSummary.Mode mode) {
public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);

int sketchNominalEntries;
Union<IntegerSummary> integerUnion;
IntegerSummarySetOperations setOperations = new IntegerSummarySetOperations(_mode, _mode);

// Check if nominal entries values match
// Check if nominal entries is set
if (nominalEntriesParam != null) {
sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
integerUnion = new Union<>(Integer.parseInt(nominalEntriesParam), setOperations);
} else {
// If the functionParameters don't have an explicit nominal entries value set,
// use the default value for nominal entries
sketchNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
int sketchNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
integerUnion = new Union<>(sketchNominalEntries, setOperations);
}

Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
Sketch<IntegerSummary> result =
new Union<>(sketchNominalEntries, new IntegerSummarySetOperations(_mode, _mode)).union(first, second);
Sketch<IntegerSummary> result = integerUnion.union(first, second);
return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.segment.processing.aggregator;


import java.util.HashMap;
import java.util.Map;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.spi.Constants;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.*;

public class DistinctCountCPCSketchAggregatorTest {

private DistinctCountCPCSketchAggregator _cpcSketchAggregator;

@BeforeMethod
public void setUp() {
_cpcSketchAggregator = new DistinctCountCPCSketchAggregator();
}

@Test
public void testAggregateWithDefaultLgK() {
CpcSketch firstSketch = new CpcSketch(10);
CpcSketch secondSketch = new CpcSketch(20);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);

Map<String, String> functionParameters = new HashMap<>();
byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2, functionParameters);

CpcSketch resultSketch = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getLgK(), 12);
}

@Test
public void testAggregateWithFunctionParameters() {
CpcSketch firstSketch = new CpcSketch(10);
CpcSketch secondSketch = new CpcSketch(20);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(secondSketch);

Map<String, String> functionParameters = new HashMap<>();
functionParameters.put(Constants.CPCSKETCH_LGK_KEY, "15");

byte[] result = (byte[]) _cpcSketchAggregator.aggregate(value1, value2, functionParameters);

CpcSketch resultSketch = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getLgK(), 15);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.segment.processing.aggregator;


import java.util.HashMap;
import java.util.Map;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.spi.Constants;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.*;

public class DistinctCountThetaSketchAggregatorTest {

private DistinctCountThetaSketchAggregator _thetaSketchAggregator;

@BeforeMethod
public void setUp() {
_thetaSketchAggregator = new DistinctCountThetaSketchAggregator();
}

@Test
public void testAggregateWithDefaultBehaviour() {
Sketch firstSketch = createThetaSketch(64);
Sketch secondSketch = createThetaSketch(32);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);
Map<String, String> functionParameters = new HashMap<>();

byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, functionParameters);

Sketch resultSketch = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getRetainedEntries(), 64);
}

@Test
public void testAggregateWithNominalEntries() {
Sketch firstSketch = createThetaSketch(64);
Sketch secondSketch = createThetaSketch(32);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);

Map<String, String> functionParameters = new HashMap<>();
functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");

byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, functionParameters);

Sketch resultSketch = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getRetainedEntries(), 32);
}

@Test
public void testAggregateWithSamplingProbability() {
Sketch firstSketch = createThetaSketch(64);
Sketch secondSketch = createThetaSketch(32);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(secondSketch);

Map<String, String> functionParameters = new HashMap<>();
functionParameters.put(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY, "0.1");

byte[] result = (byte[]) _thetaSketchAggregator.aggregate(value1, value2, functionParameters);

Sketch resultSketch = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertTrue(resultSketch.getRetainedEntries() < 64);
}

private Sketch createThetaSketch(int nominalEntries) {
UpdateSketch updateSketch = UpdateSketch.builder().setNominalEntries(nominalEntries).build();
for (int i = 0; i < nominalEntries; i++) {
updateSketch.update(i);
}
return updateSketch.compact();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.segment.processing.aggregator;

import java.util.HashMap;
import java.util.Map;
import org.apache.datasketches.tuple.CompactSketch;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.spi.Constants;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;


public class IntegerTupleSketchAggregatorTest {

private IntegerTupleSketchAggregator _tupleSketchAggregator;

@BeforeMethod
public void setUp() {
_tupleSketchAggregator = new IntegerTupleSketchAggregator(IntegerSummary.Mode.Max);
}

@Test
public void testAggregateWithDefaultBehaviour() {
Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);
Map<String, String> functionParameters = new HashMap<>();

byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2, functionParameters);

Sketch<IntegerSummary> resultSketch = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getRetainedEntries(), 64);
}

@Test
public void testAggregateWithNominalEntries() {
Sketch<IntegerSummary> firstSketch = createTupleSketch(64);
Sketch<IntegerSummary> secondSketch = createTupleSketch(32);
byte[] value1 = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(firstSketch);
byte[] value2 = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(secondSketch);

Map<String, String> functionParameters = new HashMap<>();
functionParameters.put(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES, "32");

byte[] result = (byte[]) _tupleSketchAggregator.aggregate(value1, value2, functionParameters);

Sketch<IntegerSummary> resultSketch = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(result);
assertNotNull(resultSketch);
assertEquals(resultSketch.getRetainedEntries(), 32);
}

private CompactSketch<IntegerSummary> createTupleSketch(int nominalEntries) {
int lgK = (int) (Math.log(nominalEntries) / Math.log(2));
IntegerSketch integerSketch = new IntegerSketch(lgK, IntegerSummary.Mode.Max);
for (int i = 0; i < nominalEntries; i++) {
integerSketch.update(i, 1);
}
return integerSketch.compact();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private Constants() {
public static final String HLLPLUS_SP_KEY = "sp";
public static final String CPCSKETCH_LGK_KEY = "lgK";
public static final String THETA_TUPLE_SKETCH_NOMINAL_ENTRIES = "nominalEntries";
public static final String THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY = "samplingProbability";
public static final String PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY = "compressionFactor";
public static final String SUMPRECISION_PRECISION_KEY = "precision";
}

0 comments on commit e12aab4

Please sign in to comment.