Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,25 @@ public void testExplainBinWithAligntime() throws IOException {
+ " head 5"));
}

@Test
public void testExplainCountEval() throws IOException {
String query =
"source=opensearch-sql_test_index_bank | stats count(eval(age > 30)) as mature_count";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_count_eval_push.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainCountEvalComplex() throws IOException {
String query =
"source=opensearch-sql_test_index_bank | stats count(eval(age > 30 and age < 50)) as"
+ " mature_count";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_count_eval_complex_push.json");
assertJsonEqualsIgnoreId(expected, result);
}

public void testEventstatsDistinctCountExplain() throws IOException {
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
String query =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], mature_count=[COUNT($0)])\n LogicalProject($f1=[CASE(SEARCH($10, Sarg[(30..50)]), 1, null:NULL)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},mature_count=COUNT() FILTER $0)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"mature_count\":{\"filter\":{\"range\":{\"age\":{\"from\":30.0,\"to\":50.0,\"include_lower\":false,\"include_upper\":false,\"boost\":1.0}}},\"aggregations\":{\"mature_count\":{\"value_count\":{\"field\":\"_index\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], mature_count=[COUNT($0)])\n LogicalProject($f1=[CASE(>($10, 30), 1, null:NULL)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},mature_count=COUNT() FILTER $0)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"mature_count\":{\"filter\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"aggregations\":{\"mature_count\":{\"value_count\":{\"field\":\"_index\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], mature_count=[COUNT($0)])\n LogicalProject($f1=[CASE(SEARCH($10, Sarg[(30..50)]), 1, null:NULL)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], mature_count=[COUNT() FILTER $0])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[Sarg[(30..50)]], expr#20=[SEARCH($t10, $t19)], expr#21=[IS TRUE($t20)], $f1=[$t21])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], mature_count=[COUNT($0)])\n LogicalProject($f1=[CASE(>($10, 30), 1, null:NULL)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], mature_count=[COUNT() FILTER $0])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[30], expr#20=[>($t10, $t19)], expr#21=[IS TRUE($t20)], $f1=[$t21])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.sql.SqlKind;
Expand Down Expand Up @@ -65,10 +64,6 @@ public interface Config extends RelRule.Config {
.withOperandSupplier(
b0 ->
b0.operand(LogicalAggregate.class)
.predicate(
agg ->
// Cannot push down aggregation with inner filter
agg.getAggCallList().stream().noneMatch(AggregateCall::hasFilter))
.oneInput(
b1 ->
b1.operand(LogicalProject.class)
Expand Down Expand Up @@ -100,8 +95,7 @@ public interface Config extends RelRule.Config {
.allMatch(
call ->
call.getAggregation().kind == SqlKind.COUNT
&& call.getArgList().isEmpty()
&& !call.hasFilter()))
&& call.getArgList().isEmpty()))
.oneInput(
b1 ->
b1.operand(CalciteLogicalIndexScan.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public static class ExpressionNotAnalyzableException extends Exception {
private AggregateAnalyzer() {}

@RequiredArgsConstructor
private static class AggregateBuilderHelper {
private final RelDataType rowType;
private final Map<String, ExprType> fieldTypes;
private final RelOptCluster cluster;
static class AggregateBuilderHelper {
final RelDataType rowType;
final Map<String, ExprType> fieldTypes;
final RelOptCluster cluster;

<T extends ValuesSourceAggregationBuilder<T>> T build(RexNode node, T aggBuilder) {
return build(node, aggBuilder::field, aggBuilder::script);
Expand Down Expand Up @@ -205,9 +205,11 @@ private static Pair<Builder, List<MetricParser>> processAggregateCalls(
List<String> aggFieldNames,
List<AggregateCall> aggCalls,
Project project,
AggregateBuilderHelper helper) {
AggregateBuilderHelper helper)
throws PredicateAnalyzer.ExpressionNotAnalyzableException {
Builder metricBuilder = new AggregatorFactories.Builder();
List<MetricParser> metricParserList = new ArrayList<>();
AggregateFilterAnalyzer aggFilterAnalyzer = new AggregateFilterAnalyzer(helper, project);

for (int i = 0; i < aggCalls.size(); i++) {
AggregateCall aggCall = aggCalls.get(i);
Expand All @@ -216,6 +218,7 @@ private static Pair<Builder, List<MetricParser>> processAggregateCalls(

Pair<AggregationBuilder, MetricParser> builderAndParser =
createAggregationBuilderAndParser(aggCall, args, aggFieldName, helper);
builderAndParser = aggFilterAnalyzer.analyze(builderAndParser, aggCall, aggFieldName);
metricBuilder.addAggregator(builderAndParser.getLeft());
metricParserList.add(builderAndParser.getRight());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.request;

import lombok.RequiredArgsConstructor;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexNode;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.QueryExpression;
import org.opensearch.sql.opensearch.response.agg.FilterParser;
import org.opensearch.sql.opensearch.response.agg.MetricParser;

/** Analyzer for converting aggregate filter conditions into OpenSearch filter aggregations. */
@RequiredArgsConstructor
public class AggregateFilterAnalyzer {

/** Helper containing row type, field types, and cluster context for analysis. */
private final AggregateAnalyzer.AggregateBuilderHelper helper;

/** Project containing filter expressions referenced by aggregate calls. */
private final Project project;

/**
* Analyzes and applies filter to aggregation if the AggregateCall has a filter condition.
*
* @param aggResult the base aggregation and parser to potentially wrap with filter
* @param aggCall the aggregate call which may contain filter information
* @param aggFieldName name for the filtered aggregation
* @return wrapped aggregation with filter if present, otherwise the original result
* @throws PredicateAnalyzer.ExpressionNotAnalyzableException if filter condition cannot be
* analyzed
*/
public Pair<AggregationBuilder, MetricParser> analyze(
Pair<AggregationBuilder, MetricParser> aggResult, AggregateCall aggCall, String aggFieldName)
throws PredicateAnalyzer.ExpressionNotAnalyzableException {
if (project == null || !aggCall.hasFilter()) {
return aggResult;
}

QueryExpression queryExpression = analyzeAggregateFilter(aggCall);
return Pair.of(
buildFilterAggregation(aggResult.getLeft(), aggFieldName, queryExpression),
buildFilterParser(aggResult.getRight(), aggFieldName));
}

private QueryExpression analyzeAggregateFilter(AggregateCall aggCall)
throws PredicateAnalyzer.ExpressionNotAnalyzableException {
RexNode filterCondition = project.getProjects().get(aggCall.filterArg);
return PredicateAnalyzer.analyzeExpression(
filterCondition,
helper.rowType.getFieldNames(),
helper.fieldTypes,
helper.rowType,
helper.cluster);
}

private AggregationBuilder buildFilterAggregation(
AggregationBuilder aggBuilder, String aggFieldName, QueryExpression queryExpression) {
return AggregationBuilders.filter(aggFieldName, queryExpression.builder())
.subAggregation(aggBuilder);
}

private MetricParser buildFilterParser(MetricParser aggParser, String aggFieldName) {
return FilterParser.builder().name(aggFieldName).metricsParser(aggParser).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ private static boolean supportedRexCall(RexCall call) {
return true;
case POSTFIX:
switch (call.getKind()) {
case IS_TRUE:
case IS_NOT_NULL:
case IS_NULL:
return true;
Expand Down Expand Up @@ -559,12 +560,20 @@ private QueryExpression prefix(RexCall call) {
}

private QueryExpression postfix(RexCall call) {
checkArgument(call.getKind() == SqlKind.IS_NULL || call.getKind() == SqlKind.IS_NOT_NULL);
checkArgument(
call.getKind() == SqlKind.IS_TRUE
|| call.getKind() == SqlKind.IS_NULL
|| call.getKind() == SqlKind.IS_NOT_NULL);
if (call.getOperands().size() != 1) {
String message = format(Locale.ROOT, "Unsupported operator: [%s]", call);
throw new PredicateAnalyzerException(message);
}

if (call.getKind() == SqlKind.IS_TRUE) {
Expression qe = call.getOperands().get(0).accept(this);
return ((QueryExpression) qe).isTrue();
}

// OpenSearch DSL does not handle IS_NULL / IS_NOT_NULL on nested fields correctly
checkForNestedFieldOperands(call);

Expand Down Expand Up @@ -1381,7 +1390,8 @@ public QueryExpression multiMatch(

@Override
public QueryExpression isTrue() {
builder = termQuery(getFieldReferenceForTermQuery(), true);
// Ignore istrue if ISTRUE(predicate) and will support ISTRUE(field) later.
// builder = termQuery(getFieldReferenceForTermQuery(), true);
return this;
}

Expand Down
Loading
Loading