diff --git a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/CliExplainIT.java b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/CliExplainIT.java index b7d0649943116..8c152fb930105 100644 --- a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/CliExplainIT.java +++ b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/CliExplainIT.java @@ -20,7 +20,7 @@ public void testExplainBasic() throws IOException { assertThat(readLine(), startsWith("----------")); assertThat(readLine(), startsWith("With[{}]")); assertThat(readLine(), startsWith("\\_Project[[?*]]")); - assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]")); + assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]")); assertEquals("", readLine()); assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test"), containsString("plan")); @@ -64,22 +64,22 @@ public void testExplainWithWhere() throws IOException { assertThat(readLine(), startsWith("----------")); assertThat(readLine(), startsWith("With[{}]")); assertThat(readLine(), startsWith("\\_Project[[?*]]")); - assertThat(readLine(), startsWith(" \\_Filter[i = 2#")); - assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]")); + assertThat(readLine(), startsWith(" \\_Filter[Equals[?i")); + assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]")); assertEquals("", readLine()); assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test WHERE i = 2"), containsString("plan")); assertThat(readLine(), startsWith("----------")); assertThat(readLine(), startsWith("Project[[i{f}#")); - assertThat(readLine(), startsWith("\\_Filter[i = 2#")); + assertThat(readLine(), startsWith("\\_Filter[Equals[i")); assertThat(readLine(), startsWith(" \\_EsRelation[test][i{f}#")); assertEquals("", readLine()); assertThat(command("EXPLAIN (PLAN OPTIMIZED) SELECT * FROM test WHERE i = 2"), containsString("plan")); assertThat(readLine(), startsWith("----------")); assertThat(readLine(), startsWith("Project[[i{f}#")); - assertThat(readLine(), startsWith("\\_Filter[i = 2#")); + assertThat(readLine(), startsWith("\\_Filter[Equals[i")); assertThat(readLine(), startsWith(" \\_EsRelation[test][i{f}#")); assertEquals("", readLine()); @@ -123,20 +123,20 @@ public void testExplainWithCount() throws IOException { assertThat(command("EXPLAIN (PLAN PARSED) SELECT COUNT(*) FROM test"), containsString("plan")); assertThat(readLine(), startsWith("----------")); assertThat(readLine(), startsWith("With[{}]")); - assertThat(readLine(), startsWith("\\_Project[[?COUNT(*)]]")); - assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]")); + assertThat(readLine(), startsWith("\\_Project[[?COUNT[?*]]]")); + assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]")); assertEquals("", readLine()); assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT COUNT(*) FROM test"), containsString("plan")); assertThat(readLine(), startsWith("----------")); - assertThat(readLine(), startsWith("Aggregate[[],[COUNT(*)#")); + assertThat(readLine(), startsWith("Aggregate[[],[Count[*=1")); assertThat(readLine(), startsWith("\\_EsRelation[test][i{f}#")); assertEquals("", readLine()); assertThat(command("EXPLAIN (PLAN OPTIMIZED) SELECT COUNT(*) FROM test"), containsString("plan")); assertThat(readLine(), startsWith("----------")); - assertThat(readLine(), startsWith("Aggregate[[],[COUNT(*)#")); + assertThat(readLine(), startsWith("Aggregate[[],[Count[*=1")); assertThat(readLine(), startsWith("\\_EsRelation[test][i{f}#")); assertEquals("", readLine()); diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java index d5e720cae8614..e8ba7eb30b048 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java @@ -38,6 +38,7 @@ public static List readScriptSpec() throws Exception { tests.addAll(readScriptSpec("/datetime.sql-spec", parser)); tests.addAll(readScriptSpec("/math.sql-spec", parser)); tests.addAll(readScriptSpec("/agg.sql-spec", parser)); + tests.addAll(readScriptSpec("/agg-ordering.sql-spec", parser)); tests.addAll(readScriptSpec("/arithmetic.sql-spec", parser)); tests.addAll(readScriptSpec("/string-functions.sql-spec", parser)); tests.addAll(readScriptSpec("/case-functions.sql-spec", parser)); diff --git a/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec new file mode 100644 index 0000000000000..cdce1f8616757 --- /dev/null +++ b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec @@ -0,0 +1,51 @@ +// +// Custom sorting/ordering on aggregates +// + +countWithImplicitGroupBy +SELECT MAX(salary) AS m FROM test_emp ORDER BY COUNT(*); + +countWithImplicitGroupByWithHaving +SELECT MAX(salary) AS m FROM test_emp HAVING MIN(salary) > 1 ORDER BY COUNT(*); + +countAndMaxWithImplicitGroupBy +SELECT MAX(salary) AS m FROM test_emp ORDER BY MAX(salary), COUNT(*); + +maxWithAliasWithImplicitGroupBy +SELECT MAX(salary) AS m FROM test_emp ORDER BY m; + +maxWithAliasWithImplicitGroupByAndHaving +SELECT MAX(salary) AS m FROM test_emp HAVING COUNT(*) > 1 ORDER BY m; + +aggWithoutAlias +SELECT MAX(salary) AS max FROM test_emp GROUP BY gender ORDER BY MAX(salary); + +aggWithAlias +SELECT MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY m; + +multipleAggsThatGetRewrittenWithoutAlias +SELECT MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY gender ORDER BY MAX(salary); + +multipleAggsThatGetRewrittenWithAlias +SELECT MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY gender ORDER BY max; + +aggNotSpecifiedInTheAggregate +SELECT MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender ORDER BY MAX(salary); + +aggNotSpecifiedInTheAggregateWithHaving +SELECT MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY MAX(salary); + +multipleAggsThatGetRewrittenWithAliasOnAMediumGroupBy +SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages ORDER BY max; + +multipleAggsThatGetRewrittenWithAliasOnALargeGroupBy +SELECT emp_no, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY emp_no ORDER BY max; + +multipleAggsThatGetRewrittenWithAliasOnAMediumGroupByWithHaving +SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages HAVING min BETWEEN 1000 AND 99999 ORDER BY max; + +aggNotSpecifiedInTheAggregatemultipleAggsThatGetRewrittenWithAliasOnALargeGroupBy +SELECT emp_no, MIN(salary) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(salary); + +aggNotSpecifiedWithHavingOnLargeGroupBy +SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java index f0137a250cc79..d5e50a0e9cdef 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Analyzer.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.type.InvalidMappedField; import org.elasticsearch.xpack.sql.type.UnsupportedEsField; +import org.elasticsearch.xpack.sql.util.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; @@ -62,6 +63,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -106,7 +108,8 @@ protected Iterable.Batch> batches() { new ResolveFunctions(), new ResolveAliases(), new ProjectedAggregations(), - new ResolveAggsInHaving() + new ResolveAggsInHaving(), + new ResolveAggsInOrderBy() //new ImplicitCasting() ); Batch finish = new Batch("Finish Analysis", @@ -926,7 +929,7 @@ protected LogicalPlan rule(Project p) { // Handle aggs in HAVING. To help folding any aggs not found in Aggregation // will be pushed down to the Aggregate and then projected. This also simplifies the Verifier's job. // - private class ResolveAggsInHaving extends AnalyzeRule { + private class ResolveAggsInHaving extends AnalyzeRule { @Override protected boolean skipResolved() { @@ -934,54 +937,49 @@ protected boolean skipResolved() { } @Override - protected LogicalPlan rule(LogicalPlan plan) { + protected LogicalPlan rule(Filter f) { // HAVING = Filter followed by an Agg - if (plan instanceof Filter) { - Filter f = (Filter) plan; - if (f.child() instanceof Aggregate && f.child().resolved()) { - Aggregate agg = (Aggregate) f.child(); + if (f.child() instanceof Aggregate && f.child().resolved()) { + Aggregate agg = (Aggregate) f.child(); - Set missing = null; - Expression condition = f.condition(); + Set missing = null; + Expression condition = f.condition(); - // the condition might contain an agg (AVG(salary)) that could have been resolved - // (salary cannot be pushed down to Aggregate since there's no grouping and thus the function wasn't resolved either) + // the condition might contain an agg (AVG(salary)) that could have been resolved + // (salary cannot be pushed down to Aggregate since there's no grouping and thus the function wasn't resolved either) - // so try resolving the condition in one go through a 'dummy' aggregate - if (!condition.resolved()) { - // that's why try to resolve the condition - Aggregate tryResolvingCondition = new Aggregate(agg.source(), agg.child(), agg.groupings(), - combine(agg.aggregates(), new Alias(f.source(), ".having", condition))); + // so try resolving the condition in one go through a 'dummy' aggregate + if (!condition.resolved()) { + // that's why try to resolve the condition + Aggregate tryResolvingCondition = new Aggregate(agg.source(), agg.child(), agg.groupings(), + combine(agg.aggregates(), new Alias(f.source(), ".having", condition))); - tryResolvingCondition = (Aggregate) analyze(tryResolvingCondition, false); + tryResolvingCondition = (Aggregate) analyze(tryResolvingCondition, false); - // if it got resolved - if (tryResolvingCondition.resolved()) { - // replace the condition with the resolved one - condition = ((Alias) tryResolvingCondition.aggregates() - .get(tryResolvingCondition.aggregates().size() - 1)).child(); - } else { - // else bail out - return plan; - } + // if it got resolved + if (tryResolvingCondition.resolved()) { + // replace the condition with the resolved one + condition = ((Alias) tryResolvingCondition.aggregates() + .get(tryResolvingCondition.aggregates().size() - 1)).child(); + } else { + // else bail out + return f; } + } - missing = findMissingAggregate(agg, condition); - - if (!missing.isEmpty()) { - Aggregate newAgg = new Aggregate(agg.source(), agg.child(), agg.groupings(), - combine(agg.aggregates(), missing)); - Filter newFilter = new Filter(f.source(), newAgg, condition); - // preserve old output - return new Project(f.source(), newFilter, f.output()); - } + missing = findMissingAggregate(agg, condition); - return new Filter(f.source(), f.child(), condition); + if (!missing.isEmpty()) { + Aggregate newAgg = new Aggregate(agg.source(), agg.child(), agg.groupings(), + combine(agg.aggregates(), missing)); + Filter newFilter = new Filter(f.source(), newAgg, condition); + // preserve old output + return new Project(f.source(), newFilter, f.output()); } - return plan; - } - return plan; + return new Filter(f.source(), f.child(), condition); + } + return f; } private Set findMissingAggregate(Aggregate target, Expression from) { @@ -1001,6 +999,67 @@ private Set findMissingAggregate(Aggregate target, Expression f } } + + // + // Handle aggs in ORDER BY. To help folding any aggs not found in Aggregation + // will be pushed down to the Aggregate and then projected. This also simplifies the Verifier's job. + // Similar to Having however using a different matching pattern since HAVING is always Filter with Agg, + // while an OrderBy can have multiple intermediate nodes (Filter,Project, etc...) + // + private static class ResolveAggsInOrderBy extends AnalyzeRule { + + @Override + protected boolean skipResolved() { + return false; + } + + @Override + protected LogicalPlan rule(OrderBy ob) { + List orders = ob.order(); + + // 1. collect aggs inside an order by + List aggs = new ArrayList<>(); + for (Order order : orders) { + if (Functions.isAggregate(order.child())) { + aggs.add(Expressions.wrapAsNamed(order.child())); + } + } + if (aggs.isEmpty()) { + return ob; + } + + // 2. find first Aggregate child and update it + + final AtomicBoolean found = new AtomicBoolean(false); + + LogicalPlan plan = ob.transformDown(a -> { + if (found.get() == false) { + found.set(true); + + List missing = new ArrayList<>(); + + for (NamedExpression orderedAgg : aggs) { + if (Expressions.anyMatch(a.aggregates(), e -> Expressions.equalsAsAttribute(e, orderedAgg)) == false) { + missing.add(orderedAgg); + } + } + // agg already contains all aggs + if (missing.isEmpty() == false) { + // save aggregates + return new Aggregate(a.source(), a.child(), a.groupings(), CollectionUtils.combine(a.aggregates(), missing)); + } + } + return a; + }, Aggregate.class); + + // if the plan was updated, project the initial aggregates + if (plan != ob) { + return new Project(ob.source(), plan, ob.output()); + } + return ob; + } + } + private class PruneDuplicateFunctions extends AnalyzeRule { @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Verifier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Verifier.java index 3f363d5a92809..43e30af24ab46 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Verifier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/analysis/analyzer/Verifier.java @@ -50,8 +50,8 @@ import java.util.Set; import java.util.function.Consumer; -import static java.lang.String.format; import static java.util.stream.Collectors.toMap; +import static org.elasticsearch.common.logging.LoggerMessageFormat.format; import static org.elasticsearch.xpack.sql.stats.FeatureMetric.COMMAND; import static org.elasticsearch.xpack.sql.stats.FeatureMetric.GROUPBY; import static org.elasticsearch.xpack.sql.stats.FeatureMetric.HAVING; @@ -114,7 +114,7 @@ public String toString() { } private static Failure fail(Node source, String message, Object... args) { - return new Failure(source, format(Locale.ROOT, message, args)); + return new Failure(source, format(message, args)); } public Map, String> verifyFailures(LogicalPlan plan) { @@ -310,11 +310,31 @@ private static boolean checkGroupByOrder(LogicalPlan p, Set localFailur Aggregate a = (Aggregate) child; Map> missing = new LinkedHashMap<>(); + + // track aggs and non-aggs - to keep the final modifier, use an array + final Expression[] aggAndNonAgg = new Expression[2]; + o.order().forEach(oe -> { Expression e = oe.child(); - // cannot order by aggregates (not supported by composite) - if (Functions.isAggregate(e)) { - missing.put(e, oe); + + if (Functions.isAggregate(e) || e instanceof AggregateFunctionAttribute) { + if (aggAndNonAgg[0] != null) { + return; + } else { + aggAndNonAgg[0] = e; + if (aggAndNonAgg[1] == null) { + return; + } + } + } else { + aggAndNonAgg[1] = e; + } + + if (aggAndNonAgg[0] != null && aggAndNonAgg[1] != null) { + localFailures.add(fail(oe, + "Cannot order by aggregated [{}] and non-aggregated [{}] columns at the same time; " + + "use either one or the other", + Expressions.name(aggAndNonAgg[0]), Expressions.name(aggAndNonAgg[1]))); return; } @@ -348,7 +368,7 @@ private static boolean checkGroupByOrder(LogicalPlan p, Set localFailur String plural = missing.size() > 1 ? "s" : StringUtils.EMPTY; // get the location of the first missing expression as the order by might be on a different line localFailures.add( - fail(missing.values().iterator().next(), "Cannot order by non-grouped column" + plural + " %s, expected %s", + fail(missing.values().iterator().next(), "Cannot order by non-grouped column" + plural + " {}, expected {}", Expressions.names(missing.keySet()), Expressions.names(a.groupings()))); groupingFailures.add(a); @@ -374,7 +394,7 @@ private static boolean checkGroupByHaving(LogicalPlan p, Set localFailu if (!missing.isEmpty()) { String plural = missing.size() > 1 ? "s" : StringUtils.EMPTY; localFailures.add( - fail(condition, "Cannot use HAVING filter on non-aggregate" + plural + " %s; use WHERE instead", + fail(condition, "Cannot use HAVING filter on non-aggregate" + plural + " {}; use WHERE instead", Expressions.names(missing.keySet()))); groupingFailures.add(a); return false; @@ -456,7 +476,7 @@ private static boolean checkGroupByAgg(LogicalPlan p, Set localFailures e.collectFirstChildren(c -> { if (Functions.isGrouping(c)) { localFailures.add(fail(c, - "Cannot combine [%s] grouping function inside GROUP BY, found [%s];" + "Cannot combine [{}] grouping function inside GROUP BY, found [{}];" + " consider moving the expression inside the histogram", Expressions.name(c), Expressions.name(e))); return true; @@ -485,7 +505,7 @@ private static boolean checkGroupByAgg(LogicalPlan p, Set localFailures if (!missing.isEmpty()) { String plural = missing.size() > 1 ? "s" : StringUtils.EMPTY; - localFailures.add(fail(missing.values().iterator().next(), "Cannot use non-grouped column" + plural + " %s, expected %s", + localFailures.add(fail(missing.values().iterator().next(), "Cannot use non-grouped column" + plural + " {}, expected {}", Expressions.names(missing.keySet()), Expressions.names(a.groupings()))); return false; @@ -568,7 +588,7 @@ private static void checkFilterOnAggs(LogicalPlan p, Set localFailures) filter.condition().forEachDown(e -> { if (Functions.isAggregate(e) || e instanceof AggregateFunctionAttribute) { localFailures.add( - fail(e, "Cannot use WHERE filtering on aggregate function [%s], use HAVING instead", Expressions.name(e))); + fail(e, "Cannot use WHERE filtering on aggregate function [{}], use HAVING instead", Expressions.name(e))); } }, Expression.class); } @@ -582,7 +602,7 @@ private static void checkFilterOnGrouping(LogicalPlan p, Set localFailu filter.condition().forEachDown(e -> { if (Functions.isGrouping(e) || e instanceof GroupingFunctionAttribute) { localFailures - .add(fail(e, "Cannot filter on grouping function [%s], use its argument instead", Expressions.name(e))); + .add(fail(e, "Cannot filter on grouping function [{}], use its argument instead", Expressions.name(e))); } }, Expression.class); } @@ -635,7 +655,7 @@ private static void validateInExpression(LogicalPlan p, Set localFailur DataType dt = in.value().dataType(); for (Expression value : in.list()) { if (areTypesCompatible(dt, value.dataType()) == false) { - localFailures.add(fail(value, "expected data type [%s], value provided is of type [%s]", + localFailures.add(fail(value, "expected data type [{}], value provided is of type [{}]", dt.esType, value.dataType().esType)); return; } @@ -656,7 +676,7 @@ private static void validateConditional(LogicalPlan p, Set localFailure } } else { if (areTypesCompatible(dt, child.dataType()) == false) { - localFailures.add(fail(child, "expected data type [%s], value provided is of type [%s]", + localFailures.add(fail(child, "expected data type [{}], value provided is of type [{}]", dt.esType, child.dataType().esType)); return; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index b4c7e063db092..c3fd9613bfcce 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -60,7 +60,7 @@ public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRe } private SqlSession newSession(Configuration cfg) { - return new SqlSession(cfg, client, functionRegistry, indexResolver, preAnalyzer, verifier, optimizer, planner); + return new SqlSession(cfg, client, functionRegistry, indexResolver, preAnalyzer, verifier, optimizer, planner, this); } public void searchSource(Configuration cfg, String sql, List params, ActionListener listener) { @@ -68,15 +68,20 @@ public void searchSource(Configuration cfg, String sql, List if (exec instanceof EsQueryExec) { EsQueryExec e = (EsQueryExec) exec; listener.onResponse(SourceGenerator.sourceBuilder(e.queryContainer(), cfg.filter(), cfg.pageSize())); - } else if (exec instanceof LocalExec) { - listener.onFailure(new PlanningException("Cannot generate a query DSL for an SQL query that either " + - "its WHERE clause evaluates to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: [{}]", - sql)); - } else if (exec instanceof CommandExec) { - listener.onFailure(new PlanningException("Cannot generate a query DSL for a special SQL command " + - "(e.g.: DESCRIBE, SHOW), sql statement: [{}]", sql)); - } else { - listener.onFailure(new PlanningException("Cannot generate a query DSL, sql statement: [{}]", sql)); + } + // try to provide a better resolution of what failed + else { + String message = null; + if (exec instanceof LocalExec) { + message = "Cannot generate a query DSL for an SQL query that either " + + "its WHERE clause evaluates to FALSE or doesn't operate on a table (missing a FROM clause)"; + } else if (exec instanceof CommandExec) { + message = "Cannot generate a query DSL for a special SQL command " + + "(e.g.: DESCRIBE, SHOW)"; + } else { + message = "Cannot generate a query DSL"; + } + listener.onFailure(new PlanningException(message + ", sql statement: [{}]", sql)); } }, listener::onFailure)); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index 94ea45b5ec83e..b09e98d11c17d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.BitSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -49,12 +50,14 @@ public class CompositeAggregationCursor implements Cursor { private final String[] indices; private final byte[] nextQuery; private final List extractors; + private final BitSet mask; private final int limit; - CompositeAggregationCursor(byte[] next, List exts, int remainingLimit, String... indices) { + CompositeAggregationCursor(byte[] next, List exts, BitSet mask, int remainingLimit, String... indices) { this.indices = indices; this.nextQuery = next; this.extractors = exts; + this.mask = mask; this.limit = remainingLimit; } @@ -64,6 +67,7 @@ public CompositeAggregationCursor(StreamInput in) throws IOException { limit = in.readVInt(); extractors = in.readNamedWriteableList(BucketExtractor.class); + mask = BitSet.valueOf(in.readByteArray()); } @Override @@ -73,6 +77,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(limit); out.writeNamedWriteableList(extractors); + out.writeByteArray(mask.toByteArray()); } @Override @@ -88,6 +93,10 @@ byte[] next() { return nextQuery; } + BitSet mask() { + return mask; + } + List extractors() { return extractors; } @@ -125,7 +134,7 @@ public void onResponse(SearchResponse r) { } updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices); + CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), indices); listener.onResponse(rowSet); } catch (Exception ex) { listener.onFailure(ex); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java index a9ca179147ea9..fbbc839fe1c76 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java @@ -8,10 +8,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor; -import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; +import java.util.BitSet; import java.util.List; import static java.util.Collections.emptyList; @@ -19,8 +19,7 @@ /** * {@link RowSet} specific to (GROUP BY) aggregation. */ -class CompositeAggsRowSet extends AbstractRowSet { - private final List exts; +class CompositeAggsRowSet extends ResultRowSet { private final List buckets; @@ -29,8 +28,8 @@ class CompositeAggsRowSet extends AbstractRowSet { private final int size; private int row = 0; - CompositeAggsRowSet(List exts, SearchResponse response, int limit, byte[] next, String... indices) { - this.exts = exts; + CompositeAggsRowSet(List exts, BitSet mask, SearchResponse response, int limit, byte[] next, String... indices) { + super(exts, mask); CompositeAggregation composite = CompositeAggregationCursor.getComposite(response); if (composite != null) { @@ -54,19 +53,14 @@ class CompositeAggsRowSet extends AbstractRowSet { if (next == null || size == 0 || remainingLimit == 0) { cursor = Cursor.EMPTY; } else { - cursor = new CompositeAggregationCursor(next, exts, remainingLimit, indices); + cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, indices); } } } @Override - protected Object getColumn(int column) { - return exts.get(column).extract(buckets.get(row)); - } - - @Override - public int columnCount() { - return exts.size(); + protected Object extractValue(BucketExtractor e) { + return e.extract(buckets.get(row)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java new file mode 100644 index 0000000000000..379ed3b61ec67 --- /dev/null +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.RowSet; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.emptyList; + +public class PagingListCursor implements Cursor { + + public static final String NAME = "p"; + + private final List> data; + private final int pageSize; + + PagingListCursor(List> data, int pageSize) { + this.data = data; + this.pageSize = pageSize; + } + + @SuppressWarnings("unchecked") + public PagingListCursor(StreamInput in) throws IOException { + data = (List>) in.readGenericValue(); + pageSize = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeGenericValue(data); + out.writeVInt(pageSize); + } + + @Override + public String getWriteableName() { + return NAME; + } + + List> data() { + return data; + } + + int pageSize() { + return pageSize; + } + + @Override + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + // the check is really a safety measure since the page initialization handles it already (by returning an empty cursor) + List> nextData = data.size() > pageSize ? data.subList(pageSize, data.size()) : emptyList(); + listener.onResponse(new PagingListRowSet(nextData, pageSize)); + } + + @Override + public void clear(Configuration cfg, Client client, ActionListener listener) { + listener.onResponse(true); + } + + @Override + public int hashCode() { + return Objects.hash(data, pageSize); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + PagingListCursor other = (PagingListCursor) obj; + return Objects.equals(pageSize, other.pageSize) && Objects.equals(data, other.data); + } + + @Override + public String toString() { + return "cursor for paging list"; + } +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java new file mode 100644 index 0000000000000..24d1e9c21f997 --- /dev/null +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.ListRowSet; +import org.elasticsearch.xpack.sql.type.Schema; + +import java.util.List; + +class PagingListRowSet extends ListRowSet { + + private final int pageSize; + private final Cursor cursor; + + PagingListRowSet(List> list, int pageSize) { + this(Schema.EMPTY, list, pageSize); + } + + PagingListRowSet(Schema schema, List> list, int pageSize) { + super(schema, list); + this.pageSize = Math.min(pageSize, list.size()); + this.cursor = list.size() > pageSize ? new PagingListCursor(list, pageSize) : Cursor.EMPTY; + } + + @Override + public int size() { + return pageSize; + } + + @Override + public Cursor nextPageCursor() { + return cursor; + } +} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index ff02ed85818fe..fa9778a70c8ed 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -14,6 +15,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -25,6 +27,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filters; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; +import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.CompositeKeyExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractor; @@ -32,6 +35,8 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.FieldHitExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.MetricAggExtractor; +import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.expression.ExpressionId; import org.elasticsearch.xpack.sql.expression.gen.pipeline.AggExtractorInput; import org.elasticsearch.xpack.sql.expression.gen.pipeline.AggPathInput; import org.elasticsearch.xpack.sql.expression.gen.pipeline.HitExtractorInput; @@ -46,16 +51,23 @@ import org.elasticsearch.xpack.sql.querydsl.container.ScriptFieldRef; import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.StringUtils; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; +import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonList; // TODO: add retry/back-off @@ -63,25 +75,25 @@ public class Querier { private final Logger log = LogManager.getLogger(getClass()); + private final PlanExecutor planExecutor; + private final Configuration cfg; private final TimeValue keepAlive, timeout; private final int size; private final Client client; @Nullable private final QueryBuilder filter; - public Querier(Client client, Configuration cfg) { - this(client, cfg.requestTimeout(), cfg.pageTimeout(), cfg.filter(), cfg.pageSize()); + public Querier(SqlSession sqlSession) { + this.planExecutor = sqlSession.planExecutor(); + this.client = sqlSession.client(); + this.cfg = sqlSession.configuration(); + this.keepAlive = cfg.requestTimeout(); + this.timeout = cfg.pageTimeout(); + this.filter = cfg.filter(); + this.size = cfg.pageSize(); } - public Querier(Client client, TimeValue keepAlive, TimeValue timeout, QueryBuilder filter, int size) { - this.client = client; - this.keepAlive = keepAlive; - this.timeout = timeout; - this.filter = filter; - this.size = size; - } - - public void query(Schema schema, QueryContainer query, String index, ActionListener listener) { + public void query(List output, QueryContainer query, String index, ActionListener listener) { // prepare the request SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size); // set query timeout @@ -95,16 +107,21 @@ public void query(Schema schema, QueryContainer query, String index, ActionListe SearchRequest search = prepareRequest(client, sourceBuilder, timeout, Strings.commaDelimitedListToStringArray(index)); - ActionListener l; + @SuppressWarnings("rawtypes") + List> sortingColumns = query.sortingColumns(); + listener = sortingColumns.isEmpty() ? listener : new LocalAggregationSorterListener(listener, sortingColumns, query.limit()); + + ActionListener l = null; + if (query.isAggsOnly()) { if (query.aggs().useImplicitGroupBy()) { - l = new ImplicitGroupActionListener(listener, client, timeout, schema, query, search); + l = new ImplicitGroupActionListener(listener, client, timeout, output, query, search); } else { - l = new CompositeActionListener(listener, client, timeout, schema, query, search); + l = new CompositeActionListener(listener, client, timeout, output, query, search); } } else { search.scroll(keepAlive); - l = new ScrollActionListener(listener, client, timeout, schema, query); + l = new ScrollActionListener(listener, client, timeout, output, query); } client.search(search, l); @@ -112,13 +129,125 @@ public void query(Schema schema, QueryContainer query, String index, ActionListe public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, String... indices) { SearchRequest search = client.prepareSearch(indices) - // always track total hits accurately - .setTrackTotalHits(true) - .setAllowPartialSearchResults(false) - .setSource(source) - .setTimeout(timeout) - .request(); - return search; + // always track total hits accurately + .setTrackTotalHits(true) + .setAllowPartialSearchResults(false) + .setSource(source) + .setTimeout(timeout) + .request(); + return search; + } + + /** + * Listener used for local sorting (typically due to aggregations used inside `ORDER BY`). + * + * This listener consumes the whole result set, sorts it in memory then send the paginates + * the results back the user. + */ + @SuppressWarnings("rawtypes") + class LocalAggregationSorterListener implements ActionListener { + + private final ActionListener listener; + + // keep the top N entries. + private final PriorityQueue, Integer>> data; + private final AtomicInteger counter = new AtomicInteger(); + private volatile Schema schema; + + LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { + this.listener = listener; + + this.data = new PriorityQueue, Integer>>(Math.max(limit, 100)) { + + // compare row based on the received attribute sort + // if a sort item is not in the list, it is assumed the sorting happened in ES + // and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria. + // + // Take for example ORDER BY a, x, b, y + // a, b - are sorted in ES + // x, y - need to be sorted client-side + // sorting on x kicks in, only if the values for a are equal. + + // thanks to @jpountz for the row ordering idea as a way to preserve ordering + @SuppressWarnings("unchecked") + @Override + protected boolean lessThan(Tuple, Integer> l, Tuple, Integer> r) { + for (Tuple tuple : sortingColumns) { + int i = tuple.v1().intValue(); + Comparator comparator = tuple.v2(); + + Object vl = l.v1().get(i); + Object vr = r.v1().get(i); + if (comparator != null) { + int result = comparator.compare(vl, vr); + // if things are equals, move to the next comparator + if (result != 0) { + return result < 0; + } + } + // no comparator means the existing order needs to be preserved + else { + // check the values - if they are equal move to the next comparator + // otherwise return the row order + if (Objects.equals(vl, vr) == false) { + return l.v2().compareTo(r.v2()) < 0; + } + } + } + // everything is equal, fall-back to the row order + return l.v2().compareTo(r.v2()) < 0; + } + }; + } + + @Override + public void onResponse(SchemaRowSet schemaRowSet) { + schema = schemaRowSet.schema(); + doResponse(schemaRowSet); + } + + private void doResponse(RowSet rowSet) { + // 1. consume all pages received + consumeRowSet(rowSet); + Cursor cursor = rowSet.nextPageCursor(); + // 1a. trigger a next call if there's still data + if (cursor != Cursor.EMPTY) { + // trigger a next call + planExecutor.nextPage(cfg, cursor, ActionListener.wrap(this::doResponse, this::onFailure)); + // make sure to bail out afterwards as we'll get called by a different thread + return; + } + + // no more data available, the last thread sends the response + // 2. send the in-memory view to the client + sendResponse(); + } + + private void consumeRowSet(RowSet rowSet) { + // use a synchronized block for visibility purposes (there's no concurrency) + ResultRowSet rrs = (ResultRowSet) rowSet; + synchronized (data) { + rrs.forEachRow(r -> { + List row = new ArrayList<>(rrs.columnCount()); + rrs.forEachResultColumn(row::add); + data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())); + }); + } + } + + private void sendResponse() { + List> list = new ArrayList<>(data.size()); + Tuple, Integer> pop = null; + while ((pop = data.pop()) != null) { + list.add(pop.v1()); + } + listener.onResponse(new PagingListRowSet(schema, list, cfg.pageSize())); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } } /** @@ -154,9 +283,9 @@ public Aggregations getAggregations() { } }); - ImplicitGroupActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, + ImplicitGroupActionListener(ActionListener listener, Client client, TimeValue keepAlive, List output, QueryContainer query, SearchRequest request) { - super(listener, client, keepAlive, schema, query, request); + super(listener, client, keepAlive, output, query, request); } @Override @@ -180,9 +309,12 @@ private void handleBuckets(List buckets, SearchResponse respon if (buckets.size() == 1) { Bucket implicitGroup = buckets.get(0); List extractors = initBucketExtractors(response); - Object[] values = new Object[extractors.size()]; - for (int i = 0; i < values.length; i++) { - values[i] = extractors.get(i).extract(implicitGroup); + + Object[] values = new Object[mask.cardinality()]; + + int index = 0; + for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) { + values[index++] = extractors.get(i).extract(implicitGroup); } listener.onResponse(Rows.singleton(schema, values)); @@ -203,8 +335,8 @@ private void handleBuckets(List buckets, SearchResponse respon static class CompositeActionListener extends BaseAggActionListener { CompositeActionListener(ActionListener listener, Client client, TimeValue keepAlive, - Schema schema, QueryContainer query, SearchRequest request) { - super(listener, client, keepAlive, schema, query, request); + List output, QueryContainer query, SearchRequest request) { + super(listener, client, keepAlive, output, query, request); } @@ -230,7 +362,7 @@ protected void handleResponse(SearchResponse response, ActionListener listener, Client client, TimeValue keepAlive, Schema schema, + BaseAggActionListener(ActionListener listener, Client client, TimeValue keepAlive, List output, QueryContainer query, SearchRequest request) { - super(listener, client, keepAlive, schema); + super(listener, client, keepAlive, output); this.query = query; this.request = request; + this.mask = query.columnMask(output); } protected List initBucketExtractors(SearchResponse response) { // create response extractors for the first time - List refs = query.columns(); + List> refs = query.fields(); List exts = new ArrayList<>(refs.size()); ConstantExtractor totalCount = new ConstantExtractor(response.getHits().getTotalHits().value); - for (FieldExtraction ref : refs) { - exts.add(createExtractor(ref, totalCount)); + for (Tuple ref : refs) { + exts.add(createExtractor(ref.v1(), totalCount)); } return exts; } @@ -301,11 +435,13 @@ private BucketExtractor createExtractor(FieldExtraction ref, BucketExtractor tot */ static class ScrollActionListener extends BaseActionListener { private final QueryContainer query; + private final BitSet mask; ScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, - Schema schema, QueryContainer query) { - super(listener, client, keepAlive, schema); + List output, QueryContainer query) { + super(listener, client, keepAlive, output); this.query = query; + this.mask = query.columnMask(output); } @Override @@ -313,27 +449,27 @@ protected void handleResponse(SearchResponse response, ActionListener refs = query.columns(); + List> refs = query.fields(); List exts = new ArrayList<>(refs.size()); - for (FieldExtraction ref : refs) { - exts.add(createExtractor(ref)); + for (Tuple ref : refs) { + exts.add(createExtractor(ref.v1())); } // there are some results if (hits.length > 0) { String scrollId = response.getScrollId(); - SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId); + SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), scrollId); // if there's an id, try to setup next scroll if (scrollId != null && // is all the content already retrieved? - (Boolean.TRUE.equals(response.isTerminatedEarly()) + (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits().value == hits.length || hitRowSet.isLimitReached())) { // if so, clear the scroll clear(response.getScrollId(), ActionListener.wrap( - succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), null)), + succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), null)), listener::onFailure)); } else { listener.onResponse(hitRowSet); @@ -394,12 +530,12 @@ abstract static class BaseActionListener implements ActionListener listener, Client client, TimeValue keepAlive, Schema schema) { + BaseActionListener(ActionListener listener, Client client, TimeValue keepAlive, List output) { this.listener = listener; this.client = client; this.keepAlive = keepAlive; - this.schema = schema; + this.schema = Rows.schema(output); } // TODO: need to handle rejections plus check failures (shard size, etc...) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ResultRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ResultRowSet.java new file mode 100644 index 0000000000000..b692585760ab2 --- /dev/null +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ResultRowSet.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; +import org.elasticsearch.xpack.sql.session.AbstractRowSet; +import org.elasticsearch.xpack.sql.util.Check; + +import java.util.BitSet; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +abstract class ResultRowSet extends AbstractRowSet { + + private final List extractors; + private final BitSet mask; + + ResultRowSet(List extractors, BitSet mask) { + this.extractors = extractors; + this.mask = mask; + Check.isTrue(mask.length() <= extractors.size(), "Invalid number of extracted columns specified"); + } + + @Override + public final int columnCount() { + return mask.cardinality(); + } + + @Override + protected Object getColumn(int column) { + return extractValue(userExtractor(column)); + } + + List extractors() { + return extractors; + } + + BitSet mask() { + return mask; + } + + E userExtractor(int column) { + int i = -1; + // find the nth set bit + for (i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) { + if (column-- == 0) { + return extractors.get(i); + } + } + + throw new SqlIllegalArgumentException("Cannot find column [{}]", column); + } + + Object resultColumn(int column) { + return extractValue(extractors().get(column)); + } + + int resultColumnCount() { + return extractors.size(); + } + + void forEachResultColumn(Consumer action) { + Objects.requireNonNull(action); + int rowSize = resultColumnCount(); + for (int i = 0; i < rowSize; i++) { + action.accept(resultColumn(i)); + } + } + + + protected abstract Object extractValue(E e); +} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java index 7c646fbb0b7f1..bad618161e58c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; +import java.util.BitSet; import java.util.List; /** @@ -21,9 +22,10 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow private final Schema schema; - SchemaCompositeAggsRowSet(Schema schema, List exts, SearchResponse response, int limitAggs, byte[] next, + SchemaCompositeAggsRowSet(Schema schema, List exts, BitSet mask, SearchResponse response, int limitAggs, + byte[] next, String... indices) { - super(exts, response, limitAggs, next, indices); + super(exts, mask, response, limitAggs, next, indices); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java index 7ec20a93c0945..aa5c57aab609d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; +import java.util.BitSet; import java.util.List; /** @@ -20,8 +21,8 @@ class SchemaSearchHitRowSet extends SearchHitRowSet implements SchemaRowSet { private final Schema schema; - SchemaSearchHitRowSet(Schema schema, List exts, SearchHit[] hits, int limitHits, String scrollId) { - super(exts, hits, limitHits, scrollId); + SchemaSearchHitRowSet(Schema schema, List exts, BitSet mask, SearchHit[] hits, int limitHits, String scrollId) { + super(exts, mask, hits, limitHits, scrollId); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index dcaca223f4ce2..af57126cc5610 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.sql.session.RowSet; import java.io.IOException; +import java.util.BitSet; import java.util.List; import java.util.Objects; @@ -34,11 +35,13 @@ public class ScrollCursor implements Cursor { private final String scrollId; private final List extractors; + private final BitSet mask; private final int limit; - public ScrollCursor(String scrollId, List extractors, int limit) { + public ScrollCursor(String scrollId, List extractors, BitSet mask, int limit) { this.scrollId = scrollId; this.extractors = extractors; + this.mask = mask; this.limit = limit; } @@ -47,6 +50,7 @@ public ScrollCursor(StreamInput in) throws IOException { limit = in.readVInt(); extractors = in.readNamedWriteableList(HitExtractor.class); + mask = BitSet.valueOf(in.readByteArray()); } @Override @@ -55,6 +59,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(limit); out.writeNamedWriteableList(extractors); + out.writeByteArray(mask.toByteArray()); } @Override @@ -66,6 +71,10 @@ String scrollId() { return scrollId; } + BitSet mask() { + return mask; + } + List extractors() { return extractors; } @@ -79,7 +88,7 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout()); client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { - SearchHitRowSet rowSet = new SearchHitRowSet(extractors, response.getHits().getHits(), + SearchHitRowSet rowSet = new SearchHitRowSet(extractors, mask, response.getHits().getHits(), limit, response.getScrollId()); if (rowSet.nextPageCursor() == Cursor.EMPTY ) { // we are finished with this cursor, let's clean it before continuing diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java index ba3682df5cc23..54f60ec6ae146 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java @@ -9,10 +9,10 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; -import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; import java.util.Arrays; +import java.util.BitSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -20,10 +20,9 @@ /** * Extracts rows from an array of {@link SearchHit}. */ -class SearchHitRowSet extends AbstractRowSet { +class SearchHitRowSet extends ResultRowSet { private final SearchHit[] hits; private final Cursor cursor; - private final List extractors; private final Set innerHits = new LinkedHashSet<>(); private final String innerHit; @@ -31,10 +30,10 @@ class SearchHitRowSet extends AbstractRowSet { private final int[] indexPerLevel; private int row = 0; - SearchHitRowSet(List exts, SearchHit[] hits, int limit, String scrollId) { + SearchHitRowSet(List exts, BitSet mask, SearchHit[] hits, int limit, String scrollId) { + super(exts, mask); this.hits = hits; - this.extractors = exts; // Since the results might contain nested docs, the iteration is similar to that of Aggregation // namely it discovers the nested docs and then, for iteration, increments the deepest level first @@ -85,7 +84,7 @@ class SearchHitRowSet extends AbstractRowSet { if (size == 0 || remainingLimit == 0) { cursor = Cursor.EMPTY; } else { - cursor = new ScrollCursor(scrollId, extractors, remainingLimit); + cursor = new ScrollCursor(scrollId, extractors(), mask, remainingLimit); } } } @@ -95,13 +94,7 @@ protected boolean isLimitReached() { } @Override - public int columnCount() { - return extractors.size(); - } - - @Override - protected Object getColumn(int column) { - HitExtractor e = extractors.get(column); + protected Object extractValue(HitExtractor e) { int extractorLevel = e.hitName() == null ? 0 : 1; SearchHit hit = null; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java index 16f283a1231da..08bb737e65cc4 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java @@ -58,7 +58,7 @@ public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryB // need to be retrieved from the result documents // NB: the sortBuilder takes care of eliminating duplicates - container.columns().forEach(cr -> cr.collectFields(sortBuilder)); + container.fields().forEach(f -> f.v1().collectFields(sortBuilder)); sortBuilder.build(source); optimize(sortBuilder, source); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeMap.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeMap.java index 9f64246a514f7..73092f2ed72da 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeMap.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeMap.java @@ -11,7 +11,6 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Stream; @@ -21,7 +20,7 @@ import static java.util.Collections.unmodifiableCollection; import static java.util.Collections.unmodifiableSet; -public class AttributeMap { +public class AttributeMap implements Map { static class AttributeWrapper { @@ -120,8 +119,9 @@ public Object[] toArray() { @SuppressWarnings("unchecked") public A[] toArray(A[] a) { // collection is immutable so use that to our advantage - if (a.length < size()) + if (a.length < size()) { a = (A[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size()); + } int i = 0; Object[] result = a; for (U u : this) { @@ -140,6 +140,14 @@ public String toString() { } } + @SuppressWarnings("rawtypes") + public static final AttributeMap EMPTY = new AttributeMap<>(); + + @SuppressWarnings("unchecked") + public static final AttributeMap emptyAttributeMap() { + return EMPTY; + } + private final Map delegate; private Set keySet = null; private Collection values = null; @@ -175,6 +183,14 @@ void addAll(AttributeMap other) { delegate.putAll(other.delegate); } + public AttributeMap combine(AttributeMap other) { + AttributeMap combine = new AttributeMap<>(); + combine.addAll(this); + combine.addAll(other); + + return combine; + } + public AttributeMap subtract(AttributeMap other) { AttributeMap diff = new AttributeMap<>(); for (Entry entry : this.delegate.entrySet()) { @@ -222,14 +238,17 @@ public Set attributeNames() { return s; } + @Override public int size() { return delegate.size(); } + @Override public boolean isEmpty() { return delegate.isEmpty(); } + @Override public boolean containsKey(Object key) { if (key instanceof NamedExpression) { return delegate.keySet().contains(new AttributeWrapper(((NamedExpression) key).toAttribute())); @@ -237,10 +256,12 @@ public boolean containsKey(Object key) { return false; } + @Override public boolean containsValue(Object value) { return delegate.values().contains(value); } + @Override public E get(Object key) { if (key instanceof NamedExpression) { return delegate.get(new AttributeWrapper(((NamedExpression) key).toAttribute())); @@ -248,6 +269,7 @@ public E get(Object key) { return null; } + @Override public E getOrDefault(Object key, E defaultValue) { E e; return (((e = get(key)) != null) || containsKey(key)) @@ -255,6 +277,27 @@ public E getOrDefault(Object key, E defaultValue) { : defaultValue; } + @Override + public E put(Attribute key, E value) { + throw new UnsupportedOperationException(); + } + + @Override + public E remove(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override public Set keySet() { if (keySet == null) { keySet = new UnwrappingSet(delegate.keySet()) { @@ -267,6 +310,7 @@ protected Attribute unwrap(AttributeWrapper next) { return keySet; } + @Override public Collection values() { if (values == null) { values = unmodifiableCollection(delegate.values()); @@ -274,6 +318,7 @@ public Collection values() { return values; } + @Override public Set> entrySet() { if (entrySet == null) { entrySet = new UnwrappingSet, Entry>(delegate.entrySet()) { @@ -301,6 +346,7 @@ public E setValue(E value) { return entrySet; } + @Override public void forEach(BiConsumer action) { delegate.forEach((k, v) -> action.accept(k.attr, v)); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeSet.java index af290371dafd3..f8b89ac4b08fc 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/AttributeSet.java @@ -57,6 +57,10 @@ void addAll(AttributeSet other) { delegate.addAll(other.delegate); } + public AttributeSet combine(AttributeSet other) { + return new AttributeSet(delegate.combine(other.delegate)); + } + public AttributeSet subtract(AttributeSet other) { return new AttributeSet(delegate.subtract(other.delegate)); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expression.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expression.java index d421d2b01c098..745cc36e34a57 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expression.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expression.java @@ -8,8 +8,8 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.capabilities.Resolvable; import org.elasticsearch.xpack.sql.capabilities.Resolvables; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.Node; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.util.StringUtils; @@ -64,6 +64,7 @@ public String message() { private TypeResolution lazyTypeResolution = null; private Boolean lazyChildrenResolved = null; private Expression lazyCanonical = null; + private AttributeSet lazyReferences = null; public Expression(Source source, List children) { super(source, children); @@ -82,7 +83,10 @@ public Object fold() { // the references/inputs/leaves of the expression tree public AttributeSet references() { - return Expressions.references(children()); + if (lazyReferences == null) { + lazyReferences = Expressions.references(children()); + } + return lazyReferences; } public boolean childrenResolved() { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expressions.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expressions.java index ee9f98e104877..4959e73c15ae5 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expressions.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/Expressions.java @@ -36,7 +36,7 @@ public enum ParamOrdinal { private Expressions() {} public static NamedExpression wrapAsNamed(Expression exp) { - return exp instanceof NamedExpression ? (NamedExpression) exp : new Alias(exp.source(), exp.nodeName(), exp); + return exp instanceof NamedExpression ? (NamedExpression) exp : new Alias(exp.source(), exp.sourceText(), exp); } public static List asAttributes(List named) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/NamedExpression.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/NamedExpression.java index 90b46b29ccf7f..2331034c10140 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/NamedExpression.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/NamedExpression.java @@ -91,4 +91,9 @@ public boolean equals(Object obj) { && Objects.equals(name, other.name) && Objects.equals(children(), other.children()); } + + @Override + public String toString() { + return super.toString() + "#" + id(); + } } \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/Function.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/Function.java index 06171d43cc036..7724e81525b6e 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/Function.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/Function.java @@ -49,11 +49,6 @@ public Nullability nullable() { return Expressions.nullable(children()); } - @Override - public String toString() { - return sourceText() + "#" + id(); - } - public String functionName() { return functionName; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/UnresolvedFunction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/UnresolvedFunction.java index 52bbed17346b3..85afc25c5c6cb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/UnresolvedFunction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/UnresolvedFunction.java @@ -166,7 +166,7 @@ public String unresolvedMessage() { @Override public String toString() { - return UNRESOLVED_PREFIX + sourceText(); + return UNRESOLVED_PREFIX + name + children(); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunction.java index 3b646a68d21c7..b432c5063a64b 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunction.java @@ -52,7 +52,7 @@ public List parameters() { public AggregateFunctionAttribute toAttribute() { if (lazyAttribute == null) { // this is highly correlated with QueryFolder$FoldAggregate#addFunction (regarding the function name within the querydsl) - lazyAttribute = new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), null); + lazyAttribute = new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId()); } return lazyAttribute; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunctionAttribute.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunctionAttribute.java index b50b268844c7b..96f072acda567 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunctionAttribute.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/AggregateFunctionAttribute.java @@ -18,23 +18,36 @@ public class AggregateFunctionAttribute extends FunctionAttribute { + // used when dealing with a inner agg (avg -> stats) to keep track of + // packed id + // used since the functionId points to the compoundAgg + private final ExpressionId innerId; private final String propertyPath; - AggregateFunctionAttribute(Source source, String name, DataType dataType, ExpressionId id, - String functionId, String propertyPath) { - this(source, name, dataType, null, Nullability.FALSE, id, false, functionId, propertyPath); + AggregateFunctionAttribute(Source source, String name, DataType dataType, ExpressionId id, String functionId) { + this(source, name, dataType, null, Nullability.FALSE, id, false, functionId, null, null); } - public AggregateFunctionAttribute(Source source, String name, DataType dataType, String qualifier, - Nullability nullability, ExpressionId id, boolean synthetic, String functionId, String propertyPath) { + AggregateFunctionAttribute(Source source, String name, DataType dataType, ExpressionId id, String functionId, ExpressionId innerId, + String propertyPath) { + this(source, name, dataType, null, Nullability.FALSE, id, false, functionId, innerId, propertyPath); + } + + public AggregateFunctionAttribute(Source source, String name, DataType dataType, String qualifier, Nullability nullability, + ExpressionId id, boolean synthetic, String functionId, ExpressionId innerId, String propertyPath) { super(source, name, dataType, qualifier, nullability, id, synthetic, functionId); + this.innerId = innerId; this.propertyPath = propertyPath; } @Override protected NodeInfo info() { - return NodeInfo.create(this, AggregateFunctionAttribute::new, - name(), dataType(), qualifier(), nullable(), id(), synthetic(), functionId(), propertyPath); + return NodeInfo.create(this, AggregateFunctionAttribute::new, name(), dataType(), qualifier(), nullable(), id(), synthetic(), + functionId(), innerId, propertyPath); + } + + public ExpressionId innerId() { + return innerId != null ? innerId : id(); } public String propertyPath() { @@ -43,33 +56,38 @@ public String propertyPath() { @Override protected Expression canonicalize() { - return new AggregateFunctionAttribute(source(), "", dataType(), null, Nullability.TRUE, id(), false, "", null); + return new AggregateFunctionAttribute(source(), "", dataType(), null, Nullability.TRUE, id(), false, "", null, null); } @Override protected Attribute clone(Source source, String name, String qualifier, Nullability nullability, ExpressionId id, boolean synthetic) { // this is highly correlated with QueryFolder$FoldAggregate#addFunction (regarding the function name within the querydsl) // that is the functionId is actually derived from the expression id to easily track it across contexts - return new AggregateFunctionAttribute(source, name, dataType(), qualifier, nullability, id, synthetic, functionId(), propertyPath); + return new AggregateFunctionAttribute(source, name, dataType(), qualifier, nullability, id, synthetic, functionId(), innerId, + propertyPath); } public AggregateFunctionAttribute withFunctionId(String functionId, String propertyPath) { - return new AggregateFunctionAttribute(source(), name(), dataType(), qualifier(), nullable(), - id(), synthetic(), functionId, propertyPath); + return new AggregateFunctionAttribute(source(), name(), dataType(), qualifier(), nullable(), id(), synthetic(), functionId, innerId, + propertyPath); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), propertyPath); + return Objects.hash(super.hashCode(), innerId, propertyPath); } @Override public boolean equals(Object obj) { - return super.equals(obj) && Objects.equals(propertyPath(), ((AggregateFunctionAttribute) obj).propertyPath()); + if (super.equals(obj)) { + AggregateFunctionAttribute other = (AggregateFunctionAttribute) obj; + return Objects.equals(innerId, other.innerId) && Objects.equals(propertyPath, other.propertyPath); + } + return false; } @Override protected String label() { - return "a->" + functionId(); + return "a->" + innerId(); } -} +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/Count.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/Count.java index 429b4e7ba0774..95a1b50cc1139 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/Count.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/Count.java @@ -77,11 +77,11 @@ public String name() { public AggregateFunctionAttribute toAttribute() { // COUNT(*) gets its value from the parent aggregation on which _count is called if (field() instanceof Literal) { - return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), "_count"); + return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), id(), "_count"); } // COUNT(column) gets its value from a sibling aggregation (an exists filter agg) by calling its id and then _count on it if (!distinct()) { - return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), functionId() + "._count"); + return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), id(), functionId() + "._count"); } return super.toAttribute(); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/InnerAggregate.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/InnerAggregate.java index 2fdcf6658804c..6e35fa5a7ac72 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/InnerAggregate.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/aggregate/InnerAggregate.java @@ -7,8 +7,8 @@ import org.elasticsearch.xpack.sql.expression.Expression; import org.elasticsearch.xpack.sql.expression.function.Function; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.DataType; import java.util.List; @@ -17,7 +17,7 @@ public class InnerAggregate extends AggregateFunction { private final AggregateFunction inner; private final CompoundNumericAggregate outer; - private final String innerId; + private final String innerName; // used when the result needs to be extracted from a map (like in MatrixAggs or Percentiles) private final Expression innerKey; @@ -29,7 +29,7 @@ public InnerAggregate(Source source, AggregateFunction inner, CompoundNumericAgg super(source, outer.field(), outer.arguments()); this.inner = inner; this.outer = outer; - this.innerId = ((EnclosedAgg) inner).innerName(); + this.innerName = ((EnclosedAgg) inner).innerName(); this.innerKey = innerKey; } @@ -55,8 +55,8 @@ public CompoundNumericAggregate outer() { return outer; } - public String innerId() { - return innerId; + public String innerName() { + return innerName; } public Expression innerKey() { @@ -77,10 +77,10 @@ public String functionId() { public AggregateFunctionAttribute toAttribute() { // this is highly correlated with QueryFolder$FoldAggregate#addFunction (regarding the function name within the querydsl) return new AggregateFunctionAttribute(source(), name(), dataType(), outer.id(), functionId(), - aggMetricValue(functionId(), innerId)); + inner.id(), aggMetricValue(functionId(), innerName)); } - public static String aggMetricValue(String aggPath, String valueName) { + private static String aggMetricValue(String aggPath, String valueName) { // handle aggPath inconsistency (for percentiles and percentileRanks) percentile[99.9] (valid) vs percentile.99.9 (invalid) return aggPath + "[" + valueName + "]"; } @@ -98,4 +98,9 @@ public boolean functionEquals(Function f) { public String name() { return inner.name(); } + + @Override + public String toString() { + return nodeName() + "[" + outer + ">" + inner.nodeName() + "#" + inner.id() + "]"; + } } \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java index 38313fa613a0e..cef0fffaa7bb6 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/optimizer/Optimizer.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.sql.expression.Alias; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.AttributeMap; -import org.elasticsearch.xpack.sql.expression.AttributeSet; import org.elasticsearch.xpack.sql.expression.Expression; import org.elasticsearch.xpack.sql.expression.ExpressionId; import org.elasticsearch.xpack.sql.expression.ExpressionSet; @@ -73,6 +72,7 @@ import org.elasticsearch.xpack.sql.rule.RuleExecutor; import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.SingletonExecutable; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.util.CollectionUtils; @@ -112,18 +112,9 @@ public LogicalPlan optimize(LogicalPlan verified) { @Override protected Iterable.Batch> batches() { - Batch aggregate = new Batch("Aggregation", - new PruneDuplicatesInGroupBy(), - new ReplaceDuplicateAggsWithReferences(), - new ReplaceAggsWithMatrixStats(), - new ReplaceAggsWithExtendedStats(), - new ReplaceAggsWithStats(), - new PromoteStatsToExtendedStats(), - new ReplaceAggsWithPercentiles(), - new ReplaceAggsWithPercentileRanks() - ); - Batch operators = new Batch("Operator Optimization", + new PruneDuplicatesInGroupBy(), + //new ReplaceDuplicateAggsWithReferences(), // combining new CombineProjections(), // folding @@ -151,6 +142,15 @@ protected Iterable.Batch> batches() { //new PruneDuplicateFunctions() ); + Batch aggregate = new Batch("Aggregation Rewrite", + new ReplaceAggsWithMatrixStats(), + new ReplaceAggsWithExtendedStats(), + new ReplaceAggsWithStats(), + new PromoteStatsToExtendedStats(), + new ReplaceAggsWithPercentiles(), + new ReplaceAggsWithPercentileRanks() + ); + Batch local = new Batch("Skip Elasticsearch", new SkipQueryOnLimitZero(), new SkipQueryIfFoldingProjection() @@ -247,7 +247,7 @@ protected Expression rule(Expression e, Map seen, Map { private static class Match { final Stats stats; - int count = 1; - final Set> functionTypes = new LinkedHashSet<>(); + private final Set> functionTypes = new LinkedHashSet<>(); + private Map, InnerAggregate> innerAggs = null; Match(Stats stats) { this.stats = stats; @@ -315,6 +315,22 @@ private static class Match { public String toString() { return stats.toString(); } + + public void add(Class aggType) { + functionTypes.add(aggType); + } + + // if the stat has at least two different functions for it, promote it as stat + // also keep the promoted function around for reuse + public AggregateFunction maybePromote(AggregateFunction agg) { + if (functionTypes.size() > 1) { + if (innerAggs == null) { + innerAggs = new LinkedHashMap<>(); + } + return innerAggs.computeIfAbsent(agg.getClass(), k -> new InnerAggregate(agg, stats)); + } + return agg; + } } @Override @@ -353,15 +369,10 @@ private Expression collect(Expression e, Map seen) { Match match = seen.get(argument); if (match == null) { - match = new Match(new Stats(f.source(), argument)); - match.functionTypes.add(f.getClass()); + match = new Match(new Stats(new Source(f.sourceLocation(), "STATS(" + Expressions.name(argument) + ")"), argument)); seen.put(argument, match); } - else { - if (match.functionTypes.add(f.getClass())) { - match.count++; - } - } + match.add(f.getClass()); } return e; @@ -372,13 +383,14 @@ private static Expression promote(Expression e, Map seen, Map AggregateFunction f = (AggregateFunction) e; Expression argument = f.field(); - Match counter = seen.get(argument); + Match match = seen.get(argument); - // if the stat has at least two different functions for it, promote it as stat - if (counter != null && counter.count > 1) { - InnerAggregate innerAgg = new InnerAggregate(f, counter.stats); - attrs.putIfAbsent(f.functionId(), innerAgg.toAttribute()); - return innerAgg; + if (match != null) { + AggregateFunction inner = match.maybePromote(f); + if (inner != f) { + attrs.putIfAbsent(f.functionId(), inner.toAttribute()); + } + return inner; } } return e; @@ -778,31 +790,23 @@ static class PruneOrderBy extends OptimizerRule { @Override protected LogicalPlan rule(OrderBy ob) { - List order = ob.order(); - - // remove constants - List nonConstant = order.stream().filter(o -> !o.child().foldable()).collect(toList()); - - if (nonConstant.isEmpty()) { - return ob.child(); - } - - // if the sort points to an agg, consider it only if there's grouping - if (ob.child() instanceof Aggregate) { - Aggregate a = (Aggregate) ob.child(); + AtomicBoolean foundAggregate = new AtomicBoolean(false); + AtomicBoolean foundImplicitGroupBy = new AtomicBoolean(false); + // if the first found aggregate has no grouping, there's no need to do ordering + ob.forEachDown(a -> { + // take into account + if (foundAggregate.get()) { + return; + } + foundAggregate.set(true); if (a.groupings().isEmpty()) { - AttributeSet aggsAttr = new AttributeSet(Expressions.asAttributes(a.aggregates())); - - List nonAgg = nonConstant.stream().filter(o -> { - if (o.child() instanceof NamedExpression) { - return !aggsAttr.contains(((NamedExpression) o.child()).toAttribute()); - } - return true; - }).collect(toList()); - - return nonAgg.isEmpty() ? ob.child() : new OrderBy(ob.source(), ob.child(), nonAgg); + foundImplicitGroupBy.set(true); } + }, Aggregate.class); + + if (foundImplicitGroupBy.get()) { + return ob.child(); } return ob; } @@ -820,6 +824,8 @@ protected LogicalPlan rule(OrderBy ob) { // remove constants List nonConstant = order.stream().filter(o -> !o.child().foldable()).collect(toList()); + // TODO: handle HAVING case - maybe simply use a transformation + // if the sort points to an agg, change the agg order based on the order if (ob.child() instanceof Aggregate) { Aggregate a = (Aggregate) ob.child(); @@ -976,6 +982,7 @@ protected LogicalPlan rule(Project project) { // eliminate lower project but first replace the aliases in the upper one return new Project(p.source(), p.child(), combineProjections(project.projections(), p.projections())); } + if (child instanceof Aggregate) { Aggregate a = (Aggregate) child; return new Aggregate(a.source(), a.child(), a.groupings(), combineProjections(project.projections(), a.aggregates())); @@ -988,23 +995,25 @@ protected LogicalPlan rule(Project project) { // that might be reused by the upper one, these need to be replaced. // for example an alias defined in the lower list might be referred in the upper - without replacing it the alias becomes invalid private List combineProjections(List upper, List lower) { + + //TODO: this need rewriting when moving functions of NamedExpression + // collect aliases in the lower list - Map map = new LinkedHashMap<>(); + Map map = new LinkedHashMap<>(); for (NamedExpression ne : lower) { - if (ne instanceof Alias) { - Alias a = (Alias) ne; - map.put(a.toAttribute(), a); + if ((ne instanceof Attribute) == false) { + map.put(ne.toAttribute(), ne); } } - AttributeMap aliases = new AttributeMap<>(map); + AttributeMap aliases = new AttributeMap<>(map); List replaced = new ArrayList<>(); // replace any matching attribute with a lower alias (if there's a match) // but clean-up non-top aliases at the end for (NamedExpression ne : upper) { NamedExpression replacedExp = (NamedExpression) ne.transformUp(a -> { - Alias as = aliases.get(a); + NamedExpression as = aliases.get(a); return as != null ? as : a; }, Attribute.class); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/TableIdentifier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/TableIdentifier.java index 3cb9b52fd8a9a..9d17d766c2f96 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/TableIdentifier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/TableIdentifier.java @@ -60,13 +60,11 @@ public String qualifiedIndex() { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("["); if (cluster != null) { builder.append(cluster); + builder.append(":"); } - builder.append("][index="); builder.append(index); - builder.append("]"); return builder.toString(); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/UnresolvedRelation.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/UnresolvedRelation.java index 2613b05b97129..fa9e2326f2cc5 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/UnresolvedRelation.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/UnresolvedRelation.java @@ -8,13 +8,15 @@ import org.elasticsearch.xpack.sql.capabilities.Unresolvable; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.TableIdentifier; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.Collections; import java.util.List; import java.util.Objects; +import static java.util.Collections.singletonList; + public class UnresolvedRelation extends LeafPlan implements Unresolvable { private final TableIdentifier table; @@ -86,4 +88,14 @@ public boolean equals(Object obj) { && Objects.equals(alias, other.alias) && unresolvedMsg.equals(other.unresolvedMsg); } + + @Override + public List nodeProperties() { + return singletonList(table); + } + + @Override + public String toString() { + return UNRESOLVED_PREFIX + table.index(); + } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java index 2d16f6e0cf128..23805de0334f8 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java @@ -9,11 +9,10 @@ import org.elasticsearch.xpack.sql.execution.search.Querier; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; -import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; @@ -22,7 +21,6 @@ public class EsQueryExec extends LeafExec { private final String index; private final List output; - private final QueryContainer queryContainer; public EsQueryExec(Source source, String index, List output, QueryContainer queryContainer) { @@ -56,8 +54,21 @@ public List output() { @Override public void execute(SqlSession session, ActionListener listener) { - Querier scroller = new Querier(session.client(), session.configuration()); - scroller.query(Rows.schema(output), queryContainer, index, listener); + Querier scroller = new Querier(session); + // List ids = queryContainer.columns(); + // + // int[] extIds = new int[output.size()]; + // + // for (int i = 0; i < output.size(); i++) { + // Attribute o = output.get(i); + // int indexOf = ids.indexOf(o.id()); + // if (indexOf == -1) { + // throw new SqlIllegalArgumentException("Cannot find extractor for column [{}]", o.name()); + // } + // extIds[i] = indexOf; + // } + + scroller.query(output, queryContainer, index, listener); } @Override @@ -85,4 +96,4 @@ public boolean equals(Object obj) { public String nodeString() { return nodeName() + "[" + index + "," + queryContainer + "]"; } -} +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryFolder.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryFolder.java index da409439558c7..e17abd3697764 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryFolder.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryFolder.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.sql.execution.search.AggRef; import org.elasticsearch.xpack.sql.expression.Alias; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.expression.AttributeMap; import org.elasticsearch.xpack.sql.expression.Expression; import org.elasticsearch.xpack.sql.expression.Expressions; import org.elasticsearch.xpack.sql.expression.Foldables; @@ -144,8 +145,12 @@ protected PhysicalPlan rule(ProjectExec project) { } } - QueryContainer clone = new QueryContainer(queryC.query(), queryC.aggs(), queryC.columns(), aliases, - queryC.pseudoFunctions(), processors, queryC.sort(), queryC.limit()); + QueryContainer clone = new QueryContainer(queryC.query(), queryC.aggs(), queryC.fields(), + new AttributeMap<>(aliases), + queryC.pseudoFunctions(), + new AttributeMap<>(processors), + queryC.sort(), + queryC.limit()); return new EsQueryExec(exec.source(), exec.index(), project.output(), clone); } return project; @@ -168,7 +173,8 @@ protected PhysicalPlan rule(FilterExec plan) { } Aggs aggs = addPipelineAggs(qContainer, qt, plan); - qContainer = new QueryContainer(query, aggs, qContainer.columns(), qContainer.aliases(), + qContainer = new QueryContainer(query, aggs, qContainer.fields(), + qContainer.aliases(), qContainer.pseudoFunctions(), qContainer.scalarFunctions(), qContainer.sort(), @@ -313,7 +319,7 @@ protected PhysicalPlan rule(AggregateExec a) { } // add the computed column - queryC = qC.get().addColumn(new ComputedRef(proc)); + queryC = qC.get().addColumn(new ComputedRef(proc), f.toAttribute()); // TODO: is this needed? // redirect the alias to the scalar group id (changing the id altogether doesn't work it is @@ -335,20 +341,22 @@ protected PhysicalPlan rule(AggregateExec a) { // UTC is used since that's what the server uses and there's no conversion applied // (like for date histograms) ZoneId zi = child.dataType().isDateBased() ? DateUtils.UTC : null; - queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, zi)); + queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, zi), ((Attribute) child)); } // handle histogram else if (child instanceof GroupingFunction) { - queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, null)); + queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, null), + ((GroupingFunction) child).toAttribute()); } // fallback to regular agg functions else { // the only thing left is agg function Check.isTrue(Functions.isAggregate(child), "Expected aggregate function inside alias; got [{}]", child.nodeString()); - Tuple withAgg = addAggFunction(matchingGroup, - (AggregateFunction) child, compoundAggMap, queryC); - queryC = withAgg.v1().addColumn(withAgg.v2().context()); + AggregateFunction af = (AggregateFunction) child; + Tuple withAgg = addAggFunction(matchingGroup, af, compoundAggMap, queryC); + // make sure to add the inner id (to handle compound aggs) + queryC = withAgg.v1().addColumn(withAgg.v2().context(), af.toAttribute()); } } // not an Alias or Function means it's an Attribute so apply the same logic as above @@ -359,7 +367,7 @@ else if (child instanceof GroupingFunction) { Check.notNull(matchingGroup, "Cannot find group [{}]", Expressions.name(ne)); ZoneId zi = ne.dataType().isDateBased() ? DateUtils.UTC : null; - queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, zi)); + queryC = queryC.addColumn(new GroupByRef(matchingGroup.id(), null, zi), ne.toAttribute()); } } } @@ -367,7 +375,7 @@ else if (child instanceof GroupingFunction) { if (!aliases.isEmpty()) { Map newAliases = new LinkedHashMap<>(queryC.aliases()); newAliases.putAll(aliases); - queryC = queryC.withAliases(newAliases); + queryC = queryC.withAliases(new AttributeMap<>(newAliases)); } return new EsQueryExec(exec.source(), exec.index(), a.output(), queryC); } @@ -418,7 +426,7 @@ private Tuple addAggFunction(GroupByKey groupingAg // FIXME: concern leak - hack around MatrixAgg which is not // generalized (afaik) aggInput = new AggPathInput(f, - new MetricAggRef(cAggPath, ia.innerId(), ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null)); + new MetricAggRef(cAggPath, ia.innerName(), ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null)); } else { LeafAgg leafAgg = toAgg(functionId, f); @@ -468,19 +476,19 @@ protected PhysicalPlan rule(OrderExec plan) { if (sfa.orderBy() instanceof NamedExpression) { Attribute at = ((NamedExpression) sfa.orderBy()).toAttribute(); at = qContainer.aliases().getOrDefault(at, at); - qContainer = qContainer.sort(new AttributeSort(at, direction, missing)); + qContainer = qContainer.addSort(new AttributeSort(at, direction, missing)); } else if (!sfa.orderBy().foldable()) { // ignore constant throw new PlanningException("does not know how to order by expression {}", sfa.orderBy()); } } else { // nope, use scripted sorting - qContainer = qContainer.sort(new ScriptSort(sfa.script(), direction, missing)); + qContainer = qContainer.addSort(new ScriptSort(sfa.script(), direction, missing)); } } else if (attr instanceof ScoreAttribute) { - qContainer = qContainer.sort(new ScoreSort(direction, missing)); + qContainer = qContainer.addSort(new ScoreSort(direction, missing)); } else { - qContainer = qContainer.sort(new AttributeSort(attr, direction, missing)); + qContainer = qContainer.addSort(new AttributeSort(attr, direction, missing)); } } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/agg/Aggs.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/agg/Aggs.java index c7ab17670a212..dd6766960e755 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/agg/Aggs.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/agg/Aggs.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.expression.gen.script.ScriptTemplate; import org.elasticsearch.xpack.sql.querydsl.container.Sort.Direction; +import org.elasticsearch.xpack.sql.util.StringUtils; import java.util.ArrayList; import java.util.Collection; @@ -21,7 +22,6 @@ import static java.util.Collections.emptyList; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine; -import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY; /** * SQL Aggregations associated with a query. @@ -40,7 +40,7 @@ public class Aggs { public static final String ROOT_GROUP_NAME = "groupby"; - public static final GroupByKey IMPLICIT_GROUP_KEY = new GroupByKey(ROOT_GROUP_NAME, EMPTY, null, null) { + public static final GroupByKey IMPLICIT_GROUP_KEY = new GroupByKey(ROOT_GROUP_NAME, StringUtils.EMPTY, null, null) { @Override public CompositeValuesSourceBuilder createSourceBuilder() { @@ -53,14 +53,12 @@ protected GroupByKey copy(String id, String fieldName, ScriptTemplate script, Di } }; + public static final Aggs EMPTY = new Aggs(emptyList(), emptyList(), emptyList()); + private final List groups; private final List metricAggs; private final List pipelineAggs; - public Aggs() { - this(emptyList(), emptyList(), emptyList()); - } - public Aggs(List groups, List metricAggs, List pipelineAggs) { this.groups = groups; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/QueryContainer.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/QueryContainer.java index fee8d0e942a3b..b8e0d6243cb96 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/QueryContainer.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/QueryContainer.java @@ -15,9 +15,12 @@ import org.elasticsearch.xpack.sql.execution.search.FieldExtraction; import org.elasticsearch.xpack.sql.execution.search.SourceGenerator; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.expression.AttributeMap; +import org.elasticsearch.xpack.sql.expression.ExpressionId; import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.expression.LiteralAttribute; import org.elasticsearch.xpack.sql.expression.function.ScoreAttribute; +import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute; import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunctionAttribute; import org.elasticsearch.xpack.sql.expression.gen.pipeline.Pipe; import org.elasticsearch.xpack.sql.querydsl.agg.Aggs; @@ -34,7 +37,9 @@ import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; +import java.util.Comparator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -48,48 +53,142 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine; +/** + * Container for various references of the built ES query. + * Useful to understanding how to interpret and navigate the + * returned result. + */ public class QueryContainer { private final Aggs aggs; private final Query query; - // final output seen by the client (hence the list or ordering) - // gets converted by the Scroller into Extractors for hits or actual results in case of aggregations - private final List columns; + // fields extracted from the response - not necessarily what the client sees + // for example in case of grouping or custom sorting, the response has extra columns + // that is filtered before getting to the client + + // the list contains both the field extraction and the id of its associated attribute (for custom sorting) + private final List> fields; // aliases (maps an alias to its actual resolved attribute) - private final Map aliases; + private final AttributeMap aliases; // pseudo functions (like count) - that are 'extracted' from other aggs private final Map pseudoFunctions; // scalar function processors - recorded as functions get folded; // at scrolling, their inputs (leaves) get updated - private final Map scalarFunctions; + private final AttributeMap scalarFunctions; private final Set sort; private final int limit; // computed - private final boolean aggsOnly; + private Boolean aggsOnly; + private Boolean customSort; public QueryContainer() { this(null, null, null, null, null, null, null, -1); } - public QueryContainer(Query query, Aggs aggs, List refs, Map aliases, + public QueryContainer(Query query, + Aggs aggs, + List> fields, + AttributeMap aliases, Map pseudoFunctions, - Map scalarFunctions, - Set sort, int limit) { + AttributeMap scalarFunctions, + Set sort, + int limit) { this.query = query; - this.aggs = aggs == null ? new Aggs() : aggs; - this.aliases = aliases == null || aliases.isEmpty() ? emptyMap() : aliases; + this.aggs = aggs == null ? Aggs.EMPTY : aggs; + this.fields = fields == null || fields.isEmpty() ? emptyList() : fields; + this.aliases = aliases == null || aliases.isEmpty() ? AttributeMap.emptyAttributeMap() : aliases; this.pseudoFunctions = pseudoFunctions == null || pseudoFunctions.isEmpty() ? emptyMap() : pseudoFunctions; - this.scalarFunctions = scalarFunctions == null || scalarFunctions.isEmpty() ? emptyMap() : scalarFunctions; - this.columns = refs == null || refs.isEmpty() ? emptyList() : refs; + this.scalarFunctions = scalarFunctions == null || scalarFunctions.isEmpty() ? AttributeMap.emptyAttributeMap() : scalarFunctions; this.sort = sort == null || sort.isEmpty() ? emptySet() : sort; this.limit = limit; - aggsOnly = columns.stream().allMatch(FieldExtraction::supportedByAggsOnlyQuery); + } + + /** + * If needed, create a comparator for each indicated column (which is indicated by an index pointing to the column number from the + * result set). + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public List> sortingColumns() { + if (customSort == Boolean.FALSE) { + return emptyList(); + } + + List> sortingColumns = new ArrayList<>(sort.size()); + + boolean aggSort = false; + for (Sort s : sort) { + Tuple tuple = new Tuple<>(Integer.valueOf(-1), null); + + if (s instanceof AttributeSort) { + AttributeSort as = (AttributeSort) s; + // find the relevant column of each aggregate function + if (as.attribute() instanceof AggregateFunctionAttribute) { + aggSort = true; + AggregateFunctionAttribute afa = (AggregateFunctionAttribute) as.attribute(); + afa = (AggregateFunctionAttribute) aliases.getOrDefault(afa, afa); + int atIndex = -1; + for (int i = 0; i < fields.size(); i++) { + Tuple field = fields.get(i); + if (field.v2().equals(afa.innerId())) { + atIndex = i; + break; + } + } + + if (atIndex == -1) { + throw new SqlIllegalArgumentException("Cannot find backing column for ordering aggregation [{}]", afa.name()); + } + // assemble a comparator for it + Comparator comp = s.direction() == Sort.Direction.ASC ? Comparator.naturalOrder() : Comparator.reverseOrder(); + comp = s.missing() == Sort.Missing.FIRST ? Comparator.nullsFirst(comp) : Comparator.nullsLast(comp); + + tuple = new Tuple<>(Integer.valueOf(atIndex), comp); + } + } + sortingColumns.add(tuple); + } + + if (customSort == null) { + customSort = Boolean.valueOf(aggSort); + } + + return aggSort ? sortingColumns : emptyList(); + } + + /** + * Since the container contains both the field extractors and the visible columns, + * compact the information in the listener through a bitset that acts as a mask + * on what extractors are used for the visible columns. + */ + public BitSet columnMask(List columns) { + BitSet mask = new BitSet(fields.size()); + for (Attribute column : columns) { + Attribute alias = aliases.get(column); + // find the column index + int index = -1; + ExpressionId id = column instanceof AggregateFunctionAttribute ? ((AggregateFunctionAttribute) column).innerId() : column.id(); + ExpressionId aliasId = alias != null ? (alias instanceof AggregateFunctionAttribute ? ((AggregateFunctionAttribute) alias) + .innerId() : alias.id()) : null; + for (int i = 0; i < fields.size(); i++) { + Tuple tuple = fields.get(i); + if (tuple.v2().equals(id) || (aliasId != null && tuple.v2().equals(aliasId))) { + index = i; + break; + } + } + if (index > -1) { + mask.set(index); + } else { + throw new SqlIllegalArgumentException("Cannot resolve field extractor index for column [{}]", column); + } + } + return mask; } public Query query() { @@ -100,11 +199,11 @@ public Aggs aggs() { return aggs; } - public List columns() { - return columns; + public List> fields() { + return fields; } - public Map aliases() { + public AttributeMap aliases() { return aliases; } @@ -121,11 +220,15 @@ public int limit() { } public boolean isAggsOnly() { - return aggsOnly; + if (aggsOnly == null) { + aggsOnly = Boolean.valueOf(this.fields.stream().allMatch(t -> t.v1().supportedByAggsOnlyQuery())); + } + + return aggsOnly.booleanValue(); } public boolean hasColumns() { - return !columns.isEmpty(); + return fields.size() > 0; } // @@ -133,37 +236,33 @@ public boolean hasColumns() { // public QueryContainer with(Query q) { - return new QueryContainer(q, aggs, columns, aliases, pseudoFunctions, scalarFunctions, sort, limit); - } - - public QueryContainer with(List r) { - return new QueryContainer(query, aggs, r, aliases, pseudoFunctions, scalarFunctions, sort, limit); + return new QueryContainer(q, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit); } - public QueryContainer withAliases(Map a) { - return new QueryContainer(query, aggs, columns, a, pseudoFunctions, scalarFunctions, sort, limit); + public QueryContainer withAliases(AttributeMap a) { + return new QueryContainer(query, aggs, fields, a, pseudoFunctions, scalarFunctions, sort, limit); } public QueryContainer withPseudoFunctions(Map p) { - return new QueryContainer(query, aggs, columns, aliases, p, scalarFunctions, sort, limit); + return new QueryContainer(query, aggs, fields, aliases, p, scalarFunctions, sort, limit); } public QueryContainer with(Aggs a) { - return new QueryContainer(query, a, columns, aliases, pseudoFunctions, scalarFunctions, sort, limit); + return new QueryContainer(query, a, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit); } public QueryContainer withLimit(int l) { - return l == limit ? this : new QueryContainer(query, aggs, columns, aliases, pseudoFunctions, scalarFunctions, sort, l); + return l == limit ? this : new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, l); } - public QueryContainer withScalarProcessors(Map procs) { - return new QueryContainer(query, aggs, columns, aliases, pseudoFunctions, procs, sort, limit); + public QueryContainer withScalarProcessors(AttributeMap procs) { + return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, procs, sort, limit); } - public QueryContainer sort(Sort sortable) { + public QueryContainer addSort(Sort sortable) { Set sort = new LinkedHashSet<>(this.sort); sort.add(sortable); - return new QueryContainer(query, aggs, columns, aliases, pseudoFunctions, scalarFunctions, sort, limit); + return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit); } private String aliasName(Attribute attr) { @@ -190,7 +289,8 @@ private Tuple nestedHitFieldRef(FieldAttribute attr.field().isAggregatable(), attr.parent().name()); nestedRefs.add(nestedFieldRef); - return new Tuple<>(new QueryContainer(q, aggs, columns, aliases, pseudoFunctions, scalarFunctions, sort, limit), nestedFieldRef); + return new Tuple<>(new QueryContainer(q, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit), + nestedFieldRef); } static Query rewriteToContainNestedField(@Nullable Query query, Source source, String path, String name, String format, @@ -257,13 +357,13 @@ public FieldExtraction resolve(Attribute attribute) { // update proc Map procs = new LinkedHashMap<>(qContainer.scalarFunctions()); procs.put(attribute, proc); - qContainer = qContainer.withScalarProcessors(procs); + qContainer = qContainer.withScalarProcessors(new AttributeMap<>(procs)); return new Tuple<>(qContainer, new ComputedRef(proc)); } public QueryContainer addColumn(Attribute attr) { Tuple tuple = toReference(attr); - return tuple.v1().addColumn(tuple.v2()); + return tuple.v1().addColumn(tuple.v2(), attr); } private Tuple toReference(Attribute attr) { @@ -288,11 +388,14 @@ private Tuple toReference(Attribute attr) { throw new SqlIllegalArgumentException("Unknown output attribute {}", attr); } - public QueryContainer addColumn(FieldExtraction ref) { - return with(combine(columns, ref)); + public QueryContainer addColumn(FieldExtraction ref, Attribute attr) { + ExpressionId id = attr instanceof AggregateFunctionAttribute ? ((AggregateFunctionAttribute) attr).innerId() : attr.id(); + return new QueryContainer(query, aggs, combine(fields, new Tuple<>(ref, id)), aliases, pseudoFunctions, + scalarFunctions, + sort, limit); } - public Map scalarFunctions() { + public AttributeMap scalarFunctions() { return scalarFunctions; } @@ -300,11 +403,14 @@ public Map scalarFunctions() { // agg methods // - public QueryContainer addAggCount(GroupByKey group, String functionId) { + public QueryContainer addAggCount(GroupByKey group, ExpressionId functionId) { FieldExtraction ref = group == null ? GlobalCountRef.INSTANCE : new GroupByRef(group.id(), Property.COUNT, null); Map pseudoFunctions = new LinkedHashMap<>(this.pseudoFunctions); - pseudoFunctions.put(functionId, group); - return new QueryContainer(query, aggs, combine(columns, ref), aliases, pseudoFunctions, scalarFunctions, sort, limit); + pseudoFunctions.put(functionId.toString(), group); + return new QueryContainer(query, aggs, combine(fields, new Tuple<>(ref, functionId)), + aliases, + pseudoFunctions, + scalarFunctions, sort, limit); } public QueryContainer addAgg(String groupId, LeafAgg agg) { @@ -329,7 +435,7 @@ public QueryContainer updateGroup(GroupByKey group) { @Override public int hashCode() { - return Objects.hash(query, aggs, columns, aliases); + return Objects.hash(query, aggs, fields, aliases, sort, limit); } @Override @@ -345,7 +451,7 @@ public boolean equals(Object obj) { QueryContainer other = (QueryContainer) obj; return Objects.equals(query, other.query) && Objects.equals(aggs, other.aggs) - && Objects.equals(columns, other.columns) + && Objects.equals(fields, other.fields) && Objects.equals(aliases, other.aliases) && Objects.equals(sort, other.sort) && Objects.equals(limit, other.limit); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/Sort.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/Sort.java index 549b53994ecbb..33a9865b64f3c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/Sort.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/Sort.java @@ -9,7 +9,7 @@ import org.elasticsearch.xpack.sql.expression.Order.NullsPosition; import org.elasticsearch.xpack.sql.expression.Order.OrderDirection; -public class Sort { +public abstract class Sort { public enum Direction { ASC, DESC; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index 6a3de2cf174de..d3495c4f719a7 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor; +import org.elasticsearch.xpack.sql.execution.search.PagingListCursor; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; @@ -48,6 +49,7 @@ public static List getNamedWriteables() { entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CompositeAggregationCursor.NAME, CompositeAggregationCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, TextFormatterCursor.NAME, TextFormatterCursor::new)); + entries.add(new NamedWriteableRegistry.Entry(Cursor.class, PagingListCursor.NAME, PagingListCursor::new)); // plus all their dependencies entries.addAll(Processors.getNamedWriteables()); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java similarity index 89% rename from x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java rename to x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java index 7e943931e910c..9237850998a58 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java @@ -7,10 +7,10 @@ import org.elasticsearch.xpack.sql.type.Schema; -class EmptyRowSetCursor extends AbstractRowSet implements SchemaRowSet { +class EmptyRowSet extends AbstractRowSet implements SchemaRowSet { private final Schema schema; - EmptyRowSetCursor(Schema schema) { + EmptyRowSet(Schema schema) { this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java similarity index 84% rename from x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java rename to x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java index 39987d21ac6b9..0122d333f7e9d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java @@ -9,25 +9,25 @@ import java.util.List; -class ListRowSetCursor extends AbstractRowSet implements SchemaRowSet { +public class ListRowSet extends AbstractRowSet implements SchemaRowSet { private final Schema schema; private final List> list; private int pos = 0; - ListRowSetCursor(Schema schema, List> list) { + protected ListRowSet(Schema schema, List> list) { this.schema = schema; this.list = list; } @Override protected boolean doHasCurrent() { - return pos < list.size(); + return pos < size(); } @Override protected boolean doNext() { - if (pos + 1 < list.size()) { + if (pos + 1 < size()) { pos++; return true; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java index 00b261b4be71a..dac192318a209 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java @@ -36,7 +36,7 @@ public static SchemaRowSet of(List attrs, List> values) { } Schema schema = schema(attrs); - return new ListRowSetCursor(schema, values); + return new ListRowSet(schema, values); } public static SchemaRowSet singleton(List attrs, Object... values) { @@ -49,10 +49,10 @@ public static SchemaRowSet singleton(Schema schema, Object... values) { } public static SchemaRowSet empty(Schema schema) { - return new EmptyRowSetCursor(schema); + return new EmptyRowSet(schema); } public static SchemaRowSet empty(List attrs) { - return new EmptyRowSetCursor(schema(attrs)); + return new EmptyRowSet(schema(attrs)); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index fc023bb6a0186..ae1a6f14da522 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolution; import org.elasticsearch.xpack.sql.analysis.index.IndexResolver; import org.elasticsearch.xpack.sql.analysis.index.MappingException; +import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.optimizer.Optimizer; import org.elasticsearch.xpack.sql.parser.SqlParser; @@ -40,20 +41,17 @@ public class SqlSession { private final Verifier verifier; private final Optimizer optimizer; private final Planner planner; - + private final PlanExecutor planExecutor; + private final Configuration configuration; - public SqlSession(SqlSession other) { - this(other.configuration, other.client, other.functionRegistry, other.indexResolver, - other.preAnalyzer, other.verifier, other.optimizer, other.planner); - } - public SqlSession(Configuration configuration, Client client, FunctionRegistry functionRegistry, IndexResolver indexResolver, PreAnalyzer preAnalyzer, Verifier verifier, Optimizer optimizer, - Planner planner) { + Planner planner, + PlanExecutor planExecutor) { this.client = client; this.functionRegistry = functionRegistry; @@ -64,6 +62,7 @@ public SqlSession(Configuration configuration, Client client, FunctionRegistry f this.verifier = verifier; this.configuration = configuration; + this.planExecutor = planExecutor; } public FunctionRegistry functionRegistry() { @@ -90,6 +89,10 @@ public Verifier verifier() { return verifier; } + public PlanExecutor planExecutor() { + return planExecutor; + } + private LogicalPlan doParse(String sql, List params) { return new SqlParser().createStatement(sql, params); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/Node.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/Node.java index fb6d3da92758f..2e40244a41589 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/Node.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/Node.java @@ -281,6 +281,14 @@ public String nodeName() { return getClass().getSimpleName(); } + /** + * The values of all the properties that are important + * to this {@link Node}. + */ + public List nodeProperties() { + return info().properties(); + } + public String nodeString() { StringBuilder sb = new StringBuilder(); sb.append(nodeName()); @@ -349,7 +357,6 @@ final StringBuilder treeString(StringBuilder sb, int depth, BitSet hasParentPerD * {@code [} and {@code ]} of the output of {@link #treeString}. */ public String propertiesToString(boolean skipIfChild) { - NodeInfo> info = info(); StringBuilder sb = new StringBuilder(); List children = children(); @@ -358,7 +365,7 @@ public String propertiesToString(boolean skipIfChild) { int maxWidth = 0; boolean needsComma = false; - List props = info.properties(); + List props = nodeProperties(); for (Object prop : props) { // consider a property if it is not ignored AND // it's not a child (optional) @@ -388,12 +395,4 @@ public String propertiesToString(boolean skipIfChild) { return sb.toString(); } - - /** - * The values of all the properties that are important - * to this {@link Node}. - */ - public List properties() { - return info().properties(); - } -} +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/NodeInfo.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/NodeInfo.java index 8af8ff637b3d7..d23c93b31beea 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/NodeInfo.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/tree/NodeInfo.java @@ -349,6 +349,54 @@ public interface NodeCtor8 { T apply(Source l, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8); } + public static , P1, P2, P3, P4, P5, P6, P7, P8, P9> NodeInfo create( + T n, NodeCtor9 ctor, + P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9) { + return new NodeInfo(n) { + @Override + protected List innerProperties() { + return Arrays.asList(p1, p2, p3, p4, p5, p6, p7, p8, p9); + } + + protected T innerTransform(Function rule) { + boolean same = true; + + @SuppressWarnings("unchecked") + P1 newP1 = (P1) rule.apply(p1); + same &= Objects.equals(p1, newP1); + @SuppressWarnings("unchecked") + P2 newP2 = (P2) rule.apply(p2); + same &= Objects.equals(p2, newP2); + @SuppressWarnings("unchecked") + P3 newP3 = (P3) rule.apply(p3); + same &= Objects.equals(p3, newP3); + @SuppressWarnings("unchecked") + P4 newP4 = (P4) rule.apply(p4); + same &= Objects.equals(p4, newP4); + @SuppressWarnings("unchecked") + P5 newP5 = (P5) rule.apply(p5); + same &= Objects.equals(p5, newP5); + @SuppressWarnings("unchecked") + P6 newP6 = (P6) rule.apply(p6); + same &= Objects.equals(p6, newP6); + @SuppressWarnings("unchecked") + P7 newP7 = (P7) rule.apply(p7); + same &= Objects.equals(p7, newP7); + @SuppressWarnings("unchecked") + P8 newP8 = (P8) rule.apply(p8); + same &= Objects.equals(p8, newP8); + @SuppressWarnings("unchecked") + P9 newP9 = (P9) rule.apply(p9); + same &= Objects.equals(p9, newP9); + + return same ? node : ctor.apply(node.source(), newP1, newP2, newP3, newP4, newP5, newP6, newP7, newP8, newP9); + } + }; + } + public interface NodeCtor9 { + T apply(Source l, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9); + } + public static , P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> NodeInfo create( T n, NodeCtor10 ctor, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/util/Graphviz.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/util/Graphviz.java index 97e84efa3b0d5..ad6a97f5862c9 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/util/Graphviz.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/util/Graphviz.java @@ -132,7 +132,7 @@ private static void handleNode(StringBuilder output, Node n, AtomicInteger no + "\n"); indent(nodeInfo, currentIndent + NODE_LABEL_INDENT); - List props = n.properties(); + List props = n.nodeProperties(); List parsed = new ArrayList<>(props.size()); List> subTrees = new ArrayList<>(); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java index 4279910e0e03b..3526c2826a9ba 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java @@ -312,14 +312,19 @@ public void testUnsupportedTypeInOrder() { error("SELECT * FROM test ORDER BY unsupported")); } - public void testGroupByOrderByNonKey() { - assertEquals("1:52: Cannot order by non-grouped column [a], expected [bool]", - error("SELECT AVG(int) a FROM test GROUP BY bool ORDER BY a")); + // public void testGroupByOrderByNonKey() { + // assertEquals("1:52: Cannot order by non-grouped column [a], expected [bool]", + // error("SELECT AVG(int) a FROM test GROUP BY bool ORDER BY a")); + // } + + public void testGroupByOrderByAggs() { + accept("SELECT int FROM test GROUP BY int ORDER BY COUNT(*)"); } - public void testGroupByOrderByFunctionOverKey() { - assertEquals("1:44: Cannot order by non-grouped column [MAX(int)], expected [int]", - error("SELECT int FROM test GROUP BY int ORDER BY MAX(int)")); + public void testGroupByOrderByAggAndGroupedColumn() { + assertEquals("1:49: Cannot order by aggregated [MAX(int)] and non-aggregated [int] columns" + + " at the same time; use either one or the other", + error("SELECT int FROM test GROUP BY int ORDER BY int, MAX(int)")); } public void testGroupByOrderByScore() { diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java index dba3e3ddfda87..f2dccc396dbd3 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.List; import java.util.function.Supplier; @@ -27,7 +28,9 @@ public static CompositeAggregationCursor randomCompositeCursor() { for (int i = 0; i < extractorsSize; i++) { extractors.add(randomBucketExtractor()); } - return new CompositeAggregationCursor(new byte[randomInt(256)], extractors, randomIntBetween(10, 1024), randomAlphaOfLength(5)); + + return new CompositeAggregationCursor(new byte[randomInt(256)], extractors, randomBitSet(extractorsSize), + randomIntBetween(10, 1024), randomAlphaOfLength(5)); } static BucketExtractor randomBucketExtractor() { @@ -41,7 +44,9 @@ static BucketExtractor randomBucketExtractor() { @Override protected CompositeAggregationCursor mutateInstance(CompositeAggregationCursor instance) throws IOException { return new CompositeAggregationCursor(instance.next(), instance.extractors(), - randomValueOtherThan(instance.limit(), () -> randomIntBetween(1, 512)), instance.indices()); + randomValueOtherThan(instance.mask(), () -> randomBitSet(instance.extractors().size())), + randomValueOtherThan(instance.limit(), () -> randomIntBetween(1, 512)), + instance.indices()); } @Override @@ -68,4 +73,12 @@ protected CompositeAggregationCursor copyInstance(CompositeAggregationCursor ins } return (CompositeAggregationCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); } + + static BitSet randomBitSet(int size) { + BitSet mask = new BitSet(size); + for (int i = 0; i < size; i++) { + mask.set(i, randomBoolean()); + } + return mask; + } } \ No newline at end of file diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java index 67b0f2badd883..0da102edf5a97 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java @@ -23,17 +23,18 @@ import org.mockito.ArgumentCaptor; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.function.Supplier; import static org.elasticsearch.action.support.PlainActionFuture.newFuture; +import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.CLI; +import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEXT; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; -import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.CLI; -import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEXT; public class CursorTests extends ESTestCase { @@ -51,7 +52,7 @@ public void testScrollCursorClearCursor() { Client clientMock = mock(Client.class); ActionListener listenerMock = mock(ActionListener.class); String cursorString = randomAlphaOfLength(10); - Cursor cursor = new ScrollCursor(cursorString, Collections.emptyList(), randomInt()); + Cursor cursor = new ScrollCursor(cursorString, Collections.emptyList(), new BitSet(0), randomInt()); cursor.clear(TestUtils.TEST_CFG, clientMock, listenerMock); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java new file mode 100644 index 0000000000000..fafb8834dbb90 --- /dev/null +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.sql.session.Cursors; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class PagingListCursorTests extends AbstractWireSerializingTestCase { + public static PagingListCursor randomPagingListCursor() { + int size = between(1, 20); + int depth = between(1, 20); + + List> values = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + values.add(Arrays.asList(randomArray(depth, s -> new Object[depth], () -> randomByte()))); + } + + return new PagingListCursor(values, between(1, 20)); + } + + @Override + protected PagingListCursor mutateInstance(PagingListCursor instance) throws IOException { + return new PagingListCursor(instance.data(), + randomValueOtherThan(instance.pageSize(), () -> between(1, 20))); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Cursors.getNamedWriteables()); + } + + @Override + protected PagingListCursor createTestInstance() { + return randomPagingListCursor(); + } + + @Override + protected Reader instanceReader() { + return PagingListCursor::new; + } + + @Override + protected PagingListCursor copyInstance(PagingListCursor instance, Version version) throws IOException { + /* Randomly choose between internal protocol round trip and String based + * round trips used to toXContent. */ + if (randomBoolean()) { + return super.copyInstance(instance, version); + } + return (PagingListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); + } +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index 9a3a2fe2eb9d5..d34107e6519d4 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -27,7 +27,8 @@ public static ScrollCursor randomScrollCursor() { for (int i = 0; i < extractorsSize; i++) { extractors.add(randomHitExtractor(0)); } - return new ScrollCursor(randomAlphaOfLength(5), extractors, randomIntBetween(10, 1024)); + return new ScrollCursor(randomAlphaOfLength(5), extractors, CompositeAggregationCursorTests.randomBitSet(extractorsSize), + randomIntBetween(10, 1024)); } static HitExtractor randomHitExtractor(int depth) { @@ -43,6 +44,7 @@ static HitExtractor randomHitExtractor(int depth) { @Override protected ScrollCursor mutateInstance(ScrollCursor instance) throws IOException { return new ScrollCursor(instance.scrollId(), instance.extractors(), + randomValueOtherThan(instance.mask(), () -> CompositeAggregationCursorTests.randomBitSet(instance.extractors().size())), randomValueOtherThan(instance.limit(), () -> randomIntBetween(1, 1024))); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/SourceGeneratorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/SourceGeneratorTests.java index eabbfaba7976e..0c56d7783f8b6 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/SourceGeneratorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/SourceGeneratorTests.java @@ -85,7 +85,7 @@ public void testSelectScoreForcesTrackingScore() { public void testSortScoreSpecified() { QueryContainer container = new QueryContainer() - .sort(new ScoreSort(Direction.DESC, null)); + .addSort(new ScoreSort(Direction.DESC, null)); SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(container, null, randomIntBetween(1, 10)); assertEquals(singletonList(scoreSort()), sourceBuilder.sorts()); } @@ -94,13 +94,13 @@ public void testSortFieldSpecified() { FieldSortBuilder sortField = fieldSort("test").unmappedType("keyword"); QueryContainer container = new QueryContainer() - .sort(new AttributeSort(new FieldAttribute(Source.EMPTY, "test", new KeywordEsField("test")), Direction.ASC, + .addSort(new AttributeSort(new FieldAttribute(Source.EMPTY, "test", new KeywordEsField("test")), Direction.ASC, Missing.LAST)); SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(container, null, randomIntBetween(1, 10)); assertEquals(singletonList(sortField.order(SortOrder.ASC).missing("_last")), sourceBuilder.sorts()); container = new QueryContainer() - .sort(new AttributeSort(new FieldAttribute(Source.EMPTY, "test", new KeywordEsField("test")), Direction.DESC, + .addSort(new AttributeSort(new FieldAttribute(Source.EMPTY, "test", new KeywordEsField("test")), Direction.DESC, Missing.FIRST)); sourceBuilder = SourceGenerator.sourceBuilder(container, null, randomIntBetween(1, 10)); assertEquals(singletonList(sortField.order(SortOrder.DESC).missing("_first")), sourceBuilder.sorts()); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/parser/SqlParserTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/parser/SqlParserTests.java index a29feff881637..dd44a8e464ae4 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/parser/SqlParserTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/parser/SqlParserTests.java @@ -306,7 +306,7 @@ public void testLimitStackOverflowForInAndLiteralsIsNotApplied() { In in = (In) filter.condition(); assertEquals("?a", in.value().toString()); assertEquals(noChildren, in.list().size()); - assertThat(in.list().get(0).toString(), startsWith("a + b#")); + assertThat(in.list().get(0).toString(), startsWith("Add[?a,?b]")); } public void testDecrementOfDepthCounter() { diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysParserTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysParserTests.java index e737258ef1982..a1accd28ab4d9 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysParserTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysParserTests.java @@ -53,7 +53,7 @@ private Tuple sql(String sql) { return Void.TYPE; }).when(resolver).resolveAsSeparateMappings(any(), any(), any()); - SqlSession session = new SqlSession(TestUtils.TEST_CFG, null, null, resolver, null, null, null, null); + SqlSession session = new SqlSession(TestUtils.TEST_CFG, null, null, resolver, null, null, null, null, null); return new Tuple<>(cmd, session); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java index 257c6733805d3..9487986a711e7 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java @@ -243,7 +243,7 @@ private Tuple sql(String sql, List para IndexResolver resolver = mock(IndexResolver.class); when(resolver.clusterName()).thenReturn(CLUSTER_NAME); - SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null); + SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null, null); return new Tuple<>(cmd, session); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java index 41ddb518ce6d9..e6061f197149b 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java @@ -36,7 +36,7 @@ private Tuple sql(String sql) { Command cmd = (Command) analyzer.analyze(parser.createStatement(sql), false); IndexResolver resolver = mock(IndexResolver.class); - SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null); + SqlSession session = new SqlSession(null, null, null, resolver, null, null, null, null, null); return new Tuple<>(cmd, session); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/tree/NodeSubclassTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/tree/NodeSubclassTests.java index e7485157d3b8b..03f1a032cdab2 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/tree/NodeSubclassTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/tree/NodeSubclassTests.java @@ -253,8 +253,8 @@ private void assertTransformedOrReplacedChildren(T node, B transformed, Construc * the one property of the node that we intended to transform. */ assertEquals(node.source(), transformed.source()); - List op = node.properties(); - List tp = transformed.properties(); + List op = node.nodeProperties(); + List tp = transformed.nodeProperties(); for (int p = 0; p < op.size(); p++) { if (p == changedArgOffset - 1) { // -1 because location isn't in the list assertEquals(changedArgValue, tp.get(p));