Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
// \- Project([c, b])
// \- Filter(a > 1)
// \- Scan t
// Example 3: source=t | stats count(): no project added for count()
// Before: Aggregate(count)
// \- Scan t
// After: Aggregate(count)
// \- Scan t
Pair<List<RexNode>, List<AggCall>> resolved =
resolveAttributesForAggregation(groupExprList, aggExprList, context);
List<RexInputRef> trimmedRefs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ public void testFilterAndAggPushDownExplain() throws IOException {
+ "| stats avg(age) AS avg_age by state, city"));
}

@Test
public void testCountAggPushDownExplain() throws IOException {
String expected = loadExpectedPlan("explain_count_agg_push.json");
assertJsonEqualsIgnoreId(
expected,
explainQueryToString("source=opensearch-sql_test_index_account | stats count() as cnt"));
}

@Test
public void testSortPushDownExplain() throws IOException {
String expected = loadExpectedPlan("explain_sort_push.json");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalAggregate(group=[{}], cnt=[COUNT()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"_index\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalAggregate(group=[{}], cnt=[COUNT()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableAggregate(group=[{}], cnt=[COUNT()])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[cnt]"
},
"children": [{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"_index\"}}}}, needClean=true, searchDone=false, pitId=*, cursorKeepAlive=null, searchAfter=null, searchResponse=null)"
},
"children": []
}]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.sql.SqlKind;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

Expand All @@ -29,6 +30,11 @@ public void onMatch(RelOptRuleCall call) {
final LogicalProject project = call.rel(1);
final CalciteLogicalIndexScan scan = call.rel(2);
apply(call, aggregate, project, scan);
} else if (call.rels.length == 2) {
// case of count() without group-by
final LogicalAggregate aggregate = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);
apply(call, aggregate, null, scan);
} else {
throw new AssertionError(
String.format(
Expand All @@ -54,6 +60,7 @@ public interface Config extends RelRule.Config {
Config DEFAULT =
ImmutableOpenSearchAggregateIndexScanRule.Config.builder()
.build()
.withDescription("Agg-Project-TableScan")
.withOperandSupplier(
b0 ->
b0.operand(LogicalAggregate.class)
Expand All @@ -71,6 +78,28 @@ public interface Config extends RelRule.Config {
OpenSearchIndexScanRule
::noAggregatePushed))
.noInputs())));
Config COUNT_STAR =
ImmutableOpenSearchAggregateIndexScanRule.Config.builder()
.build()
.withDescription("Agg[count()]-TableScan")
.withOperandSupplier(
b0 ->
b0.operand(LogicalAggregate.class)
.predicate(
agg ->
agg.getGroupSet().isEmpty()
&& agg.getAggCallList().stream()
.allMatch(
call ->
call.getAggregation().kind == SqlKind.COUNT
&& call.getArgList().isEmpty()))
.oneInput(
b1 ->
b1.operand(CalciteLogicalIndexScan.class)
.predicate(
Predicate.not(OpenSearchIndexScanRule::isLimitPushed)
.and(OpenSearchIndexScanRule::noAggregatePushed))
.noInputs()));

@Override
default OpenSearchAggregateIndexScanRule toRule() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class OpenSearchIndexRules {
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchAggregateIndexScanRule COUNT_STAR_INDEX_SCAN =
OpenSearchAggregateIndexScanRule.Config.COUNT_STAR.toRule();
private static final OpenSearchLimitIndexScanRule LIMIT_INDEX_SCAN =
OpenSearchLimitIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchSortIndexScanRule SORT_INDEX_SCAN =
Expand All @@ -26,6 +28,7 @@ public class OpenSearchIndexRules {
PROJECT_INDEX_SCAN,
FILTER_INDEX_SCAN,
AGGREGATE_INDEX_SCAN,
COUNT_STAR_INDEX_SCAN,
LIMIT_INDEX_SCAN,
SORT_INDEX_SCAN);

Expand Down
Loading