Skip to content

Commit

Permalink
Allow configurable initial capacity for IndexedTable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Dec 7, 2024
1 parent 36ef6eb commit 9566078
Show file tree
Hide file tree
Showing 29 changed files with 264 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils;

import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

Expand All @@ -44,9 +45,16 @@ public static int getMinHashSetSize(int expected) {
/**
 * Returns a capacity that is sufficient to keep the map from being resized as long as it grows no larger than
 * expectedSize and the load factor is >= its default (0.75).
 * NOTE: Borrowed from Guava's Maps library {@code int capacity(int expectedSize)}.
 *
 * @param expectedSize number of entries the map is expected to hold
 * @return {@code expectedSize + 1} when {@code expectedSize < 3}; {@code ceil(expectedSize / 0.75)} when
 *         {@code expectedSize < Ints.MAX_POWER_OF_TWO}; {@link Integer#MAX_VALUE} otherwise
 */
public static int getHashMapCapacity(int expectedSize) {
  if (expectedSize < 3) {
    return expectedSize + 1;
  }
  if (expectedSize < Ints.MAX_POWER_OF_TWO) {
    // Divide in double precision: a double represents every int exactly, which a float cast does not.
    return (int) Math.ceil(expectedSize / 0.75);
  }
  // Anything at or above the largest power of two gets the largest int value.
  return Integer.MAX_VALUE;
}

public static long compute(IntBuffer buff) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,20 @@ public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {

/**
 * Looks up the {@code MAX_INITIAL_RESULT_HOLDER_CAPACITY} query option and parses it with
 * {@code checkedParseIntPositive}.
 *
 * @param queryOptions query options keyed by option name
 * @return the parsed value, or {@code null} (presumably when the option is absent; confirm against
 *         {@code checkedParseIntPositive})
 */
@Nullable
public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) {
  String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
  return checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialResultHolderCapacity);
}

/**
 * Tells whether the {@code OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY} query option is switched on.
 *
 * @param queryOptions query options keyed by option name
 * @return {@code true} only when the option value equals "true" ignoring case; {@code false} when the option is
 *         missing or holds any other text
 */
public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) {
  String optionValue = queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
  return Boolean.parseBoolean(optionValue);
}

/**
 * Looks up the {@code MIN_INITIAL_INDEXED_TABLE_CAPACITY} query option and parses it with
 * {@code checkedParseIntPositive}.
 *
 * @param queryOptions query options keyed by option name
 * @return the parsed value, or {@code null} (presumably when the option is absent; confirm against
 *         {@code checkedParseIntPositive})
 */
@Nullable
public static Integer getMinInitialIndexedTableCapacity(Map<String, String> queryOptions) {
  return checkedParseIntPositive(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
      queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY));
}

@Nullable
private static Long checkedParseLongPositive(String optionName, @Nullable String optionValue) {
return checkedParseLong(optionName, optionValue, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public class ConcurrentIndexedTable extends IndexedTable {
private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();

/**
 * Creates a table whose {@code hasFinalInput} flag is {@code false}; every other argument is forwarded unchanged
 * to the full constructor.
 *
 * @param initialCapacity initial capacity handed to the backing {@link ConcurrentHashMap}; must not be negative
 */
public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize,
    int trimThreshold, int initialCapacity) {
  this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold, initialCapacity);
}

/**
 * Creates a table backed by a {@link ConcurrentHashMap} pre-sized with {@code initialCapacity}; the remaining
 * arguments are forwarded unchanged to {@code IndexedTable}.
 *
 * @param initialCapacity initial capacity handed to the backing {@link ConcurrentHashMap}
 * @throws IllegalArgumentException if {@code initialCapacity} is negative (raised by {@link ConcurrentHashMap})
 */
public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
    int trimSize, int trimThreshold, int initialCapacity) {
  super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
      new ConcurrentHashMap<>(initialCapacity));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
public class SimpleIndexedTable extends IndexedTable {

/**
 * Creates a table whose {@code hasFinalInput} flag is {@code false}; every other argument is forwarded unchanged
 * to the full constructor.
 *
 * @param initialCapacity initial capacity handed to the backing {@link HashMap}; must not be negative
 */
public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize,
    int trimThreshold, int initialCapacity) {
  this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold, initialCapacity);
}

