From f41b4a64b793bf96cdfcfd3e08e40c37b9591fd4 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Wed, 3 Dec 2025 21:07:52 -0800 Subject: [PATCH 1/7] Changes to convert only pushed down operations RelNode and Substrait Signed-off-by: Vinay Krishna Pudyodu --- .../request/OpenSearchQueryRequest.java | 52 +++++- .../request/OpenSearchRequestBuilder.java | 26 ++- .../scan/CalciteEnumerableIndexScan.java | 41 +++++ .../storage/scan/CalciteLogicalIndexScan.java | 20 ++- .../storage/scan/context/PushDownContext.java | 149 ++++++++++++++++++ .../scan/context/PushDownOperation.java | 14 +- 6 files changed, 287 insertions(+), 15 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index d5ad7bcf416..1dd093ec918 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -151,6 +151,10 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { private SearchResponse searchResponse = null; + /** RelNode tree for pushed-down operations (optional). */ + @EqualsAndHashCode.Exclude @ToString.Exclude + private final org.apache.calcite.rel.RelNode pushedDownRelNodeTree; + private static final Logger LOGGER = LogManager.getLogger(OpenSearchQueryRequest.class); @@ -170,6 +174,7 @@ public OpenSearchQueryRequest( sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT); this.exprValueFactory = factory; this.includes = includes; + this.pushedDownRelNodeTree = null; } /** Constructor of OpenSearchQueryRequest. */ @@ -178,10 +183,21 @@ public OpenSearchQueryRequest( SearchSourceBuilder sourceBuilder, OpenSearchExprValueFactory factory, List includes) { + this(indexName, sourceBuilder, factory, includes, (RelNode) null); + } + + /** Constructor of OpenSearchQueryRequest with RelNode tree support. */ + public OpenSearchQueryRequest( + IndexName indexName, + SearchSourceBuilder sourceBuilder, + OpenSearchExprValueFactory factory, + List includes, + RelNode pushedDownRelNodeTree) { this.indexName = indexName; this.sourceBuilder = sourceBuilder; this.exprValueFactory = factory; this.includes = includes; + this.pushedDownRelNodeTree = pushedDownRelNodeTree; } /** Constructor of OpenSearchQueryRequest with PIT support. */ @@ -192,12 +208,25 @@ public OpenSearchQueryRequest( List includes, TimeValue cursorKeepAlive, String pitId) { + this(indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, null); + } + + /** Constructor of OpenSearchQueryRequest with PIT and RelNode tree support. */ + public OpenSearchQueryRequest( + IndexName indexName, + SearchSourceBuilder sourceBuilder, + OpenSearchExprValueFactory factory, + List includes, + TimeValue cursorKeepAlive, + String pitId, + RelNode pushedDownRelNodeTree) { this.indexName = indexName; this.sourceBuilder = sourceBuilder; this.exprValueFactory = factory; this.includes = includes; this.cursorKeepAlive = cursorKeepAlive; this.pitId = pitId; + this.pushedDownRelNodeTree = pushedDownRelNodeTree; } /** true if the request is a count aggregation request. */ @@ -243,6 +272,10 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th exprValueFactory = new OpenSearchExprValueFactory( index.getFieldOpenSearchTypes(), index.isFieldTypeTolerance()); + + // RelNode tree is not serialized/deserialized for now + // It is only used during the initial query execution + this.pushedDownRelNodeTree = null; } @Override @@ -258,7 +291,8 @@ public OpenSearchResponse search( // get the value before set searchDone = true boolean isCountAggRequest = isCountAggRequest(); searchDone = true; - sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(exprValueFactory)); + // Convert pushed-down RelNode tree to Substrait bytes + sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree)); return new OpenSearchResponse( searchAction.apply( new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)), @@ -280,7 +314,8 @@ public OpenSearchResponse searchWithPIT(Function SearchHits.empty(), exprValueFactory, includes, isCountAggRequest()); } else { this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId)); - sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(exprValueFactory)); + // Convert pushed-down RelNode tree to Substrait bytes + sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree)); this.sourceBuilder.timeout(cursorKeepAlive); // check for search after if (searchAfter != null) { @@ -381,9 +416,16 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static byte[] convertToSubstraitAndSerialize(OpenSearchExprValueFactory index) { - RelNode relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); - CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); + public static byte[] convertToSubstraitAndSerialize(RelNode relNode) { + // If relNode is null, get it from ThreadLocal (fallback for non-Calcite queries) +// relNode = null; + if (relNode == null) { + LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)"); + relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); + CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); + } else { + LOGGER.info("RelNode provided directly from pushedDownRelNodeTree"); + } LOGGER.info("Calcite Logical Plan before Conversion\n {}", RelOptUtil.toString(relNode)); // Preprocess the Calcite plan diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index e40962d59fb..703107901ac 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -21,7 +21,9 @@ import java.util.stream.Stream; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Setter; import lombok.ToString; +import org.apache.calcite.rel.RelNode; import org.apache.commons.lang3.tuple.Pair; import org.apache.lucene.search.join.ScoreMode; import org.opensearch.action.search.CreatePitRequest; @@ -73,6 +75,17 @@ public class OpenSearchRequestBuilder { @ToString.Exclude private final Settings settings; + /** RelNode tree for pushed-down operations (optional). + * -- SETTER -- + * Set the RelNode tree for pushed-down operations. + * This allows passing the reconstructed RelNode tree to be converted to Substrait later. + * + * @param pushedDownRelNodeTree RelNode tree representing pushed-down operations + */ + @Setter + @EqualsAndHashCode.Exclude @ToString.Exclude + private org.apache.calcite.rel.RelNode pushedDownRelNodeTree = null; + public static class PushDownUnSupportedException extends RuntimeException { public PushDownUnSupportedException(String message) { super(message); @@ -92,7 +105,8 @@ public OpenSearchRequestBuilder( this.exprValueFactory = exprValueFactory; } - /** + + /** * Build DSL request. * * @return query request with PIT or scroll request @@ -112,7 +126,8 @@ public OpenSearchRequest build( * 2. If mapping is empty. It means no data in the index. PIT search relies on `_id` fields to do sort, thus it will fail if using PIT search in this case. */ if (sourceBuilder.size() == 0 || isMappingEmpty) { - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, List.of()); + return new OpenSearchQueryRequest( + indexName, sourceBuilder, exprValueFactory, List.of(), pushedDownRelNodeTree); } return buildRequestWithPit(indexName, cursorKeepAlive, client); } @@ -129,12 +144,13 @@ private OpenSearchRequest buildRequestWithPit( // Search with PIT request String pitId = createPit(indexName, cursorKeepAlive, client); return new OpenSearchQueryRequest( - indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId); + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree); } else { sourceBuilder.from(startFrom); sourceBuilder.size(size); // Search with non-Pit request - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); + return new OpenSearchQueryRequest( + indexName, sourceBuilder, exprValueFactory, includes, pushedDownRelNodeTree); } } else { if (startFrom != 0) { @@ -144,7 +160,7 @@ private OpenSearchRequest buildRequestWithPit( // Search with PIT request String pitId = createPit(indexName, cursorKeepAlive, client); return new OpenSearchQueryRequest( - indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId); + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index 20d8a6c34fd..e6af8a3cebc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.rel.type.RelDataType; @@ -29,6 +30,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.OpenSearchRules; import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.context.PushDownContext; @@ -109,10 +111,49 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { */ @Override public Enumerable<@Nullable Object> scan() { + // Reconstruct pushed-down RelNode tree + RelNode pushedDownTree = null; + try { + if (pushDownContext != null && !pushDownContext.isEmpty()) { + // Log all stored RelNodes in PushDownContext + LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size()); + int index = 0; + for (var operation : pushDownContext) { + LOG.info(" Operation {}: type={}, relNode={}", + index++, + operation.type(), + operation.relNode() != null ? operation.relNode().toString() : "NULL"); + } + + // Create a base CalciteLogicalIndexScan for reconstruction + CalciteLogicalIndexScan baseScan = new CalciteLogicalIndexScan( + getCluster(), + getTable(), + osIndex); + + LOG.info("Base scan for reconstruction: {}", baseScan); + + // Reconstruct the complete RelNode tree from push-down operations + pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(baseScan); + + LOG.info("Reconstructed pushed-down RelNode tree:\n{}", pushedDownTree.explain()); + } + } catch (Exception e) { + LOG.error("Failed to reconstruct pushed-down RelNode tree", e); + throw new RuntimeException("Failed to reconstruct pushed-down RelNode tree", e); + } + + // Pass pushedDownTree to OpenSearchRequest via RequestBuilder + final RelNode finalPushedDownTree = pushedDownTree; + return new AbstractEnumerable<>() { @Override public Enumerator enumerator() { OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder(); + // Set the RelNode tree on the request builder + if (finalPushedDownTree != null) { + requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree); + } return new OpenSearchIndexEnumerator( osIndex.getClient(), getFieldPath(), diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index a6ff44719d9..2f509b93b3c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -157,7 +157,8 @@ public AbstractRelNode pushDownFilter(Filter filter) { queryExpression.getAnalyzedNodes(), getCluster().getRexBuilder()) : filter.getCondition()), (OSRequestBuilderAction) - requestBuilder -> requestBuilder.pushDownFilter(queryExpression.builder())); + requestBuilder -> requestBuilder.pushDownFilter(queryExpression.builder()), + filter); // Store the Filter RelNode for Substrait conversion // If the query expression is partial, we need to replace the input of the filter with the // partial pushed scan and the filter condition with non-pushed-down conditions. @@ -268,7 +269,17 @@ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { (OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream()); } - newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action); + // Create a Project RelNode for Substrait conversion + org.apache.calcite.rex.RexBuilder rexBuilder = getCluster().getRexBuilder(); + List projects = new java.util.ArrayList<>(); + for (int columnIndex : selectedColumns) { + projects.add(rexBuilder.makeInputRef(this, columnIndex)); + } + org.apache.calcite.rel.core.Project projectRelNode = + org.apache.calcite.rel.logical.LogicalProject.create( + this, java.util.Collections.emptyList(), projects, newSchema); + + newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action, projectRelNode); return newScan; } @@ -383,7 +394,7 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { aggregationBuilder, extendedTypeMapping, outputFields.subList(0, aggregate.getGroupSet().cardinality())); - newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action); + newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action, aggregate); // Store the Aggregate RelNode return newScan; } catch (Exception e) { LOG.info("Cannot pushdown the aggregate {}", aggregate, e); @@ -414,7 +425,8 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of newScan.pushDownContext.add( PushDownType.LIMIT, new LimitDigest(limit, offset), - (OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownLimit(limit, offset)); + (OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownLimit(limit, offset), + sort); // Store the Sort RelNode return newScan; } } catch (Exception e) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index 1b50a2a8751..936a29aa970 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -123,6 +123,10 @@ public boolean add(PushDownOperation operation) { public void add(PushDownType type, Object digest, AbstractAction action) { add(new PushDownOperation(type, digest, action)); } + + public void add(PushDownType type, Object digest, AbstractAction action, org.apache.calcite.rel.RelNode relNode) { + add(new PushDownOperation(type, digest, action, relNode)); + } public boolean containsDigest(Object digest) { return this.stream().anyMatch(action -> action.digest().equals(digest)); @@ -136,4 +140,149 @@ public OpenSearchRequestBuilder createRequestBuilder() { } return newRequestBuilder; } + + /** + * Reconstruct the complete pushed-down RelNode tree from stored operations. + * Builds: Scan → Filter → Project → Aggregate → Limit + * + * @param baseScan The base CalciteLogicalIndexScan to start from + * @return The complete RelNode tree with all push-downs applied + */ + public org.apache.calcite.rel.RelNode reconstructPushedDownRelNodeTree( + org.apache.calcite.rel.RelNode baseScan) { + org.apache.calcite.rel.RelNode current = baseScan; + + int step = 0; + for (PushDownOperation operation : this) { + org.apache.calcite.rel.RelNode storedRelNode = operation.relNode(); + if (storedRelNode != null) { + org.apache.calcite.rel.RelNode before = current; + current = replaceInput(storedRelNode, current); + System.out.println(String.format(" Step %d: Applied %s", step, operation.type())); + System.out.println(String.format(" Before: %s", before)); + System.out.println(String.format(" After: %s", current)); + step++; + } + } + + return current; + } + + /** + * Replace the input of a RelNode with a new input. + * Creates a new RelNode instance with the updated input and properly derived row type. + * Uses RelBuilder to ensure proper row type derivation. + */ + private org.apache.calcite.rel.RelNode replaceInput( + org.apache.calcite.rel.RelNode relNode, + org.apache.calcite.rel.RelNode newInput) { + + // Create a RelBuilder for proper row type derivation + org.apache.calcite.tools.FrameworkConfig config = + org.apache.calcite.tools.Frameworks.newConfigBuilder() + .typeSystem(org.opensearch.sql.executor.OpenSearchTypeSystem.INSTANCE) + .build(); + java.sql.Connection connection = + org.opensearch.sql.calcite.utils.CalciteToolsHelper.connect( + config, + org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY); + org.apache.calcite.tools.RelBuilder builder = + org.opensearch.sql.calcite.utils.CalciteToolsHelper.create( + config, + org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY, + connection); + + builder.push(newInput); + + if (relNode instanceof org.apache.calcite.rel.logical.LogicalFilter) { + org.apache.calcite.rel.logical.LogicalFilter filter = + (org.apache.calcite.rel.logical.LogicalFilter) relNode; + // Use RelBuilder.filter() to properly derive row type + builder.filter(filter.getCondition()); + return builder.build(); + } + + if (relNode instanceof org.apache.calcite.rel.logical.LogicalAggregate) { + org.apache.calcite.rel.logical.LogicalAggregate agg = + (org.apache.calcite.rel.logical.LogicalAggregate) relNode; + + // Convert AggregateCall list to RelBuilder.AggCall list + java.util.List aggCalls = + new java.util.ArrayList<>(); + for (org.apache.calcite.rel.core.AggregateCall aggCall : agg.getAggCallList()) { + aggCalls.add( + builder.aggregateCall( + aggCall.getAggregation(), + aggCall.getArgList().stream() + .map(builder::field) + .collect(java.util.stream.Collectors.toList())) + .distinct(aggCall.isDistinct()) + .as(aggCall.getName())); + } + + // Use RelBuilder.aggregate() to properly derive row type + builder.aggregate( + builder.groupKey(agg.getGroupSet(), agg.getGroupSets()), + aggCalls); + return builder.build(); + } + + if (relNode instanceof org.apache.calcite.rel.logical.LogicalProject) { + org.apache.calcite.rel.logical.LogicalProject proj = + (org.apache.calcite.rel.logical.LogicalProject) relNode; + // Use RelBuilder.project() to properly derive row type + builder.project(proj.getProjects(), proj.getRowType().getFieldNames()); + return builder.build(); + } + + if (relNode instanceof org.apache.calcite.rel.logical.LogicalSort) { + org.apache.calcite.rel.logical.LogicalSort sort = + (org.apache.calcite.rel.logical.LogicalSort) relNode; + // Use RelBuilder.sort() to properly derive row type + builder.sortLimit( + sort.offset != null ? ((org.apache.calcite.rex.RexLiteral) sort.offset).getValueAs(Integer.class) : -1, + sort.fetch != null ? ((org.apache.calcite.rex.RexLiteral) sort.fetch).getValueAs(Integer.class) : -1, + builder.fields(sort.getCollation())); + return builder.build(); + } + + // Fallback code commented out - we only handle Logical* types with RelBuilder + // If we encounter other types, throw an exception to make it explicit + /* + // Fallback to copy() for other RelNode types + if (relNode instanceof org.apache.calcite.rel.core.Filter) { + org.apache.calcite.rel.core.Filter filter = (org.apache.calcite.rel.core.Filter) relNode; + return filter.copy(filter.getTraitSet(), newInput, filter.getCondition()); + } + + if (relNode instanceof org.apache.calcite.rel.core.Aggregate) { + org.apache.calcite.rel.core.Aggregate agg = (org.apache.calcite.rel.core.Aggregate) relNode; + return agg.copy( + agg.getTraitSet(), + newInput, + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList()); + } + + if (relNode instanceof org.apache.calcite.rel.core.Project) { + org.apache.calcite.rel.core.Project proj = (org.apache.calcite.rel.core.Project) relNode; + return proj.copy(proj.getTraitSet(), newInput, proj.getProjects(), proj.getRowType()); + } + + if (relNode instanceof org.apache.calcite.rel.core.Sort) { + org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) relNode; + return sort.copy( + sort.getTraitSet(), + newInput, + sort.getCollation(), + sort.offset, + sort.fetch); + } + */ + + // If we don't know how to handle this RelNode type, throw an exception + throw new UnsupportedOperationException( + "Unsupported RelNode type for reconstruction: " + relNode.getClass().getName()); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownOperation.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownOperation.java index c5779564369..7f6da2ef22a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownOperation.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownOperation.java @@ -5,14 +5,26 @@ package org.opensearch.sql.opensearch.storage.scan.context; +import org.apache.calcite.rel.RelNode; + /** * Represents a push down operation that can be applied to an OpenSearchRequestBuilder. * * @param type PushDownType enum * @param digest the digest of the pushed down operator * @param action the lambda action to apply on the OpenSearchRequestBuilder + * @param relNode the RelNode for this push-down operation (used for Substrait conversion) */ -public record PushDownOperation(PushDownType type, Object digest, AbstractAction action) { +public record PushDownOperation( + PushDownType type, Object digest, AbstractAction action, RelNode relNode) { + + /** + * Constructor for backward compatibility (without RelNode) + */ + public PushDownOperation(PushDownType type, Object digest, AbstractAction action) { + this(type, digest, action, null); + } + public String toString() { return type + "->" + digest; } From b604409a5825d3d220412580441a586b62d4d8a3 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Wed, 3 Dec 2025 22:50:01 -0800 Subject: [PATCH 2/7] Adding the missing Project operator after the filter Signed-off-by: Vinay Krishna Pudyodu --- .../storage/scan/CalciteLogicalIndexScan.java | 14 ++++++++++++++ .../storage/scan/context/PushDownContext.java | 7 ++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 2f509b93b3c..da3aee87920 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -394,6 +394,20 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { aggregationBuilder, extendedTypeMapping, outputFields.subList(0, aggregate.getGroupSet().cardinality())); + + // Store the input Project node BEFORE the Aggregate + // This Project comes between the Aggregate and the Scan in the pattern: Agg → Project → Scan + // It's critical for correct field index mapping in the Substrait conversion + // IMPORTANT: Use OSRequestBuilderAction (not AggregationBuilderAction) so it's added to + // operationsForRequestBuilder and comes BEFORE the aggregate in iteration order + if (project != null) { + LOG.info("Project to add: {}", project); + // Create a no-op OSRequestBuilderAction (not AggregationBuilderAction!) + // This ensures the Project is added to operationsForRequestBuilder, not operationsForAgg + OSRequestBuilderAction projectAction = requestBuilder -> {}; + newScan.pushDownContext.add(PushDownType.PROJECT, project.getRowType().getFieldNames(), projectAction, project); + } + newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action, aggregate); // Store the Aggregate RelNode return newScan; } catch (Exception e) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index 936a29aa970..c2ce4a316d4 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -152,8 +152,13 @@ public org.apache.calcite.rel.RelNode reconstructPushedDownRelNodeTree( org.apache.calcite.rel.RelNode baseScan) { org.apache.calcite.rel.RelNode current = baseScan; + // Convert to list for lookahead + java.util.List operations = new java.util.ArrayList<>(); + this.forEach(operations::add); + int step = 0; - for (PushDownOperation operation : this) { + for (int i = 0; i < operations.size(); i++) { + PushDownOperation operation = operations.get(i); org.apache.calcite.rel.RelNode storedRelNode = operation.relNode(); if (storedRelNode != null) { org.apache.calcite.rel.RelNode before = current; From f7eb400fb1a84b2a4ec3be22940a4ad2643ff50f Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 4 Dec 2025 10:30:51 -0800 Subject: [PATCH 3/7] Small refactors in PushDownContext and logs Signed-off-by: Vinay Krishna Pudyodu --- .../scan/CalciteEnumerableIndexScan.java | 14 +- .../storage/scan/CalciteLogicalIndexScan.java | 21 ++- .../storage/scan/context/PushDownContext.java | 157 +++++++----------- 3 files changed, 74 insertions(+), 118 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index e6af8a3cebc..7a55ff15658 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -30,7 +30,6 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.OpenSearchRules; import org.opensearch.sql.calcite.plan.Scannable; -import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.context.PushDownContext; @@ -115,7 +114,6 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { RelNode pushedDownTree = null; try { if (pushDownContext != null && !pushDownContext.isEmpty()) { - // Log all stored RelNodes in PushDownContext LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size()); int index = 0; for (var operation : pushDownContext) { @@ -126,16 +124,8 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { } // Create a base CalciteLogicalIndexScan for reconstruction - CalciteLogicalIndexScan baseScan = new CalciteLogicalIndexScan( - getCluster(), - getTable(), - osIndex); - - LOG.info("Base scan for reconstruction: {}", baseScan); - - // Reconstruct the complete RelNode tree from push-down operations - pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(baseScan); - + CalciteLogicalIndexScan logicalIndexScan = new CalciteLogicalIndexScan(getCluster(), getTable(), osIndex); + pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(logicalIndexScan); LOG.info("Reconstructed pushed-down RelNode tree:\n{}", pushedDownTree.explain()); } } catch (Exception e) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index da3aee87920..b7c67b233e6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -396,19 +396,18 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { outputFields.subList(0, aggregate.getGroupSet().cardinality())); // Store the input Project node BEFORE the Aggregate - // This Project comes between the Aggregate and the Scan in the pattern: Agg → Project → Scan - // It's critical for correct field index mapping in the Substrait conversion - // IMPORTANT: Use OSRequestBuilderAction (not AggregationBuilderAction) so it's added to - // operationsForRequestBuilder and comes BEFORE the aggregate in iteration order + // This Project comes between the Aggregate and the Filter in the pattern: Agg → Project → Filter if (project != null) { - LOG.info("Project to add: {}", project); - // Create a no-op OSRequestBuilderAction (not AggregationBuilderAction!) - // This ensures the Project is added to operationsForRequestBuilder, not operationsForAgg - OSRequestBuilderAction projectAction = requestBuilder -> {}; - newScan.pushDownContext.add(PushDownType.PROJECT, project.getRowType().getFieldNames(), projectAction, project); + LOG.info("Project to add: {}", project); + // Create a no-op OSRequestBuilderAction (not AggregationBuilderAction!) + // This ensures the Project is added to operationsForRequestBuilder, not operationsForAgg + // no-op since we don't need to modify the OpenSearch query, we only need RelNode for Substrait conversion + // We're using OSRequestBuilderAction for its routing behavior, It ensures the operation goes to + // operationsForRequestBuilder (not operationsForAgg) + OSRequestBuilderAction projectAction = requestBuilder -> {}; + newScan.pushDownContext.add(PushDownType.PROJECT, project.getRowType().getFieldNames(), projectAction, project); } - - newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action, aggregate); // Store the Aggregate RelNode + newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action, aggregate); return newScan; } catch (Exception e) { LOG.info("Cannot pushdown the aggregate {}", aggregate, e); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index c2ce4a316d4..1212e2ae392 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -6,18 +6,42 @@ package org.opensearch.sql.opensearch.storage.scan.context; import com.google.common.collect.Iterators; + +import java.sql.Connection; import java.util.AbstractCollection; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + import lombok.Getter; +import org.apache.calcite.rel.RelNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelBuilder; import org.jetbrains.annotations.NotNull; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.executor.OpenSearchTypeSystem; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; /** Push down context is used to store all the push down operations that are applied to the query */ @Getter public class PushDownContext extends AbstractCollection { + private static final Logger LOGGER = LogManager.getLogger(PushDownContext.class); + private final OpenSearchIndex osIndex; private final OpenSearchRequestBuilder requestBuilder; private ArrayDeque operationsForRequestBuilder; @@ -124,7 +148,7 @@ public void add(PushDownType type, Object digest, AbstractAction action) { add(new PushDownOperation(type, digest, action)); } - public void add(PushDownType type, Object digest, AbstractAction action, org.apache.calcite.rel.RelNode relNode) { + public void add(PushDownType type, Object digest, AbstractAction action, RelNode relNode) { add(new PushDownOperation(type, digest, action, relNode)); } @@ -145,32 +169,23 @@ public OpenSearchRequestBuilder createRequestBuilder() { * Reconstruct the complete pushed-down RelNode tree from stored operations. * Builds: Scan → Filter → Project → Aggregate → Limit * - * @param baseScan The base CalciteLogicalIndexScan to start from + * @param logicalIndexScan The base CalciteLogicalIndexScan to start from * @return The complete RelNode tree with all push-downs applied */ - public org.apache.calcite.rel.RelNode reconstructPushedDownRelNodeTree( - org.apache.calcite.rel.RelNode baseScan) { - org.apache.calcite.rel.RelNode current = baseScan; - - // Convert to list for lookahead - java.util.List operations = new java.util.ArrayList<>(); - this.forEach(operations::add); - - int step = 0; - for (int i = 0; i < operations.size(); i++) { - PushDownOperation operation = operations.get(i); - org.apache.calcite.rel.RelNode storedRelNode = operation.relNode(); - if (storedRelNode != null) { - org.apache.calcite.rel.RelNode before = current; - current = replaceInput(storedRelNode, current); - System.out.println(String.format(" Step %d: Applied %s", step, operation.type())); - System.out.println(String.format(" Before: %s", before)); - System.out.println(String.format(" After: %s", current)); - step++; + public RelNode reconstructPushedDownRelNodeTree(RelNode logicalIndexScan) { + RelNode current = logicalIndexScan; + List pushDownOperations = new ArrayList<>(this); + + for (PushDownOperation pushDownOperation : pushDownOperations) { + RelNode storedRelNode = pushDownOperation.relNode(); + if (storedRelNode != null) { + LOGGER.info("RelNode: {}", storedRelNode); + RelNode before = current; + current = replaceInput(storedRelNode, current); + LOGGER.info("{} being added as input to {}", before, current); + } } - } - - return current; + return current; } /** @@ -178,49 +193,40 @@ public org.apache.calcite.rel.RelNode reconstructPushedDownRelNodeTree( * Creates a new RelNode instance with the updated input and properly derived row type. * Uses RelBuilder to ensure proper row type derivation. */ - private org.apache.calcite.rel.RelNode replaceInput( - org.apache.calcite.rel.RelNode relNode, - org.apache.calcite.rel.RelNode newInput) { + private RelNode replaceInput(RelNode relNode, RelNode newInput) { // Create a RelBuilder for proper row type derivation - org.apache.calcite.tools.FrameworkConfig config = - org.apache.calcite.tools.Frameworks.newConfigBuilder() - .typeSystem(org.opensearch.sql.executor.OpenSearchTypeSystem.INSTANCE) + FrameworkConfig config = + Frameworks.newConfigBuilder() + .typeSystem(OpenSearchTypeSystem.INSTANCE) .build(); - java.sql.Connection connection = - org.opensearch.sql.calcite.utils.CalciteToolsHelper.connect( - config, - org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY); - org.apache.calcite.tools.RelBuilder builder = - org.opensearch.sql.calcite.utils.CalciteToolsHelper.create( + Connection connection = CalciteToolsHelper.connect(config, OpenSearchTypeFactory.TYPE_FACTORY); + RelBuilder builder = + CalciteToolsHelper.create( config, - org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY, + OpenSearchTypeFactory.TYPE_FACTORY, connection); builder.push(newInput); - if (relNode instanceof org.apache.calcite.rel.logical.LogicalFilter) { - org.apache.calcite.rel.logical.LogicalFilter filter = - (org.apache.calcite.rel.logical.LogicalFilter) relNode; - // Use RelBuilder.filter() to properly derive row type + if (relNode instanceof LogicalFilter filter) { + // Use RelBuilder.filter() to properly derive row type builder.filter(filter.getCondition()); return builder.build(); } - if (relNode instanceof org.apache.calcite.rel.logical.LogicalAggregate) { - org.apache.calcite.rel.logical.LogicalAggregate agg = - (org.apache.calcite.rel.logical.LogicalAggregate) relNode; - - // Convert AggregateCall list to RelBuilder.AggCall list - java.util.List aggCalls = + if (relNode instanceof LogicalAggregate agg) { + + // Convert AggregateCall list to RelBuilder.AggCall list + List aggCalls = new java.util.ArrayList<>(); - for (org.apache.calcite.rel.core.AggregateCall aggCall : agg.getAggCallList()) { + for (AggregateCall aggCall : agg.getAggCallList()) { aggCalls.add( builder.aggregateCall( aggCall.getAggregation(), aggCall.getArgList().stream() .map(builder::field) - .collect(java.util.stream.Collectors.toList())) + .collect(Collectors.toList())) .distinct(aggCall.isDistinct()) .as(aggCall.getName())); } @@ -232,60 +238,21 @@ private org.apache.calcite.rel.RelNode replaceInput( return builder.build(); } - if (relNode instanceof org.apache.calcite.rel.logical.LogicalProject) { - org.apache.calcite.rel.logical.LogicalProject proj = - (org.apache.calcite.rel.logical.LogicalProject) relNode; - // Use RelBuilder.project() to properly derive row type + if (relNode instanceof LogicalProject proj) { + // Use RelBuilder.project() to properly derive row type builder.project(proj.getProjects(), proj.getRowType().getFieldNames()); return builder.build(); } - if (relNode instanceof org.apache.calcite.rel.logical.LogicalSort) { - org.apache.calcite.rel.logical.LogicalSort sort = - (org.apache.calcite.rel.logical.LogicalSort) relNode; - // Use RelBuilder.sort() to properly derive row type + if (relNode instanceof LogicalSort sort) { + // Use RelBuilder.sort() to properly derive row type builder.sortLimit( - sort.offset != null ? ((org.apache.calcite.rex.RexLiteral) sort.offset).getValueAs(Integer.class) : -1, - sort.fetch != null ? ((org.apache.calcite.rex.RexLiteral) sort.fetch).getValueAs(Integer.class) : -1, + sort.offset != null ? Objects.requireNonNull(((RexLiteral) sort.offset).getValueAs(Integer.class)) : -1, + sort.fetch != null ? Objects.requireNonNull(((RexLiteral) sort.fetch).getValueAs(Integer.class)) : -1, builder.fields(sort.getCollation())); return builder.build(); } - - // Fallback code commented out - we only handle Logical* types with RelBuilder - // If we encounter other types, throw an exception to make it explicit - /* - // Fallback to copy() for other RelNode types - if (relNode instanceof org.apache.calcite.rel.core.Filter) { - org.apache.calcite.rel.core.Filter filter = (org.apache.calcite.rel.core.Filter) relNode; - return filter.copy(filter.getTraitSet(), newInput, filter.getCondition()); - } - - if (relNode instanceof org.apache.calcite.rel.core.Aggregate) { - org.apache.calcite.rel.core.Aggregate agg = (org.apache.calcite.rel.core.Aggregate) relNode; - return agg.copy( - agg.getTraitSet(), - newInput, - agg.getGroupSet(), - agg.getGroupSets(), - agg.getAggCallList()); - } - - if (relNode instanceof org.apache.calcite.rel.core.Project) { - org.apache.calcite.rel.core.Project proj = (org.apache.calcite.rel.core.Project) relNode; - return proj.copy(proj.getTraitSet(), newInput, proj.getProjects(), proj.getRowType()); - } - - if (relNode instanceof org.apache.calcite.rel.core.Sort) { - org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) relNode; - return sort.copy( - sort.getTraitSet(), - newInput, - sort.getCollation(), - sort.offset, - sort.fetch); - } - */ - + // If we don't know how to handle this RelNode type, throw an exception throw new UnsupportedOperationException( "Unsupported RelNode type for reconstruction: " + relNode.getClass().getName()); From 87c40efe0d8aef02d7f927e855385edda6846ba4 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 4 Dec 2025 11:32:25 -0800 Subject: [PATCH 4/7] Added full relNode log Signed-off-by: Vinay Krishna Pudyodu --- .../sql/opensearch/request/OpenSearchQueryRequest.java | 2 +- .../opensearch/storage/scan/CalciteEnumerableIndexScan.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 1dd093ec918..b8eed983e92 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -422,10 +422,10 @@ public static byte[] convertToSubstraitAndSerialize(RelNode relNode) { if (relNode == null) { LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)"); relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); - CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); } else { LOGGER.info("RelNode provided directly from pushedDownRelNodeTree"); } + CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode(); LOGGER.info("Calcite Logical Plan before Conversion\n {}", RelOptUtil.toString(relNode)); // Preprocess the Calcite plan diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index 7a55ff15658..a320f6bf2d0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.hint.RelHint; @@ -30,6 +31,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.OpenSearchRules; import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.context.PushDownContext; @@ -114,6 +116,7 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { RelNode pushedDownTree = null; try { if (pushDownContext != null && !pushDownContext.isEmpty()) { + LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode())); LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size()); int index = 0; for (var operation : pushDownContext) { From 83fd1d32f7c8a4bcc8a8db39a1c5ce1b0fbcf09b Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 4 Dec 2025 11:33:04 -0800 Subject: [PATCH 5/7] Removed unwanted comments Signed-off-by: Vinay Krishna Pudyodu --- .../sql/opensearch/request/OpenSearchQueryRequest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index b8eed983e92..f1d62df28a0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -417,8 +417,6 @@ public void writeTo(StreamOutput out) throws IOException { } public static byte[] convertToSubstraitAndSerialize(RelNode relNode) { - // If relNode is null, get it from ThreadLocal (fallback for non-Calcite queries) -// relNode = null; if (relNode == null) { LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)"); relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode(); From dc6f5a473cb25cfad9bb07f2fa36de58c506a4c1 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 4 Dec 2025 13:27:22 -0800 Subject: [PATCH 6/7] Use Direct RelNode.copy() Instead of RelBuilder Signed-off-by: Vinay Krishna Pudyodu --- .../storage/scan/context/PushDownContext.java | 88 ++++++------------- 1 file changed, 26 insertions(+), 62 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index 1212e2ae392..b18bdacfa9c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -7,33 +7,22 @@ import com.google.common.collect.Iterators; -import java.sql.Connection; import java.util.AbstractCollection; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; import lombok.Getter; import org.apache.calcite.rel.RelNode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RelBuilder; import org.jetbrains.annotations.NotNull; -import org.opensearch.sql.calcite.utils.CalciteToolsHelper; -import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; -import org.opensearch.sql.executor.OpenSearchTypeSystem; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; @@ -176,10 +165,9 @@ public RelNode reconstructPushedDownRelNodeTree(RelNode logicalIndexScan) { RelNode current = logicalIndexScan; List pushDownOperations = new ArrayList<>(this); - for (PushDownOperation pushDownOperation : pushDownOperations) { - RelNode storedRelNode = pushDownOperation.relNode(); + for (int i = 0; i < pushDownOperations.size(); i++) { + RelNode storedRelNode = pushDownOperations.get(i).relNode(); if (storedRelNode != null) { - LOGGER.info("RelNode: {}", storedRelNode); RelNode before = current; current = replaceInput(storedRelNode, current); LOGGER.info("{} being added as input to {}", before, current); @@ -190,67 +178,43 @@ public RelNode reconstructPushedDownRelNodeTree(RelNode logicalIndexScan) { /** * Replace the input of a RelNode with a new input. - * Creates a new RelNode instance with the updated input and properly derived row type. - * Uses RelBuilder to ensure proper row type derivation. + * Creates a new RelNode instance with the updated input while preserving the exact structure. + * Uses direct copy() methods to avoid Calcite optimizations that change the condition structure. */ private RelNode replaceInput(RelNode relNode, RelNode newInput) { - // Create a RelBuilder for proper row type derivation - FrameworkConfig config = - Frameworks.newConfigBuilder() - .typeSystem(OpenSearchTypeSystem.INSTANCE) - .build(); - Connection connection = CalciteToolsHelper.connect(config, OpenSearchTypeFactory.TYPE_FACTORY); - RelBuilder builder = - CalciteToolsHelper.create( - config, - OpenSearchTypeFactory.TYPE_FACTORY, - connection); - - builder.push(newInput); - if (relNode instanceof LogicalFilter filter) { - // Use RelBuilder.filter() to properly derive row type - builder.filter(filter.getCondition()); - return builder.build(); + // Use direct copy() to preserve exact filter condition (avoid SEARCH optimization) + return filter.copy(filter.getTraitSet(), newInput, filter.getCondition()); } if (relNode instanceof LogicalAggregate agg) { - - // Convert AggregateCall list to RelBuilder.AggCall list - List aggCalls = - new java.util.ArrayList<>(); - for (AggregateCall aggCall : agg.getAggCallList()) { - aggCalls.add( - builder.aggregateCall( - aggCall.getAggregation(), - aggCall.getArgList().stream() - .map(builder::field) - .collect(Collectors.toList())) - .distinct(aggCall.isDistinct()) - .as(aggCall.getName())); - } - - // Use RelBuilder.aggregate() to properly derive row type - builder.aggregate( - builder.groupKey(agg.getGroupSet(), agg.getGroupSets()), - aggCalls); - return builder.build(); + // Use direct copy() to preserve exact aggregate structure + return agg.copy( + agg.getTraitSet(), + newInput, + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList()); } if (relNode instanceof LogicalProject proj) { - // Use RelBuilder.project() to properly derive row type - builder.project(proj.getProjects(), proj.getRowType().getFieldNames()); - return builder.build(); + // Use direct copy() to preserve exact project expressions + return proj.copy( + proj.getTraitSet(), + newInput, + proj.getProjects(), + proj.getRowType()); } if (relNode instanceof LogicalSort sort) { - // Use RelBuilder.sort() to properly derive row type - builder.sortLimit( - sort.offset != null ? Objects.requireNonNull(((RexLiteral) sort.offset).getValueAs(Integer.class)) : -1, - sort.fetch != null ? Objects.requireNonNull(((RexLiteral) sort.fetch).getValueAs(Integer.class)) : -1, - builder.fields(sort.getCollation())); - return builder.build(); + // Use direct copy() to preserve exact sort collation + return sort.copy( + sort.getTraitSet(), + newInput, + sort.getCollation(), + sort.offset, + sort.fetch); } // If we don't know how to handle this RelNode type, throw an exception From cc5b1ed1bc5578b093a1fc5e30b70e68107f3ff0 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 4 Dec 2025 15:14:17 -0800 Subject: [PATCH 7/7] Disabled CoreRules.FILTER_REDUCE_EXPRESSIONS this is done to disable the calcite optimization on timerange queries ie calcite optimizes `>=($0, TIMESTAMP('2013-07-01 00:00:00')) AND <=($0, TIMESTAMP('2013-07-31 00:00:00'))` to `SEARCH($0, Sarg[['2013-07-01 00:00:00'..'2013-07-31 00:00:00']])` Signed-off-by: Vinay Krishna Pudyodu --- .../storage/scan/CalciteEnumerableIndexScan.java | 4 ++++ .../opensearch/storage/scan/CalciteLogicalIndexScan.java | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index a320f6bf2d0..6988890919b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -82,6 +82,10 @@ public void register(RelOptPlanner planner) { // remove this rule otherwise opensearch can't correctly interpret approx_count_distinct() // it is converted to cardinality aggregation in OpenSearch planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES); + + // Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH + // This is needed for Substrait compatibility which doesn't support SEARCH operations + planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index b7c67b233e6..2d91be66a7a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -18,6 +18,7 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelFieldCollation; @@ -133,6 +134,10 @@ public void register(RelOptPlanner planner) { } else { planner.addRule(OpenSearchIndexRules.RELEVANCE_FUNCTION_PUSHDOWN); } + + // Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH + // This is needed for Substrait compatibility which doesn't support SEARCH operations + planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS); } public AbstractRelNode pushDownFilter(Filter filter) { @@ -148,6 +153,10 @@ public AbstractRelNode pushDownFilter(Filter filter) { filter.getCondition(), schema, fieldTypes, rowType, getCluster()); // TODO: handle the case where condition contains a score function CalciteLogicalIndexScan newScan = this.copy(); + + // Log the filter condition being stored to check if SEARCH optimization already happened + LOG.info("Filter condition being stored: {}", filter.getCondition()); + newScan.pushDownContext.add( queryExpression.getScriptCount() > 0 ? PushDownType.SCRIPT : PushDownType.FILTER, new FilterDigest(