Skip to content

Commit

Permalink
Add group by trimming to MSQE/V2 query engine (#14727)
Browse files Browse the repository at this point in the history
* group_trim_size hint - that enables trimming at aggregate operator stage if both order by and limit are available (currently requires using is_enable_group_trim hint). Note: is_enable_group_trim also enables v1-style leaf-stage group by results trimming. See [grouping algorithm documentation](https://docs.pinot.apache.org/users/user-guide-query/query-syntax/grouping-algorithm) for details.
* error_or_num_groups_limit hint or errorOnNumGroupsLimit query option - throws exception when num_groups_limit is reached in aggregate operator instead of setting a metadata flag
  • Loading branch information
bziobrowski authored Jan 14, 2025
1 parent bd554b3 commit b6904da
Show file tree
Hide file tree
Showing 27 changed files with 1,079 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}

@Nullable
public static Integer getGroupTrimSize(Map<String, String> queryOptions) {
String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
// NOTE: Non-positive value means turning off the intermediate level trim
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
}

@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
Expand Down Expand Up @@ -268,6 +275,10 @@ public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}

public static boolean getErrorOnNumGroupsLimit(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT));
}

@Nullable
public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -244,6 +246,42 @@ public List<String> listSegments(String tableName, @Nullable String tableType, b
}
}

public Map<String, List<String>> getServersToSegmentsMap(String tableName, TableType tableType)
throws IOException {
String url = _controllerRequestURLBuilder.forServersToSegmentsMap(tableName, tableType.toString());
try {
SimpleHttpResponse resp =
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URI(url), _headers));
JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
if (jsonNode == null || jsonNode.get(0) == null) {
return Collections.emptyMap();
}

JsonNode serversMap = jsonNode.get(0).get("serverToSegmentsMap");
if (serversMap == null) {
return Collections.emptyMap();
}

HashMap<String, List<String>> result = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = serversMap.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
List<String> segments = new ArrayList<>();

ArrayNode value = (ArrayNode) field.getValue();
for (int i = 0, len = value.size(); i < len; i++) {
segments.add(value.get(i).toString());
}

result.put(field.getKey(), segments);
}

return result;
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
}

public void deleteSegment(String tableName, String segmentName)
throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,11 @@ public long getTableSize(String tableName)
return getControllerRequestClient().getTableSize(tableName);
}

public Map<String, List<String>> getTableServersToSegmentsMap(String tableName, TableType tableType)
throws IOException {
return getControllerRequestClient().getServersToSegmentsMap(tableName, tableType);
}

public String reloadOfflineTable(String tableName)
throws IOException {
return reloadOfflineTable(tableName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


/**
* The <code>AggregationOperator</code> class provides the operator for aggregation only query on a single segment.
* The <code>AggregationOperator</code> class implements keyless aggregation query on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class AggregationOperator extends BaseOperator<AggregationResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@


/**
* The <code>GroupByOperator</code> class provides the operator for group-by query on a single segment.
* The <code>GroupByOperator</code> class implements keyed aggregation on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@


/**
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments.
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments in
* V1/SSQE.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class CombinePlanNode implements PlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

// By default, group trimming in AggregateOperator is disabled
public static final int DEFAULT_GROUP_TRIM_SIZE = -1;

// Instance config key for minimum segment-level group trim size
// Set as pinot.server.query.executor.min.segment.group.trim.size
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* integer raw keys and map them onto contiguous group ids. (INT_MAP_BASED)
* </li>
* <li>
* If the maximum number of possible group keys cannot fit into than integer, but still fit into long, generate long
* If the maximum number of possible group keys cannot fit into integer, but still fit into long, generate long
* raw keys and map them onto contiguous group ids. (LONG_MAP_BASED)
* </li>
* <li>
Expand Down Expand Up @@ -105,8 +105,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;

Expand Down Expand Up @@ -173,7 +171,9 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
_rawKeyHolder = new LongMapBasedHolder(groupIdMap);
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct, numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
// arrayBaseHolder fails with ArrayIndexOutOfBoundsException if numGroupsLimit < cardinalityProduct
// because array doesn't fit all (potentially unsorted) values
if (cardinalityProduct > arrayBasedThreshold || numGroupsLimit < cardinalityProduct) {
// IntMapBasedHolder
IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
groupIdMap.clearAndTrim();
Expand Down Expand Up @@ -281,6 +281,7 @@ private interface RawKeyHolder {
int getNumKeys();
}

// This holder works only if it can fit all results, otherwise it fails on AIOOBE or produces too many group keys
private class ArrayBasedHolder implements RawKeyHolder {
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
private int _numKeys = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

/**
* Helper class to reduce data tables and set group by results into the BrokerResponseNative
* Used for key-less aggregations, e.g. select max(id), sum(quantity) from orders .
*/
@SuppressWarnings("rawtypes")
public class GroupByDataTableReducer implements DataTableReducer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public FilterContext getFilter() {
}

/**
* Returns a list of expressions in the GROUP-BY clause, or {@code null} if there is no GROUP-BY clause.
* Returns a list of expressions in the GROUP-BY clause (aggregation keys), or {@code null} if there is no GROUP-BY
* clause.
*/
@Nullable
public List<ExpressionContext> getGroupByExpressions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,22 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
explainMode = ExplainMode.DESCRIPTION;
}

return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
.setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
.setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode).build();
return new QueryContext.Builder()
.setTableName(tableName)
.setSubquery(subquery)
.setSelectExpressions(selectExpressions)
.setDistinct(distinct)
.setAliasList(aliasList)
.setFilter(filter)
.setGroupByExpressions(groupByExpressions)
.setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter)
.setLimit(pinotQuery.getLimit())
.setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions())
.setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode)
.build();
}