/**
 * Creates a table backed by a {@link HashMap} constructed with {@code initialCapacity}; the remaining arguments
 * are forwarded unchanged to {@code IndexedTable}.
 *
 * @param initialCapacity initial capacity handed to the backing {@link HashMap}
 * @throws IllegalArgumentException if {@code initialCapacity} is negative (raised by {@link HashMap})
 */
public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
    int trimSize, int trimThreshold, int initialCapacity) {
  // NOTE(review): HashMap treats this as table capacity, not expected entry count; presumably callers pass a value
  // already adjusted for the load factor. Confirm at the call sites.
  super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
      new HashMap<>(initialCapacity));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
*/
public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {

/**
 * Creates a table whose {@code hasFinalInput} flag is {@code false}; every other argument is forwarded unchanged
 * to the full constructor.
 *
 * @param initialCapacity initial capacity forwarded to {@code ConcurrentIndexedTable}
 */
public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize,
    int initialCapacity) {
  this(dataSchema, false, queryContext, resultSize, initialCapacity);
}

/**
 * Creates a table that passes {@link Integer#MAX_VALUE} as both the trim size and the trim threshold to
 * {@code ConcurrentIndexedTable}.
 *
 * @param initialCapacity initial capacity forwarded to {@code ConcurrentIndexedTable}
 */
