Allow configurable initial capacity for IndexedTable (apache#14620)
Jackie-Jiang authored and davecromberge committed Dec 9, 2024
1 parent 78738a3 commit 49f7d95
Showing 33 changed files with 410 additions and 291 deletions.
HashUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import com.google.common.primitives.Ints;
 import java.nio.ByteBuffer;
 import java.nio.IntBuffer;
 
@@ -44,9 +45,16 @@ public static int getMinHashSetSize(int expected) {
   /**
    * Returns a capacity that is sufficient to keep the map from being resized as long as it grows no larger than
    * expectedSize and the load factor is >= its default (0.75).
+   * NOTE: Borrowed from Guava's Maps library {@code int capacity(int expectedSize)}.
    */
   public static int getHashMapCapacity(int expectedSize) {
-    return (int) ((float) expectedSize / 0.75f + 1f);
+    if (expectedSize < 3) {
+      return expectedSize + 1;
+    }
+    if (expectedSize < Ints.MAX_POWER_OF_TWO) {
+      return (int) Math.ceil(expectedSize / 0.75);
+    }
+    return Integer.MAX_VALUE;
   }
 
   public static long compute(IntBuffer buff) {
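The new sizing logic mirrors Guava's Maps.capacity(). As a quick sanity check, here is a minimal, self-contained sketch of the same computation with a couple of worked values (the demo class is ours, not part of the commit):

    import com.google.common.primitives.Ints;

    public class HashMapCapacityDemo {
      // Mirrors the patched HashUtil.getHashMapCapacity() for illustration.
      static int getHashMapCapacity(int expectedSize) {
        if (expectedSize < 3) {
          return expectedSize + 1;
        }
        if (expectedSize < Ints.MAX_POWER_OF_TWO) {
          return (int) Math.ceil(expectedSize / 0.75);
        }
        return Integer.MAX_VALUE;
      }

      public static void main(String[] args) {
        // ceil(12 / 0.75) = 16: a map sized 16 with load factor 0.75 holds 12 entries without resizing.
        System.out.println(getHashMapCapacity(12)); // 16 (the old formula returned 17)
        // Sizes below 3 are special-cased.
        System.out.println(getHashMapCapacity(2)); // 3
      }
    }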
QueryOptionsUtils.java
@@ -275,6 +275,12 @@ public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
   }
 
+  @Nullable
+  public static Integer getMinInitialIndexedTableCapacity(Map<String, String> queryOptions) {
+    String minInitialIndexedTableCapacity = queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+    return checkedParseIntPositive(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY, minInitialIndexedTableCapacity);
+  }
+
   public static boolean shouldDropResults(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
   }
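The option is parsed defensively: an absent key yields null (hence the @Nullable Integer return type), and non-positive values are rejected by checkedParseIntPositive. A minimal sketch of the calling convention, assuming the option key string is the camelCase minInitialIndexedTableCapacity (the diff shows only the constant name, not its value):

    import java.util.Map;

    public class QueryOptionDemo {
      // Hypothetical stand-in for checkedParseIntPositive(): null when the option is
      // absent, reject non-positive values, otherwise return the parsed int.
      static Integer parseIntPositive(String key, String value) {
        if (value == null) {
          return null;
        }
        int parsed = Integer.parseInt(value);
        if (parsed <= 0) {
          throw new IllegalArgumentException(key + " must be positive, got: " + parsed);
        }
        return parsed;
      }

      public static void main(String[] args) {
        // Hypothetical key string for QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY.
        Map<String, String> options = Map.of("minInitialIndexedTableCapacity", "10000");
        System.out.println(parseIntPositive("minInitialIndexedTableCapacity",
            options.get("minInitialIndexedTableCapacity"))); // 10000
      }
    }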
ConcurrentIndexedTable.java
@@ -32,14 +32,10 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
   private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
 
-  public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize,
-      int trimThreshold) {
-    this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
-  }
-
   public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new ConcurrentHashMap<>());
+      int trimSize, int trimThreshold, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
+        new ConcurrentHashMap<>(initialCapacity));
   }
 
   /**
IndexedTable.java
@@ -26,22 +26,17 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of Map-based Table for indexed lookup
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class IndexedTable extends BaseTable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IndexedTable.class);
-
   protected final Map<Key, Record> _lookupMap;
   protected final boolean _hasFinalInput;
   protected final int _resultSize;
@@ -83,31 +78,12 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext
     assert groupByExpressions != null;
     _numKeyColumns = groupByExpressions.size();
     _aggregationFunctions = queryContext.getAggregationFunctions();
-    List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
-    if (orderByExpressions != null) {
-      // GROUP BY with ORDER BY
-      _hasOrderBy = true;
-      _tableResizer = new TableResizer(dataSchema, hasFinalInput, queryContext);
-      _trimSize = trimSize;
-      // trimThreshold is lower bounded by (2 * trimSize) in order to avoid excessive trimming. We don't modify trimSize
-      // in order to maintain the desired accuracy
-      if (trimSize > trimThreshold / 2) {
-        // Handle potential overflow
-        _trimThreshold = (2 * trimSize) > 0 ? 2 * trimSize : Integer.MAX_VALUE;
-        LOGGER.debug("Overriding group trim threshold to {}, since the configured value {} is less than twice the "
-            + "trim size ({})", _trimThreshold, trimThreshold, trimSize);
-      } else {
-        _trimThreshold = trimThreshold;
-      }
-    } else {
-      // GROUP BY without ORDER BY
-      // NOTE: The indexed table stops accepting records once the map size reaches resultSize, and there is no
-      // resize/trim during upsert.
-      _hasOrderBy = false;
-      _tableResizer = null;
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
+    _hasOrderBy = queryContext.getOrderByExpressions() != null;
+    _tableResizer = _hasOrderBy ? new TableResizer(dataSchema, hasFinalInput, queryContext) : null;
+    // NOTE: Trim should be disabled when there is no ORDER BY
+    assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
+    _trimSize = trimSize;
+    _trimThreshold = trimThreshold;
   }
 
   @Override
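The constructor no longer normalizes trimThreshold; the new assert expects callers to pass values that are already consistent, with trim fully disabled when there is no ORDER BY. A sketch of the normalization the removed branch used to perform, which callers now need somewhere upstream (the helper name is illustrative; the rest of the diff suggests this responsibility moved toward GroupByUtils):

    // Illustrative helper mirroring the removed logic: trimThreshold is lower-bounded
    // by 2 * trimSize to avoid excessive trimming; trimSize itself is left untouched
    // to maintain the desired accuracy.
    static int normalizeTrimThreshold(int trimSize, int trimThreshold) {
      if (trimSize > trimThreshold / 2) {
        // 2 * trimSize can overflow to a negative value; clamp to Integer.MAX_VALUE.
        return 2 * trimSize > 0 ? 2 * trimSize : Integer.MAX_VALUE;
      }
      return trimThreshold;
    }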
SimpleIndexedTable.java
@@ -30,14 +30,9 @@
 @NotThreadSafe
 public class SimpleIndexedTable extends IndexedTable {
 
-  public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, int trimSize,
-      int trimThreshold) {
-    this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
-  }
-
   public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>());
+      int trimSize, int trimThreshold, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity));
   }
 
   /**
UnboundedConcurrentIndexedTable.java
@@ -35,13 +35,9 @@
  */
 public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
 
-  public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize) {
-    this(dataSchema, false, queryContext, resultSize);
-  }
-
   public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext,
-      int resultSize) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE);
+      int resultSize, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
   }
 
   @Override
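With the convenience constructors gone, every call site now supplies hasFinalInput and an explicit initialCapacity for the backing map. A minimal sketch of the new call shape (the helper and variable names are ours, not from the commit):

    // Pick a table variant and pre-size its backing map from an expected group count,
    // instead of relying on the maps' default capacity of 16.
    static IndexedTable newServerIndexedTable(DataSchema dataSchema, QueryContext queryContext, int resultSize,
        int trimSize, int trimThreshold, int expectedNumGroups, int numThreads) {
      int initialCapacity = HashUtil.getHashMapCapacity(expectedNumGroups);
      return numThreads == 1
          ? new SimpleIndexedTable(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold,
              initialCapacity)
          : new ConcurrentIndexedTable(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold,
              initialCapacity);
    }

One nuance worth noting: HashMap treats its constructor argument as the raw table capacity (hence the divide-by-0.75 in getHashMapCapacity), while ConcurrentHashMap's single-argument constructor already accommodates that many elements without resizing, so passing the same value slightly over-sizes the concurrent variant; that is harmless here.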
GroupByResultsBlock.java
@@ -120,6 +120,16 @@ public Table getTable() {
     return _table;
   }
 
+  public int getNumGroups() {
+    assert _aggregationGroupByResult != null || _intermediateRecords != null
+        : "Should not call getNumGroups() on instance level results";
+    if (_aggregationGroupByResult != null) {
+      return _aggregationGroupByResult.getNumGroups();
+    } else {
+      return _intermediateRecords.size();
+    }
+  }
+
   public boolean isNumGroupsLimitReached() {
     return _numGroupsLimitReached;
   }
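getNumGroups() exists so the combine path can size the indexed table from the first results block it sees. A hedged sketch of that sizing, assuming the factory combines the observed group count with the new query option (the factory body is not among the files rendered in this view):

    // Assumed sizing logic; the actual GroupByUtils code may differ.
    static int initialCapacityFor(GroupByResultsBlock resultsBlock, QueryContext queryContext) {
      int expected = resultsBlock.getNumGroups();
      Integer minCapacity = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryContext.getQueryOptions());
      if (minCapacity != null) {
        expected = Math.max(expected, minCapacity);
      }
      return HashUtil.getHashMapCapacity(expected);
    }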
GroupByCombineOperator.java
@@ -26,15 +26,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -61,8 +57,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class);
   private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
 
-  private final int _trimSize;
-  private final int _trimThreshold;
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
@@ -76,25 +70,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> {
   public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
     super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService);
 
-    int minTrimSize = queryContext.getMinServerGroupTrimSize();
-    if (minTrimSize > 0) {
-      int limit = queryContext.getLimit();
-      if ((!queryContext.isServerReturnFinalResult() && queryContext.getOrderByExpressions() != null)
-          || queryContext.getHavingFilter() != null) {
-        _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
-      } else {
-        // TODO: Keeping only 'LIMIT' groups can cause inaccurate result because the groups are randomly selected
-        //       without ordering. Consider ordering on group-by columns if no ordering is specified.
-        _trimSize = limit;
-      }
-      int trimThreshold = queryContext.getGroupTrimThreshold();
-      _trimThreshold = trimThreshold > 0 ? trimThreshold : Integer.MAX_VALUE;
-    } else {
-      // Server trim is disabled
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
-
     AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
     assert aggregationFunctions != null;
     _numAggregationFunctions = aggregationFunctions.length;
@@ -136,22 +111,7 @@ protected void processSegments() {
     if (_indexedTable == null) {
       synchronized (this) {
        if (_indexedTable == null) {
-          DataSchema dataSchema = resultsBlock.getDataSchema();
-          // NOTE: Use trimSize as resultSize on server side.
-          if (_numTasks == 1) {
-            _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
-          } else {
-            if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-              // special case of trim threshold where it is set to max value.
-              // there won't be any trimming during upsert in this case.
-              // thus we can avoid the overhead of read-lock and write-lock
-              // in the upsert method.
-              _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
-            } else {
-              _indexedTable =
-                  new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
-            }
-          }
+          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks);
         }
       }
     }
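The trim-size computation and three-way table selection removed above are consolidated behind GroupByUtils.createIndexedTableForCombineOperator, whose body is not rendered in this view. The following is a hedged reconstruction assembled from the removed lines and the new constructor signatures; the real implementation may differ in details (in particular, it must also satisfy IndexedTable's new assert that trim is disabled without ORDER BY, and it presumably folds in the initial-capacity sizing sketched earlier):

    static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
        QueryContext queryContext, int numTasks) {
      DataSchema dataSchema = resultsBlock.getDataSchema();
      // Trim sizing, as removed from the operator constructors above.
      int trimSize;
      int trimThreshold;
      int minTrimSize = queryContext.getMinServerGroupTrimSize();
      if (minTrimSize > 0) {
        int limit = queryContext.getLimit();
        if ((!queryContext.isServerReturnFinalResult() && queryContext.getOrderByExpressions() != null)
            || queryContext.getHavingFilter() != null) {
          trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
        } else {
          trimSize = limit;
        }
        int configured = queryContext.getGroupTrimThreshold();
        trimThreshold = configured > 0 ? configured : Integer.MAX_VALUE;
      } else {
        // Server trim is disabled
        trimSize = Integer.MAX_VALUE;
        trimThreshold = Integer.MAX_VALUE;
      }
      // New in this commit: pre-size the backing map from the observed group count.
      int initialCapacity = HashUtil.getHashMapCapacity(resultsBlock.getNumGroups());
      // NOTE: Use trimSize as resultSize on server side.
      if (numTasks == 1) {
        return new SimpleIndexedTable(dataSchema, false, queryContext, trimSize, trimSize, trimThreshold,
            initialCapacity);
      }
      if (trimThreshold == Integer.MAX_VALUE) {
        // No trimming during upsert (the operators used a MAX_TRIM_THRESHOLD constant for
        // this check), so skip the read/write-lock overhead of the trimming table.
        return new UnboundedConcurrentIndexedTable(dataSchema, false, queryContext, trimSize, initialCapacity);
      }
      return new ConcurrentIndexedTable(dataSchema, false, queryContext, trimSize, trimSize, trimThreshold,
          initialCapacity);
    }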
StreamingGroupByCombineOperator.java
@@ -27,15 +27,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -66,8 +62,6 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperator<GroupByResultsBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamingGroupByCombineOperator.class);
   private static final String EXPLAIN_NAME = "STREAMING_COMBINE_GROUP_BY";
 
-  private final int _trimSize;
-  private final int _trimThreshold;
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
@@ -83,24 +77,6 @@ public StreamingGroupByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService) {
     super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService);
 
-    int minTrimSize = queryContext.getMinServerGroupTrimSize();
-    if (minTrimSize > 0) {
-      int limit = queryContext.getLimit();
-      if ((!queryContext.isServerReturnFinalResult() && queryContext.getOrderByExpressions() != null)
-          || queryContext.getHavingFilter() != null) {
-        _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
-      } else {
-        // TODO: Keeping only 'LIMIT' groups can cause inaccurate result because the groups are randomly selected
-        //       without ordering. Consider ordering on group-by columns if no ordering is specified.
-        _trimSize = limit;
-      }
-      _trimThreshold = queryContext.getGroupTrimThreshold();
-    } else {
-      // Server trim is disabled
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
-
     AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
     assert aggregationFunctions != null;
     _numAggregationFunctions = aggregationFunctions.length;
@@ -163,22 +139,7 @@ public void processSegments() {
     if (_indexedTable == null) {
       synchronized (this) {
        if (_indexedTable == null) {
-          DataSchema dataSchema = resultsBlock.getDataSchema();
-          // NOTE: Use trimSize as resultSize on server side.
-          if (_numTasks == 1) {
-            _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
-          } else {
-            if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-              // special case of trim threshold where it is set to max value.
-              // there won't be any trimming during upsert in this case.
-              // thus we can avoid the overhead of read-lock and write-lock
-              // in the upsert method.
-              _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
-            } else {
-              _indexedTable =
-                  new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
-            }
-          }
+          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks);
        }
      }
    }