From 95660785af8c9c73f62999b9790718faf907649b Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Fri, 6 Dec 2024 17:45:53 -0800 Subject: [PATCH] Allow configurable initial capacity for IndexedTable --- .../apache/pinot/common/utils/HashUtil.java | 10 ++- .../utils/config/QueryOptionsUtils.java | 10 ++- .../data/table/ConcurrentIndexedTable.java | 9 +-- .../core/data/table/SimpleIndexedTable.java | 8 +-- .../UnboundedConcurrentIndexedTable.java | 8 +-- .../blocks/results/GroupByResultsBlock.java | 10 +++ .../combine/GroupByCombineOperator.java | 12 +++- .../StreamingGroupByCombineOperator.java | 12 +++- .../plan/maker/InstancePlanMakerImplV2.java | 64 +++++++++++++++---- .../groupby/AggregationGroupByResult.java | 4 ++ .../core/query/reduce/BaseReduceService.java | 4 ++ .../query/reduce/BrokerReduceService.java | 7 +- .../query/reduce/DataTableReducerContext.java | 8 ++- .../query/reduce/GroupByDataTableReducer.java | 19 ++++-- .../query/reduce/StreamingReduceService.java | 7 +- .../query/request/context/QueryContext.java | 10 +++ .../apache/pinot/core/util/GroupByUtils.java | 22 +++++++ .../ResourceManagerAccountingTest.java | 4 +- .../core/data/table/IndexedTableTest.java | 25 +++++--- .../pinot/core/util/GroupByUtilsTest.java | 15 +++++ ...gmentAggregationMultiValueQueriesTest.java | 8 +-- ...ntAggregationMultiValueRawQueriesTest.java | 8 +-- ...mentAggregationSingleValueQueriesTest.java | 8 +-- ...erSegmentGroupByMultiValueQueriesTest.java | 10 +-- ...egmentGroupByMultiValueRawQueriesTest.java | 10 +-- ...rSegmentGroupBySingleValueQueriesTest.java | 10 +-- .../OfflineGRPCServerIntegrationTest.java | 2 +- .../pinot/query/runtime/QueryRunner.java | 15 +++++ .../pinot/spi/utils/CommonConstants.java | 8 ++- 29 files changed, 264 insertions(+), 83 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java index 9c8d227c9b9d..a8c5cc9985a8 100644 
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.utils; +import com.google.common.primitives.Ints; import java.nio.ByteBuffer; import java.nio.IntBuffer; @@ -44,9 +45,16 @@ public static int getMinHashSetSize(int expected) { /** * Returns a capacity that is sufficient to keep the map from being resized as long as it grows no larger than * expectedSize and the load factor is >= its default (0.75). + * NOTE: Borrowed from Guava's Maps library {@code int capacity(int expectedSize)}. */ public static int getHashMapCapacity(int expectedSize) { - return (int) ((float) expectedSize / 0.75f + 1f); + if (expectedSize < 3) { + return expectedSize + 1; + } + if (expectedSize < Ints.MAX_POWER_OF_TWO) { + return (int) Math.ceil(expectedSize / 0.75); + } + return Integer.MAX_VALUE; } public static long compute(IntBuffer buff) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 0ff2780fd2fa..d93e1fd218c1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -267,14 +267,20 @@ public static Integer getNumGroupsLimit(Map queryOptions) { @Nullable public static Integer getMaxInitialResultHolderCapacity(Map queryOptions) { - String maxInitResultCap = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); - return checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitResultCap); + String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); + return checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialResultHolderCapacity); } 
public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY)); } + @Nullable + public static Integer getMinInitialIndexedTableCapacity(Map queryOptions) { + String minInitialIndexedTableCapacity = queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY); + return checkedParseIntPositive(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY, minInitialIndexedTableCapacity); + } + @Nullable private static Long checkedParseLongPositive(String optionName, @Nullable String optionValue) { return checkedParseLong(optionName, optionValue, 1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java index 119d47c79e89..74c510ecf88e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java @@ -33,13 +33,14 @@ public class ConcurrentIndexedTable extends IndexedTable { private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock(); public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize, - int trimThreshold) { - this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold); + int trimThreshold, int initialCapacity) { + this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold, initialCapacity); } public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize, - int trimSize, int trimThreshold) { - super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new ConcurrentHashMap<>()); + int trimSize, int trimThreshold, int initialCapacity) { + super(dataSchema, hasFinalInput, queryContext, resultSize, 
trimSize, trimThreshold, + new ConcurrentHashMap<>(initialCapacity)); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java index 2163620225b6..3b67abd20102 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java @@ -31,13 +31,13 @@ public class SimpleIndexedTable extends IndexedTable { public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize, - int trimThreshold) { - this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold); + int trimThreshold, int initialCapacity) { + this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold, initialCapacity); } public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize, - int trimSize, int trimThreshold) { - super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>()); + int trimSize, int trimThreshold, int initialCapacity) { + super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity)); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java index 67f82b201194..8dff07b598a0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java @@ -35,13 +35,13 @@ */ public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable { - public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize) { - 
this(dataSchema, false, queryContext, resultSize); + public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int initialCapacity) { + this(dataSchema, false, queryContext, resultSize, initialCapacity); } public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, - int resultSize) { - super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE); + int resultSize, int initialCapacity) { + super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index dfc5faa28930..bf32b349d117 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -120,6 +120,16 @@ public Table getTable() { return _table; } + public int getNumGroups() { + assert _aggregationGroupByResult != null || _intermediateRecords != null : + "Should not call getNumGroups() on instance level results"; + if (_aggregationGroupByResult != null) { + return _aggregationGroupByResult.getNumGroups(); + } else { + return _intermediateRecords.size(); + } + } + public boolean isNumGroupsLimitReached() { return _numGroupsLimitReached; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index ecb0a56cbf45..21e23dc06401 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -137,19 +137,25 @@ protected void processSegments() { synchronized (this) { if (_indexedTable == null) { DataSchema dataSchema = resultsBlock.getDataSchema(); + int initialCapacity = + GroupByUtils.getIndexedTableInitialCapacity(_trimThreshold, resultsBlock.getNumGroups(), + _queryContext.getMinInitialIndexedTableCapacity()); // NOTE: Use trimSize as resultSize on server side. if (_numTasks == 1) { - _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold, + initialCapacity); } else { if (_trimThreshold >= MAX_TRIM_THRESHOLD) { // special case of trim threshold where it is set to max value. // there won't be any trimming during upsert in this case. // thus we can avoid the overhead of read-lock and write-lock // in the upsert method. - _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + _indexedTable = + new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, initialCapacity); } else { _indexedTable = - new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold, + initialCapacity); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java index 1e8c88e9ce5e..123390abf026 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java @@ -164,19 +164,25 @@ public void processSegments() { synchronized (this) { if 
(_indexedTable == null) { DataSchema dataSchema = resultsBlock.getDataSchema(); + int initialCapacity = + GroupByUtils.getIndexedTableInitialCapacity(_trimThreshold, resultsBlock.getNumGroups(), + _queryContext.getMinInitialIndexedTableCapacity()); // NOTE: Use trimSize as resultSize on server side. if (_numTasks == 1) { - _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold, + initialCapacity); } else { if (_trimThreshold >= MAX_TRIM_THRESHOLD) { // special case of trim threshold where it is set to max value. // there won't be any trimming during upsert in this case. // thus we can avoid the overhead of read-lock and write-lock // in the upsert method. - _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + _indexedTable = + new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, initialCapacity); } else { _indexedTable = - new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold, + initialCapacity); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index e76a64988659..cadce4bcf6d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -71,6 +71,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker { public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; + public static final String 
MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY = "min.init.indexed.table.capacity"; + public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128; public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; @@ -93,6 +95,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner(); private int _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS; private int _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY; + private int _minInitialIndexedTableCapacity = DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY; // Limit on number of groups stored for each segment, beyond which no new group will be created private int _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT; // Used for SQL GROUP BY (server combine) @@ -103,25 +106,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker { public InstancePlanMakerImplV2() { } - @VisibleForTesting - public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int numGroupsLimit, int minSegmentGroupTrimSize, - int minServerGroupTrimSize, int groupByTrimThreshold) { - _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; - _numGroupsLimit = numGroupsLimit; - _minSegmentGroupTrimSize = minSegmentGroupTrimSize; - _minServerGroupTrimSize = minServerGroupTrimSize; - _groupByTrimThreshold = groupByTrimThreshold; - } - @Override public void init(PinotConfiguration queryExecutorConfig) { _maxExecutionThreads = queryExecutorConfig.getProperty(MAX_EXECUTION_THREADS_KEY, DEFAULT_MAX_EXECUTION_THREADS); _maxInitialResultHolderCapacity = queryExecutorConfig.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY); + _minInitialIndexedTableCapacity = queryExecutorConfig.getProperty(MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY, + DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY); _numGroupsLimit = 
queryExecutorConfig.getProperty(NUM_GROUPS_LIMIT_KEY, DEFAULT_NUM_GROUPS_LIMIT); Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit, "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d", _maxInitialResultHolderCapacity, _numGroupsLimit); + Preconditions.checkState(_minInitialIndexedTableCapacity <= _numGroupsLimit, + "Invalid configuration: minInitialIndexedTableCapacity: %s must be smaller or equal to numGroupsLimit: %s", + _minInitialIndexedTableCapacity, _numGroupsLimit); _minSegmentGroupTrimSize = queryExecutorConfig.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE); _minServerGroupTrimSize = @@ -135,6 +133,36 @@ public void init(PinotConfiguration queryExecutorConfig) { _minServerGroupTrimSize, _groupByTrimThreshold); } + @VisibleForTesting + public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity) { + _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; + } + + @VisibleForTesting + public void setMinInitialIndexedTableCapacity(int minInitialIndexedTableCapacity) { + _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity; + } + + @VisibleForTesting + public void setNumGroupsLimit(int numGroupsLimit) { + _numGroupsLimit = numGroupsLimit; + } + + @VisibleForTesting + public void setMinSegmentGroupTrimSize(int minSegmentGroupTrimSize) { + _minSegmentGroupTrimSize = minSegmentGroupTrimSize; + } + + @VisibleForTesting + public void setMinServerGroupTrimSize(int minServerGroupTrimSize) { + _minServerGroupTrimSize = minServerGroupTrimSize; + } + + @VisibleForTesting + public void setGroupByTrimThreshold(int groupByTrimThreshold) { + _groupByTrimThreshold = groupByTrimThreshold; + } + public Plan makeInstancePlan(List segmentContexts, QueryContext queryContext, ExecutorService executorService, ServerMetrics serverMetrics) { applyQueryOptions(queryContext); @@ -196,12 +224,19 @@ private void 
applyQueryOptions(QueryContext queryContext) { // Set group-by query options if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() != null) { // Set maxInitialResultHolderCapacity - Integer initResultCap = QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions); - if (initResultCap != null) { - queryContext.setMaxInitialResultHolderCapacity(initResultCap); + Integer maxInitialResultHolderCapacity = QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions); + if (maxInitialResultHolderCapacity != null) { + queryContext.setMaxInitialResultHolderCapacity(maxInitialResultHolderCapacity); } else { queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity); } + // Set minInitialIndexedTableCapacity + Integer minInitialIndexedTableCapacity = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions); + if (minInitialIndexedTableCapacity != null) { + queryContext.setMinInitialIndexedTableCapacity(minInitialIndexedTableCapacity); + } else { + queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity); + } // Set numGroupsLimit Integer numGroupsLimit = QueryOptionsUtils.getNumGroupsLimit(queryOptions); if (numGroupsLimit != null) { @@ -361,7 +396,8 @@ public static ExpressionContext overrideWithExpressionHints(ExpressionContext ex .contains(overrideExpression.getIdentifier())) { return overrideExpression; } - expression.getFunction().getArguments() + expression.getFunction() + .getArguments() .replaceAll(argument -> overrideWithExpressionHints(argument, indexSegment, expressionOverrideHints)); return expression; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java index e2933527af0b..49c2361c3c7a 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java @@ -40,6 +40,10 @@ public AggregationGroupByResult(GroupKeyGenerator groupKeyGenerator, Aggregation _resultHolders = resultHolders; } + public int getNumGroups() { + return _groupKeyGenerator.getNumKeys(); + } + /** * Returns an iterator of {@link GroupKeyGenerator.GroupKey}. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java index 9b44e0c40598..05e9dae536fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java @@ -52,6 +52,7 @@ public abstract class BaseReduceService { protected final int _maxReduceThreadsPerQuery; protected final int _groupByTrimThreshold; protected final int _minGroupTrimSize; + protected final int _minInitialIndexedTableCapacity; public BaseReduceService(PinotConfiguration config) { _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, @@ -60,6 +61,9 @@ public BaseReduceService(PinotConfiguration config) { CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD); _minGroupTrimSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE, CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE); + _minInitialIndexedTableCapacity = + config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY, + CommonConstants.Broker.DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY); int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors(); LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.", diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index b9b0f7bb513e..d10e0811ede9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -142,18 +142,23 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke Integer minGroupTrimSizeQueryOption = null; Integer groupTrimThresholdQueryOption = null; + Integer minInitialIndexedTableCapacityQueryOption = null; if (queryOptions != null) { minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions); groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions); + minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions); } int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize; int groupTrimThreshold = groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold; + int minInitialIndexedTableCapacity = + minInitialIndexedTableCapacityQueryOption != null ? 
minInitialIndexedTableCapacityQueryOption + : _minInitialIndexedTableCapacity; try { dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative, new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, - groupTrimThreshold, minGroupTrimSize), brokerMetrics); + groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics); } catch (EarlyTerminationException e) { brokerResponseNative.addException( new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString())); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java index d4b69e6c2132..8c645a622b09 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java @@ -32,6 +32,7 @@ public class DataTableReducerContext { // used for SQL GROUP BY private final int _groupByTrimThreshold; private final int _minGroupTrimSize; + private final int _minInitialIndexedTableCapacity; /** * Constructor for the class. 
@@ -42,12 +43,13 @@ public class DataTableReducerContext { * @param groupByTrimThreshold trim threshold for SQL group by */ public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs, - int groupByTrimThreshold, int minGroupTrimSize) { + int groupByTrimThreshold, int minGroupTrimSize, int minInitialIndexedTableCapacity) { _executorService = executorService; _maxReduceThreadsPerQuery = maxReduceThreadsPerQuery; _reduceTimeOutMs = reduceTimeOutMs; _groupByTrimThreshold = groupByTrimThreshold; _minGroupTrimSize = minGroupTrimSize; + _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity; } public ExecutorService getExecutorService() { @@ -69,4 +71,8 @@ public int getGroupByTrimThreshold() { public int getMinGroupTrimSize() { return _minGroupTrimSize; } + + public int getMinInitialIndexedTableCapacity() { + return _minInitialIndexedTableCapacity; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 34395febfb4f..c82fd2058e77 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -232,7 +232,10 @@ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection dataTables = new ArrayList<>(dataTablesToReduce); + int numDataTables = dataTables.size(); + assert numDataTables > 1; // Get the number of threads to use for reducing. // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls. @@ -257,28 +260,32 @@ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection= GroupByCombineOperator.MAX_TRIM_THRESHOLD) { // special case of trim threshold where it is set to max value. // there won't be any trimming during upsert in this case. 
// thus we can avoid the overhead of read-lock and write-lock // in the upsert method. - indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, _queryContext, resultSize); + indexedTable = + new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, _queryContext, resultSize, initialCapacity); } else { indexedTable = - new ConcurrentIndexedTable(dataSchema, hasFinalInput, _queryContext, resultSize, trimSize, trimThreshold); + new ConcurrentIndexedTable(dataSchema, hasFinalInput, _queryContext, resultSize, trimSize, trimThreshold, + initialCapacity); } } // Create groups of data tables that each thread can process concurrently. // Given that numReduceThreads is <= numDataTables, each group will have at least one data table. - ArrayList dataTables = new ArrayList<>(dataTablesToReduce); List> reduceGroups = new ArrayList<>(numReduceThreadsToUse); - for (int i = 0; i < numReduceThreadsToUse; i++) { reduceGroups.add(new ArrayList<>()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java index 653498aadeff..8b61a97d554c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java @@ -80,18 +80,23 @@ public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest, Integer minGroupTrimSizeQueryOption = null; Integer groupTrimThresholdQueryOption = null; + Integer minInitialIndexedTableCapacityQueryOption = null; if (queryOptions != null) { minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions); groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions); + minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions); } int minGroupTrimSize = 
minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize; int groupTrimThreshold = groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold; + int minInitialIndexedTableCapacity = + minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption + : _minInitialIndexedTableCapacity; // Process server response. DataTableReducerContext dataTableReducerContext = new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, - groupTrimThreshold, minGroupTrimSize); + groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity); StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext); streamingReducer.init(dataTableReducerContext); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 0aa233b43ea7..e1e3c37a8dfd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -114,6 +114,8 @@ public class QueryContext { // The following properties apply to group-by queries // Maximum initial capacity of the group-by result holder private int _maxInitialResultHolderCapacity = InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY; + // Initial capacity of the indexed table + private int _minInitialIndexedTableCapacity = InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY; // Limit of number of groups stored in each segment private int _numGroupsLimit = InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT; // Minimum number of groups to keep per segment when trimming groups for SQL GROUP BY @@ -368,6 +370,14 @@ public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity 
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; } + public int getMinInitialIndexedTableCapacity() { + return _minInitialIndexedTableCapacity; + } + + public void setMinInitialIndexedTableCapacity(int minInitialIndexedTableCapacity) { + _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity; + } + public int getNumGroupsLimit() { return _numGroupsLimit; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java index e8551dab2cb4..d66d39a9572a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.util; +import org.apache.pinot.common.utils.HashUtil; + + public final class GroupByUtils { private GroupByUtils() { } @@ -41,4 +44,23 @@ public static int getTableCapacity(int limit, int minNumGroups) { long capacityByLimit = limit * 5L; return capacityByLimit > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.max((int) capacityByLimit, minNumGroups); } + + /** + * Returns the initial capacity of the indexed table required by the given query. + */ + public static int getIndexedTableInitialCapacity(int trimThreshold, int minNumGroups, int minCapacity) { + // The upper bound of the initial capacity is the capacity required by the trim threshold. The indexed table should + // never grow over this capacity. + int upperBound = HashUtil.getHashMapCapacity(trimThreshold); + if (minCapacity > upperBound) { + return upperBound; + } + // The lower bound of the initial capacity is the capacity required by the min number of groups to be added to the + // table. 
+ int lowerBound = HashUtil.getHashMapCapacity(minNumGroups); + if (lowerBound > upperBound) { + return upperBound; + } + return Math.max(minCapacity, lowerBound); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java index 852e847d5abe..d7c7884f58ad 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java @@ -44,6 +44,7 @@ import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.SimpleIndexedTable; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; @@ -329,7 +330,8 @@ public void testGetDataTableOOMGroupBy() }); List rows = DataBlockTestUtils.getRandomRows(dataSchema, NUM_ROWS, 0); IndexedTable indexedTable = - new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS, Integer.MAX_VALUE, Integer.MAX_VALUE); + new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS, Integer.MAX_VALUE, Integer.MAX_VALUE, + InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY); for (Object[] row : rows) { indexedTable.upsert(new Record(row)); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java index efbcbe0225be..d981428afd17 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java @@ -31,6 +31,7 @@ 
import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.testng.Assert; @@ -45,6 +46,7 @@ public class IndexedTableTest { private static final int TRIM_SIZE = 10; private static final int TRIM_THRESHOLD = 20; + private static final int INITIAL_CAPACITY = InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY; @Test public void testConcurrentIndexedTable() @@ -54,7 +56,8 @@ public void testConcurrentIndexedTable() DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE }); - IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD); + IndexedTable indexedTable = + new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); // 3 threads upsert together // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20) @@ -127,15 +130,17 @@ public void testNonConcurrentIndexedTable(String orderBy, List survivors }); // Test SimpleIndexedTable - IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD); - IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD); + IndexedTable indexedTable = + new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + IndexedTable mergeTable = + new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); testNonConcurrent(indexedTable, mergeTable); 
indexedTable.finish(true); checkSurvivors(indexedTable, survivors); // Test ConcurrentIndexedTable - indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD); - mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD); + indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); testNonConcurrent(indexedTable, mergeTable); indexedTable.finish(true); checkSurvivors(indexedTable, survivors); @@ -251,10 +256,11 @@ public void testNoMoreNewRecords() { ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE }); - IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD); + IndexedTable indexedTable = + new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); testNoMoreNewRecordsInTable(indexedTable); - indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD); + indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); testNoMoreNewRecordsInTable(indexedTable); } @@ -292,7 +298,7 @@ public void testAdaptiveTrimThreshold() { DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE }); - IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, 5, 6); + IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, 5, 6, INITIAL_CAPACITY); // Insert 7 records. 
Ensure that no trimming has been done since the trim threshold should adapt to be at least // twice the trim size to avoid excessive trimming @@ -335,7 +341,8 @@ public void testAdaptiveTrimThresholdMaxValue() { DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE }); - IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 1234567890, 1234567890, 1234567890); + IndexedTable indexedTable = + new SimpleIndexedTable(dataSchema, queryContext, 1234567890, 1234567890, 1234567890, INITIAL_CAPACITY); // If 2 * trimSize exceeds the max integer value, the trim threshold should be bounded to the max integer value Assert.assertEquals(indexedTable._trimThreshold, Integer.MAX_VALUE); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java index 4370c43744ac..0ab98b65c5d6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.util; +import org.apache.pinot.common.utils.HashUtil; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -37,4 +38,18 @@ public void testGetTableCapacity() { assertEquals(GroupByUtils.getTableCapacity(100000000), 500000000); assertEquals(GroupByUtils.getTableCapacity(1000000000), Integer.MAX_VALUE); } + + @Test + public void testGetIndexedTableInitialCapacity() { + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 10, 128), 128); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 100, 128), + HashUtil.getHashMapCapacity(100)); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 100, 256), 
256); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 1000, 256), + HashUtil.getHashMapCapacity(1000)); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 128), 128); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 256), HashUtil.getHashMapCapacity(100)); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 100, 256), HashUtil.getHashMapCapacity(100)); + assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 1000, 256), HashUtil.getHashMapCapacity(100)); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java index f974aa854237..f5d0be1a0663 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java @@ -679,10 +679,10 @@ public void testNumGroupsLimit() { BrokerResponseNative brokerResponse = getBrokerResponse(query); assertFalse(brokerResponse.isNumGroupsLimitReached()); - brokerResponse = getBrokerResponse(query, - new InstancePlanMakerImplV2(1000, 1000, InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD)); + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(); + planMaker.setMaxInitialResultHolderCapacity(1000); + planMaker.setNumGroupsLimit(1000); + brokerResponse = getBrokerResponse(query, planMaker); assertTrue(brokerResponse.isNumGroupsLimitReached()); } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java index 591b5ffffa7e..62cbb0a3ef41 100644 --- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java @@ -661,10 +661,10 @@ public void testNumGroupsLimit() { BrokerResponseNative brokerResponse = getBrokerResponse(query); assertFalse(brokerResponse.isNumGroupsLimitReached()); - brokerResponse = getBrokerResponse(query, - new InstancePlanMakerImplV2(1000, 1000, InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD)); + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(); + planMaker.setMaxInitialResultHolderCapacity(1000); + planMaker.setNumGroupsLimit(1000); + brokerResponse = getBrokerResponse(query, planMaker); assertTrue(brokerResponse.isNumGroupsLimitReached()); } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java index a2c74071c98d..6fcb909374e5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java @@ -767,10 +767,10 @@ public void testNumGroupsLimit() { BrokerResponseNative brokerResponse = getBrokerResponse(query); assertFalse(brokerResponse.isNumGroupsLimitReached()); - brokerResponse = getBrokerResponse(query, - new InstancePlanMakerImplV2(1000, 1000, InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD)); + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(); + planMaker.setMaxInitialResultHolderCapacity(1000); + 
planMaker.setNumGroupsLimit(1000); + brokerResponse = getBrokerResponse(query, planMaker); assertTrue(brokerResponse.isNumGroupsLimitReached()); } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java index 6b876de3aefb..070ff531cb34 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java @@ -33,11 +33,11 @@ * Tests order by queries */ public class InterSegmentGroupByMultiValueQueriesTest extends BaseMultiValueQueriesTest { - private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = - new InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD); + private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new InstancePlanMakerImplV2(); + + static { + TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1); + } @Test(dataProvider = "groupByOrderByDataProvider") public void testGroupByOrderBy(String query, long expectedNumEntriesScannedPostFilter, diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java index ec9e0d951f8d..a75bce325d48 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java @@ -32,11 +32,11 @@ * Tests order by queries with MV RAW index */ public class InterSegmentGroupByMultiValueRawQueriesTest extends 
BaseMultiValueRawQueriesTest { - private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = - new InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD); + private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new InstancePlanMakerImplV2(); + + static { + TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1); + } @Test(dataProvider = "groupByOrderByDataProvider") public void testGroupByOrderBy(String query, long expectedNumEntriesScannedPostFilter, diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java index 846fbdbb8c49..8cda3e16094f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java @@ -34,11 +34,11 @@ * Tests order by queries */ public class InterSegmentGroupBySingleValueQueriesTest extends BaseSingleValueQueriesTest { - private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = - new InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1, - InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE, - InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD); + private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new InstancePlanMakerImplV2(); + + static { + TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1); + } @Test(dataProvider = "groupByOrderByDataProvider") public void testGroupByOrderBy(String query, long expectedNumEntriesScannedPostFilter, diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java index 6408fd8f31df..66d143746019 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java @@ -60,7 +60,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest { private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2); private static final DataTableReducerContext DATATABLE_REDUCER_CONTEXT = - new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000); + new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000, 128); @BeforeClass public void setUp() diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index d62887b7be08..0ca99b06ccd2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -109,6 +109,8 @@ public class QueryRunner { private Integer _numGroupsLimit; @Nullable private Integer _maxInitialResultHolderCapacity; + @Nullable + private Integer _minInitialIndexedTableCapacity; // Join overflow settings @Nullable @@ -142,6 +144,10 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY); _maxInitialResultHolderCapacity = maxInitialGroupHolderCapacity != null ? 
Integer.parseInt(maxInitialGroupHolderCapacity) : null; + String minInitialIndexedTableCapacityStr = + config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY); + _minInitialIndexedTableCapacity = + minInitialIndexedTableCapacityStr != null ? Integer.parseInt(minInitialIndexedTableCapacityStr) : null; String maxRowsInJoinStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN); _maxRowsInJoin = maxRowsInJoinStr != null ? Integer.parseInt(maxRowsInJoinStr) : null; String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE); @@ -319,6 +325,15 @@ private Map consolidateMetadata(Map customProper Integer.toString(maxInitialResultHolderCapacity)); } + Integer minInitialIndexedTableCapacity = QueryOptionsUtils.getMinInitialIndexedTableCapacity(opChainMetadata); + if (minInitialIndexedTableCapacity == null) { + minInitialIndexedTableCapacity = _minInitialIndexedTableCapacity; + } + if (minInitialIndexedTableCapacity != null) { + opChainMetadata.put(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY, + Integer.toString(minInitialIndexedTableCapacity)); + } + Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata); if (maxRowsInJoin == null) { maxRowsInJoin = _maxRowsInJoin; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 06c7184f4e43..1a6985084bd3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -51,7 +51,7 @@ private CommonConstants() { public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME = //"org.apache.pinot.plugin.metrics.compound.CompoundPinotMetricsFactory"; "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory"; - 
//"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory"; + //"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory"; public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME = "org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener"; @@ -311,6 +311,9 @@ public static class Broker { public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000; public static final String CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE = "pinot.broker.min.group.trim.size"; public static final int DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE = 5000; + public static final String CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY = + "pinot.broker.min.init.indexed.table.capacity"; + public static final int DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128; // Configure the request handler type used by broker to handler inbound query request. // NOTE: the request handler type refers to the communication between Broker and Server. @@ -439,6 +442,7 @@ public static class QueryOptionKey { public static final String MULTI_STAGE_LEAF_LIMIT = "multiStageLeafLimit"; public static final String NUM_GROUPS_LIMIT = "numGroupsLimit"; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity"; + public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = "minInitialIndexedTableCapacity"; public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold"; public static final String STAGE_PARALLELISM = "stageParallelism"; @@ -685,6 +689,8 @@ public static class Server { "pinot.server.query.executor.num.groups.limit"; public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "pinot.server.query.executor.max.init.group.holder.capacity"; + public static final String CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY = + "pinot.server.query.executor.min.init.indexed.table.capacity"; public static final String CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR = 
"pinot.server.query.executor.multistage.executor";