public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext,
    int resultSize, int initialCapacity) {
  super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ public Table getTable() {
return _table;
}

/**
 * Returns the number of groups held by this block: the count reported by the aggregation group-by result when it
 * is present, otherwise the size of the intermediate records.
 *
 * <p>With assertions disabled and both sources {@code null}, the call fails with a
 * {@link NullPointerException}.
 */
public int getNumGroups() {
  assert _aggregationGroupByResult != null || _intermediateRecords != null
      : "Should not call getNumGroups() on instance level results";
  return _aggregationGroupByResult != null ? _aggregationGroupByResult.getNumGroups()
      : _intermediateRecords.size();
}

/**
 * Returns the value of the {@code _numGroupsLimitReached} flag stored on this block.
 */
public boolean isNumGroupsLimitReached() {
return _numGroupsLimitReached;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,25 @@ protected void processSegments() {
synchronized (this) {
if (_indexedTable == null) {
DataSchema dataSchema = resultsBlock.getDataSchema();
int initialCapacity =
GroupByUtils.getIndexedTableInitialCapacity(_trimThreshold, resultsBlock.getNumGroups(),
_queryContext.getMinInitialIndexedTableCapacity());
// NOTE: Use trimSize as resultSize on server side.
if (_numTasks == 1) {
_indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
_indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold,
initialCapacity);
} else {
if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
// special case of trim threshold where it is set to max value.
// there won't be any trimming during upsert in this case.
// thus we can avoid the overhead of read-lock and write-lock
// in the upsert method.
_indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
_indexedTable =
new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, initialCapacity);
} else {
_indexedTable =
new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold,
initialCapacity);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,25 @@ public void processSegments() {
synchronized (this) {
if (_indexedTable == null) {
DataSchema dataSchema = resultsBlock.getDataSchema();
int initialCapacity =
GroupByUtils.getIndexedTableInitialCapacity(_trimThreshold, resultsBlock.getNumGroups(),
_queryContext.getMinInitialIndexedTableCapacity());
// NOTE: Use trimSize as resultSize on server side.
if (_numTasks == 1) {
_indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
_indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold,
initialCapacity);
} else {
if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
// special case of trim threshold where it is set to max value.
// there won't be any trimming during upsert in this case.
// thus we can avoid the overhead of read-lock and write-lock
// in the upsert method.
_indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
_indexedTable =
new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, initialCapacity);
} else {
_indexedTable =
new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold,
initialCapacity);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {

public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY = "min.init.indexed.table.capacity";
public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128;
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

Expand All @@ -93,6 +95,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
private int _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS;
private int _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
private int _minInitialIndexedTableCapacity = DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit on number of groups stored for each segment, beyond which no new group will be created
private int _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
// Used for SQL GROUP BY (server combine)
Expand All @@ -103,25 +106,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
/**
 * Creates a plan maker whose fields keep the values given by their declared initializers.
 */
public InstancePlanMakerImplV2() {
}

/**
 * Test-only constructor that assigns the five group-by settings below directly. Fields not assigned here keep the
 * values given by their declared initializers. No validation is performed on the arguments.
 */
@VisibleForTesting
public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int numGroupsLimit, int minSegmentGroupTrimSize,
int minServerGroupTrimSize, int groupByTrimThreshold) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
_numGroupsLimit = numGroupsLimit;
_minSegmentGroupTrimSize = minSegmentGroupTrimSize;
_minServerGroupTrimSize = minServerGroupTrimSize;
_groupByTrimThreshold = groupByTrimThreshold;
}

@Override
public void init(PinotConfiguration queryExecutorConfig) {
_maxExecutionThreads = queryExecutorConfig.getProperty(MAX_EXECUTION_THREADS_KEY, DEFAULT_MAX_EXECUTION_THREADS);
_maxInitialResultHolderCapacity = queryExecutorConfig.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY,
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_minInitialIndexedTableCapacity = queryExecutorConfig.getProperty(MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY,
DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
_numGroupsLimit = queryExecutorConfig.getProperty(NUM_GROUPS_LIMIT_KEY, DEFAULT_NUM_GROUPS_LIMIT);
Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
"Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
_maxInitialResultHolderCapacity, _numGroupsLimit);
Preconditions.checkState(_minInitialIndexedTableCapacity <= _numGroupsLimit,
"Invalid configuration: minInitialIndexedTableCapacity: %d must be smaller or equal to numGroupsLimit: %d",
_minInitialIndexedTableCapacity, _numGroupsLimit);
_minSegmentGroupTrimSize =
queryExecutorConfig.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
_minServerGroupTrimSize =
Expand All @@ -135,6 +133,36 @@ public void init(PinotConfiguration queryExecutorConfig) {
_minServerGroupTrimSize, _groupByTrimThreshold);
}

/**
 * Test-only setter that overwrites {@code _maxInitialResultHolderCapacity}. The value is stored as given, with no
 * validation.
 */
@VisibleForTesting
public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
}

/**
 * Test-only setter that overwrites {@code _minInitialIndexedTableCapacity}. The value is stored as given, with no
 * validation.
 */
@VisibleForTesting
public void setMinInitialIndexedTableCapacity(int minInitialIndexedTableCapacity) {
_minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
}

/**
 * Test-only setter that overwrites {@code _numGroupsLimit}. The value is stored as given, with no validation.
 */
@VisibleForTesting
public void setNumGroupsLimit(int numGroupsLimit) {
_numGroupsLimit = numGroupsLimit;
}

/**
 * Test-only setter that overwrites {@code _minSegmentGroupTrimSize}. The value is stored as given, with no
 * validation.
 */
@VisibleForTesting
public void setMinSegmentGroupTrimSize(int minSegmentGroupTrimSize) {
_minSegmentGroupTrimSize = minSegmentGroupTrimSize;
}

/**
 * Test-only setter that overwrites {@code _minServerGroupTrimSize}. The value is stored as given, with no
 * validation.
 */
@VisibleForTesting
public void setMinServerGroupTrimSize(int minServerGroupTrimSize) {
_minServerGroupTrimSize = minServerGroupTrimSize;
}

/**
 * Test-only setter that overwrites {@code _groupByTrimThreshold}. The value is stored as given, with no
 * validation.
 */
@VisibleForTesting
public void setGroupByTrimThreshold(int groupByTrimThreshold) {
_groupByTrimThreshold = groupByTrimThreshold;
}

public Plan makeInstancePlan(List<SegmentContext> segmentContexts, QueryContext queryContext,
ExecutorService executorService, ServerMetrics serverMetrics) {
applyQueryOptions(queryContext);
Expand Down Expand Up @@ -196,12 +224,19 @@ private void applyQueryOptions(QueryContext queryContext) {
// Set group-by query options
if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() != null) {
// Set maxInitialResultHolderCapacity
Integer initResultCap = QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
if (initResultCap != null) {
queryContext.setMaxInitialResultHolderCapacity(initResultCap);
Integer maxInitialResultHolderCapacity = QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
if (maxInitialResultHolderCapacity != null) {
queryContext.setMaxInitialResultHolderCapacity(maxInitialResultHolderCapacity);
} else {
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
}
// Set minInitialIndexedTableCapacity
Integer minInitialIndexedTableCapacity = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
if (minInitialIndexedTableCapacity != null) {
queryContext.setMinInitialIndexedTableCapacity(minInitialIndexedTableCapacity);
} else {
queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
}
// Set numGroupsLimit
Integer numGroupsLimit = QueryOptionsUtils.getNumGroupsLimit(queryOptions);
if (numGroupsLimit != null) {
Expand Down Expand Up @@ -361,7 +396,8 @@ public static ExpressionContext overrideWithExpressionHints(ExpressionContext ex
.contains(overrideExpression.getIdentifier())) {
return overrideExpression;
}
expression.getFunction().getArguments()
expression.getFunction()
.getArguments()
.replaceAll(argument -> overrideWithExpressionHints(argument, indexSegment, expressionOverrideHints));
return expression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public AggregationGroupByResult(GroupKeyGenerator groupKeyGenerator, Aggregation
_resultHolders = resultHolders;
}

/**
 * Returns the number of keys reported by the group key generator.
 */
public int getNumGroups() {
return _groupKeyGenerator.getNumKeys();
}

/**
* Returns an iterator of {@link GroupKeyGenerator.GroupKey}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class BaseReduceService {
protected final int _maxReduceThreadsPerQuery;
protected final int _groupByTrimThreshold;
protected final int _minGroupTrimSize;
protected final int _minInitialIndexedTableCapacity;

public BaseReduceService(PinotConfiguration config) {
_maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
Expand All @@ -60,6 +61,9 @@ public BaseReduceService(PinotConfiguration config) {
CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
_minGroupTrimSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE,
CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE);
_minInitialIndexedTableCapacity =
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY,
CommonConstants.Broker.DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY);

int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,23 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke

Integer minGroupTrimSizeQueryOption = null;
Integer groupTrimThresholdQueryOption = null;
Integer minInitialIndexedTableCapacityQueryOption = null;
if (queryOptions != null) {
minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
}
int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
int groupTrimThreshold =
groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
int minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption
: _minInitialIndexedTableCapacity;

try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
groupTrimThreshold, minGroupTrimSize), brokerMetrics);
groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addException(
new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DataTableReducerContext {
// used for SQL GROUP BY
private final int _groupByTrimThreshold;
private final int _minGroupTrimSize;
private final int _minInitialIndexedTableCapacity;

/**
* Constructor for the class.
Expand All @@ -42,12 +43,13 @@ public class DataTableReducerContext {
* @param groupByTrimThreshold trim threshold for SQL group by
*/
public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs,
    int groupByTrimThreshold, int minGroupTrimSize, int minInitialIndexedTableCapacity) {
  // Plain value holder: every argument is stored as given, with no validation.
  _executorService = executorService;
  _maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
  _reduceTimeOutMs = reduceTimeOutMs;
  _groupByTrimThreshold = groupByTrimThreshold;
  _minGroupTrimSize = minGroupTrimSize;
  _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
}

public ExecutorService getExecutorService() {
Expand All @@ -69,4 +71,8 @@ public int getGroupByTrimThreshold() {
/**
 * Returns the minimum group trim size supplied to the constructor.
 */
public int getMinGroupTrimSize() {
return _minGroupTrimSize;
}

/**
 * Returns the minimum initial indexed table capacity supplied to the constructor.
 */
public int getMinInitialIndexedTableCapacity() {
return _minInitialIndexedTableCapacity;
}
}
Loading

0 comments on commit 9566078

Please sign in to comment.