diff --git a/integ-test/src/test/resources/clickbench/queries/expected/expected-q39.json b/integ-test/src/test/resources/clickbench/queries/expected/expected-q39.json index c3893e79a1a..2726f424910 100644 --- a/integ-test/src/test/resources/clickbench/queries/expected/expected-q39.json +++ b/integ-test/src/test/resources/clickbench/queries/expected/expected-q39.json @@ -10,11 +10,15 @@ } ], "datarows": [ + [ + 1, + "https://example.com/page?ref=google" + ], [ 1, "https://www.google.com/search?q=test" ] ], - "total": 1, - "size": 1 -} + "total": 2, + "size": 2 +} \ No newline at end of file diff --git a/integ-test/src/test/resources/clickbench/queries/expected/expected-q40.json b/integ-test/src/test/resources/clickbench/queries/expected/expected-q40.json index 3da5b5d205b..0e0ac11fd05 100644 --- a/integ-test/src/test/resources/clickbench/queries/expected/expected-q40.json +++ b/integ-test/src/test/resources/clickbench/queries/expected/expected-q40.json @@ -26,6 +26,14 @@ } ], "datarows": [ + [ + 1, + -1, + 2, + 0, + "", + "https://google.com/maps" + ], [ 1, -1, @@ -59,6 +67,6 @@ "https://example.com/page?ref=google" ] ], - "total": 4, - "size": 4 -} + "total": 5, + "size": 5 +} \ No newline at end of file diff --git a/integ-test/src/test/resources/clickbench/queries/expected/expected-q42.json b/integ-test/src/test/resources/clickbench/queries/expected/expected-q42.json index e4c852b2c16..88f1e34c248 100644 --- a/integ-test/src/test/resources/clickbench/queries/expected/expected-q42.json +++ b/integ-test/src/test/resources/clickbench/queries/expected/expected-q42.json @@ -14,12 +14,17 @@ } ], "datarows": [ + [ + 1, + 1024, + 768 + ], [ 1, 1280, 1024 ] ], - "total": 1, - "size": 1 -} + "total": 2, + "size": 2 +} \ No newline at end of file diff --git a/integ-test/src/test/resources/clickbench/queries/expected/expected-q43.json b/integ-test/src/test/resources/clickbench/queries/expected/expected-q43.json index 3450372b184..97ad19c599d 100644 --- a/integ-test/src/test/resources/clickbench/queries/expected/expected-q43.json +++ b/integ-test/src/test/resources/clickbench/queries/expected/expected-q43.json @@ -10,6 +10,10 @@ } ], "datarows": [ + [ + 1, + "2013-07-02 16:00:00" + ], [ 1, "2013-07-05 11:00:00" @@ -27,6 +31,6 @@ "2013-07-15 10:00:00" ] ], - "total": 4, - "size": 4 + "total": 5, + "size": 5 } diff --git a/integ-test/src/test/resources/clickbench/queries/q39.ppl b/integ-test/src/test/resources/clickbench/queries/q39.ppl index d3a76649372..b03863ce5af 100644 --- a/integ-test/src/test/resources/clickbench/queries/q39.ppl +++ b/integ-test/src/test/resources/clickbench/queries/q39.ppl @@ -8,4 +8,4 @@ source=hits | where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and IsLink != 0 and IsDownload = 0 | stats bucket_nullable=false count() as PageViews by URL | sort - PageViews -| head 10 from 1 +| head 1010 diff --git a/integ-test/src/test/resources/clickbench/queries/q40.ppl b/integ-test/src/test/resources/clickbench/queries/q40.ppl index 22ceb36c75d..5ebda81437b 100644 --- a/integ-test/src/test/resources/clickbench/queries/q40.ppl +++ b/integ-test/src/test/resources/clickbench/queries/q40.ppl @@ -8,4 +8,4 @@ source=hits | eval Src=case(SearchEngineID = 0 and AdvEngineID = 0, Referer else ''), Dst=URL | stats /*bucket_nullable=false*/ count() as PageViews by TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst | sort - PageViews -| head 10 from 1 +| head 1010 diff --git a/integ-test/src/test/resources/clickbench/queries/q41.ppl b/integ-test/src/test/resources/clickbench/queries/q41.ppl index 8cde43b3152..71811c72616 100644 --- a/integ-test/src/test/resources/clickbench/queries/q41.ppl +++ b/integ-test/src/test/resources/clickbench/queries/q41.ppl @@ -8,4 +8,4 @@ source=hits | where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and TraficSourceID in (-1, 6) and RefererHash = 3594120000172545465 | stats bucket_nullable=false count() as PageViews by URLHash, EventDate | sort - PageViews -| head 10 from 100 +| head 110 diff --git a/integ-test/src/test/resources/clickbench/queries/q42.ppl b/integ-test/src/test/resources/clickbench/queries/q42.ppl index a4a6a740e6d..8befc5fd08a 100644 --- a/integ-test/src/test/resources/clickbench/queries/q42.ppl +++ b/integ-test/src/test/resources/clickbench/queries/q42.ppl @@ -8,4 +8,4 @@ source=hits | where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and DontCountHits = 0 and URLHash = 2868770270353813622 | stats bucket_nullable=false count() as PageViews by WindowClientWidth, WindowClientHeight | sort - PageViews -| head 10 from 1 +| head 10010 diff --git a/integ-test/src/test/resources/clickbench/queries/q43.ppl b/integ-test/src/test/resources/clickbench/queries/q43.ppl index 9e0d7a5ce94..281b9801c20 100644 --- a/integ-test/src/test/resources/clickbench/queries/q43.ppl +++ b/integ-test/src/test/resources/clickbench/queries/q43.ppl @@ -11,4 +11,4 @@ source=hits | eval M = date_format(EventTime, '%Y-%m-%d %H:00:00') | stats /*bucket_nullable=false*/ count() as PageViews by M | sort M -| head 10 from 1 +| head 1010 diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 8e22a0d12ee..a5a804b16a8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -273,7 +273,7 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th exprValueFactory = new OpenSearchExprValueFactory( index.getFieldOpenSearchTypes(), index.isFieldTypeTolerance()); - + // RelNode tree is not serialized/deserialized for now // It is only used during the initial query execution this.pushedDownRelNodeTree = null; @@ -418,14 +418,6 @@ public void writeTo(StreamOutput out) throws IOException { } public static byte[] convertToSubstraitAndSerialize(RelNode relNode) { - relNode = null; // Setting the relNode as Null so that it always uses full relNode from the ThreadLocal - if (relNode == null) { - LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)"); - relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); - } else { - LOGGER.info("RelNode provided directly from pushedDownRelNodeTree"); - } - CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); LOGGER.info("Calcite Logical Plan before Conversion\n {}", RelOptUtil.toString(relNode)); // Preprocess the Calcite plan diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index 6988890919b..5c4e911a34e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -82,7 +82,7 @@ public void register(RelOptPlanner planner) { // remove this rule otherwise opensearch can't correctly interpret approx_count_distinct() // it is converted to cardinality aggregation in OpenSearch planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES); - + // Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH // This is needed for Substrait compatibility which doesn't support SEARCH operations planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS); @@ -118,18 +118,21 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { public Enumerable<@Nullable Object> scan() { // Reconstruct pushed-down RelNode tree RelNode pushedDownTree = null; + RelNode fullRelNodeTree = null; try { if (pushDownContext != null && !pushDownContext.isEmpty()) { - LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode())); + fullRelNodeTree = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); + CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); + LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(fullRelNodeTree)); LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size()); int index = 0; for (var operation : pushDownContext) { - LOG.info(" Operation {}: type={}, relNode={}", - index++, - operation.type(), + LOG.info(" Operation {}: type={}, relNode={}", + index++, + operation.type(), operation.relNode() != null ? operation.relNode().toString() : "NULL"); } - + // Create a base CalciteLogicalIndexScan for reconstruction CalciteLogicalIndexScan logicalIndexScan = new CalciteLogicalIndexScan(getCluster(), getTable(), osIndex); pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(logicalIndexScan); @@ -139,18 +142,21 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { LOG.error("Failed to reconstruct pushed-down RelNode tree", e); throw new RuntimeException("Failed to reconstruct pushed-down RelNode tree", e); } - + // Pass pushedDownTree to OpenSearchRequest via RequestBuilder final RelNode finalPushedDownTree = pushedDownTree; - + // If needed for testing use finalFullRelNodeTree + final RelNode finalFullRelNodeTree = fullRelNodeTree; + return new AbstractEnumerable<>() { @Override public Enumerator enumerator() { OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder(); // Set the RelNode tree on the request builder - if (finalPushedDownTree != null) { - requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree); - } + requestBuilder.setPushedDownRelNodeTree(finalFullRelNodeTree); +// if (finalPushedDownTree != null) { +// requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree); +// } return new OpenSearchIndexEnumerator( osIndex.getClient(), getFieldPath(), diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index a16d1c5ba84..02bb02db0f0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -139,7 +139,7 @@ public void register(RelOptPlanner planner) { } else { planner.addRule(OpenSearchIndexRules.RELEVANCE_FUNCTION_PUSHDOWN); } - + // Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH // This is needed for Substrait compatibility which doesn't support SEARCH operations planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS); @@ -158,10 +158,10 @@ public AbstractRelNode pushDownFilter(Filter filter) { filter.getCondition(), schema, fieldTypes, rowType, getCluster()); // TODO: handle the case where condition contains a score function CalciteLogicalIndexScan newScan = this.copy(); - + // Log the filter condition being stored to check if SEARCH optimization already happened LOG.info("Filter condition being stored: {}", filter.getCondition()); - + newScan.pushDownContext.add( queryExpression.getScriptCount() > 0 ? PushDownType.SCRIPT : PushDownType.FILTER, new FilterDigest( @@ -335,7 +335,7 @@ public CalciteLogicalIndexScan pushDownSortAggregateMeasure(Sort sort) { aggAction.rePushDownSortAggMeasure( sort.getCollation().getFieldCollations(), rowType.getFieldNames()); Object digest = sort.getCollation().getFieldCollations(); - newScan.pushDownContext.add(PushDownType.SORT_AGG_METRICS, digest, newAction, sort); + newScan.pushDownContext.add(PushDownType.SORT_AGG_METRICS, digest, newAction); return newScan; } catch (Exception e) { if (LOG.isDebugEnabled()) { @@ -442,7 +442,7 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of updated ? aggAction -> aggAction.pushDownLimitIntoBucketSize(limit + offset) : aggAction -> {}; - newScan.pushDownContext.add(PushDownType.LIMIT, new LimitDigest(limit, offset), action); + newScan.pushDownContext.add(PushDownType.LIMIT, new LimitDigest(limit, offset), action, sort); return offset > 0 ? sort.copy(sort.getTraitSet(), List.of(newScan)) : newScan; } else { CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());