Skip to content

Commit

Permalink
Fix v1 query engine behavior for aggregations without group by where …
Browse files Browse the repository at this point in the history
…the limit is zero (#13564)
  • Loading branch information
yashmayya authored Dec 24, 2024
1 parent 73843b5 commit b398fed
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public List<Object> getResults() {

@Override
public int getNumRows() {
return 1;
return _queryContext.getLimit() == 0 ? 0 : 1;
}

@Override
Expand Down Expand Up @@ -108,6 +108,12 @@ public DataTable getDataTable()
ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);

// For LIMIT 0 queries
if (_results.isEmpty()) {
return dataTableBuilder.build();
}

boolean returnFinalResult = _queryContext.isServerReturnFinalResult();
if (_queryContext.isNullHandlingEnabled()) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public void mergeResultsBlocks(AggregationResultsBlock mergedBlock, AggregationR
List<Object> resultsToMerge = blockToMerge.getResults();
assert aggregationFunctions != null && mergedResults != null && resultsToMerge != null;

// Skip merging empty results (LIMIT 0 queries)
if (mergedBlock.getNumRows() == 0 && blockToMerge.getNumRows() == 0) {
return;
}

int numAggregationFunctions = aggregationFunctions.length;
for (int i = 0; i < numAggregationFunctions; i++) {
mergedResults.set(i, aggregationFunctions[i].merge(mergedResults.get(i), resultsToMerge.get(i)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.query;

import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;


/**
* The <code>EmptyAggregationOperator</code> provides a way to short circuit aggregation only queries (no group by)
* with a LIMIT of zero.
*/
public class EmptyAggregationOperator extends BaseOperator<AggregationResultsBlock> {

private static final String EXPLAIN_NAME = "AGGREGATE_EMPTY";
private final QueryContext _queryContext;
private final ExecutionStatistics _executionStatistics;

public EmptyAggregationOperator(QueryContext queryContext, int numTotalDocs) {
_queryContext = queryContext;
_executionStatistics = new ExecutionStatistics(0, 0, 0, numTotalDocs);
}

@Override
protected AggregationResultsBlock getNextBlock() {
return new AggregationResultsBlock(_queryContext.getAggregationFunctions(), Collections.emptyList(), _queryContext);
}

@Override
public List<Operator> getChildOperators() {
return Collections.emptyList();
}

@Override
public String toExplainString() {
return EXPLAIN_NAME;
}

@Override
public ExecutionStatistics getExecutionStatistics() {
return _executionStatistics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.EmptyAggregationOperator;
import org.apache.pinot.core.operator.query.FastFilteredCountOperator;
import org.apache.pinot.core.operator.query.FilteredAggregationOperator;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
Expand Down Expand Up @@ -70,6 +71,11 @@ public AggregationPlanNode(SegmentContext segmentContext, QueryContext queryCont
@Override
public Operator<AggregationResultsBlock> run() {
assert _queryContext.getAggregationFunctions() != null;

if (_queryContext.getLimit() == 0) {
return new EmptyAggregationOperator(_queryContext, _indexSegment.getSegmentMetadata().getTotalDocs());
}

return _queryContext.hasFilteredAggregations() ? buildFilteredAggOperator() : buildNonFilteredAggOperator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3658,7 +3658,34 @@ public void testBooleanAggregation()
public void testGroupByAggregationWithLimitZero(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
testQuery("SELECT Origin, SUM(ArrDelay) FROM mytable GROUP BY Origin LIMIT 0");

String sqlQuery = "SELECT Origin, SUM(ArrDelay) FROM mytable GROUP BY Origin LIMIT 0";
JsonNode response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").isEmpty());
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 0);

// Ensure data schema returned is accurate even if there are no rows returned
JsonNode columnDataTypes = response.get("resultTable").get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypes.size(), 2);
assertEquals(columnDataTypes.get(1).asText(), "DOUBLE");
}

@Test(dataProvider = "useBothQueryEngines")
public void testAggregationWithLimitZero(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);

String sqlQuery = "SELECT AVG(ArrDelay) FROM mytable LIMIT 0";
JsonNode response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").isEmpty());
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 0);

// Ensure data schema returned is accurate even if there are no rows returned
JsonNode columnDataTypes = response.get("resultTable").get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypes.size(), 1);
assertEquals(columnDataTypes.get(0).asText(), "DOUBLE");
}

@Test(dataProvider = "useBothQueryEngines")
Expand Down

0 comments on commit b398fed

Please sign in to comment.