private static boolean isMultiStage(PinotQuery pinotQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBl
int limit = queryContext.getLimit();
boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
boolean hasHaving = queryContext.getHavingFilter() != null;
int minTrimSize = queryContext.getMinServerGroupTrimSize();
int minTrimSize =
queryContext.getMinServerGroupTrimSize(); // it's minBrokerGroupTrimSize in broker
int minInitialIndexedTableCapacity = queryContext.getMinInitialIndexedTableCapacity();

// Disable trim when min trim size is non-positive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
package org.apache.pinot.core.query.aggregation.function;

import org.apache.pinot.queries.FluentQueryTest;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec.PASS_THROUGH;


public class AvgAggregationFunctionTest extends AbstractAggregationFunctionTest {

Expand Down Expand Up @@ -177,4 +182,74 @@ void aggregationGroupByMV(DataTypeScenario scenario) {
"tag3 | null"
);
}

@Test(dataProvider = "encodingTypes")
void singleKeyAggregationWithSmallNumGroupsLimitDoesntThrowAIOOBE(FieldConfig.EncodingType encoding) {
FluentQueryTest.withBaseDir(_baseDir)
.givenTable(
new Schema.SchemaBuilder()
.setSchemaName("testTable")
.setEnableColumnBasedNullHandling(true)
.addMetricField("key", FieldSpec.DataType.INT)
.addMetricField("value", FieldSpec.DataType.INT)
.build(),
new TableConfigBuilder(TableType.OFFLINE)
.setTableName("testTable")
.addFieldConfig(
new FieldConfig("key", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.build())
.onFirstInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.andOnSecondInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.whenQuery(
"set numGroupsLimit=3; set maxInitialResultHolderCapacity=1000; "
+ "select key, avg(value) "
+ "from testTable "
+ "group by key "
+ "order by key")
.thenResultIs(
"INTEGER | DOUBLE",
"5 | 3",
"6 | 2",
"7 | 1"
);
}

@Test(dataProvider = "encodingTypes")
void multiKeyAggregationWithSmallNumGroupsLimitDoesntThrowAIOOBE(FieldConfig.EncodingType encoding) {
FluentQueryTest.withBaseDir(_baseDir)
.givenTable(
new Schema.SchemaBuilder()
.setSchemaName("testTable")
.setEnableColumnBasedNullHandling(true)
.addMetricField("key1", FieldSpec.DataType.INT)
.addMetricField("key2", FieldSpec.DataType.INT)
.addMetricField("value", FieldSpec.DataType.INT)
.build(),
new TableConfigBuilder(TableType.OFFLINE)
.setTableName("testTable")
.addFieldConfig(
new FieldConfig("key1", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.addFieldConfig(
new FieldConfig("key2", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.build())
.onFirstInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.andOnSecondInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.whenQuery(
"set numGroupsLimit=3; set maxInitialResultHolderCapacity=1000; "
+ "select key1, key2, count(*) "
+ "from testTable "
+ "group by key1, key2 "
+ "order by key1, key2")
.thenResultIs(
"INTEGER | INTEGER | LONG",
"5 | 3 | 2",
"6 | 2 | 2",
"7 | 1 | 2"
);
}

@DataProvider(name = "encodingTypes")
FieldConfig.EncodingType[] encodingTypes() {
return FieldConfig.EncodingType.values();
}
}
Loading

0 comments on commit b6904da

Please sign in to comment.