Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {

private SearchResponse searchResponse = null;

/** RelNode tree for pushed-down operations (optional). */
@EqualsAndHashCode.Exclude @ToString.Exclude
private final org.apache.calcite.rel.RelNode pushedDownRelNodeTree;

private static final Logger LOGGER =
LogManager.getLogger(OpenSearchQueryRequest.class);

Expand All @@ -170,6 +174,7 @@ public OpenSearchQueryRequest(
sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT);
this.exprValueFactory = factory;
this.includes = includes;
this.pushedDownRelNodeTree = null;
}

/** Constructor of OpenSearchQueryRequest. */
Expand All @@ -178,10 +183,21 @@ public OpenSearchQueryRequest(
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes) {
this(indexName, sourceBuilder, factory, includes, (RelNode) null);
}

/** Constructor of OpenSearchQueryRequest with RelNode tree support. */
public OpenSearchQueryRequest(
IndexName indexName,
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes,
RelNode pushedDownRelNodeTree) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
this.includes = includes;
this.pushedDownRelNodeTree = pushedDownRelNodeTree;
}

/** Constructor of OpenSearchQueryRequest with PIT support. */
Expand All @@ -192,12 +208,25 @@ public OpenSearchQueryRequest(
List<String> includes,
TimeValue cursorKeepAlive,
String pitId) {
this(indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, null);
}

/** Constructor of OpenSearchQueryRequest with PIT and RelNode tree support. */
public OpenSearchQueryRequest(
IndexName indexName,
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes,
TimeValue cursorKeepAlive,
String pitId,
RelNode pushedDownRelNodeTree) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
this.includes = includes;
this.cursorKeepAlive = cursorKeepAlive;
this.pitId = pitId;
this.pushedDownRelNodeTree = pushedDownRelNodeTree;
}

/** true if the request is a count aggregation request. */
Expand Down Expand Up @@ -243,6 +272,10 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th
exprValueFactory =
new OpenSearchExprValueFactory(
index.getFieldOpenSearchTypes(), index.isFieldTypeTolerance());

// RelNode tree is not serialized/deserialized for now
// It is only used during the initial query execution
this.pushedDownRelNodeTree = null;
}

@Override
Expand All @@ -258,7 +291,8 @@ public OpenSearchResponse search(
// get the value before set searchDone = true
boolean isCountAggRequest = isCountAggRequest();
searchDone = true;
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(exprValueFactory));
// Convert pushed-down RelNode tree to Substrait bytes
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
return new OpenSearchResponse(
searchAction.apply(
new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)),
Expand All @@ -280,7 +314,8 @@ public OpenSearchResponse searchWithPIT(Function<SearchRequest, SearchResponse>
SearchHits.empty(), exprValueFactory, includes, isCountAggRequest());
} else {
this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId));
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(exprValueFactory));
// Convert pushed-down RelNode tree to Substrait bytes
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
this.sourceBuilder.timeout(cursorKeepAlive);
// check for search after
if (searchAfter != null) {
Expand Down Expand Up @@ -381,8 +416,13 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public static byte[] convertToSubstraitAndSerialize(OpenSearchExprValueFactory index) {
RelNode relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode();
public static byte[] convertToSubstraitAndSerialize(RelNode relNode) {
if (relNode == null) {
LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)");
relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode();
} else {
LOGGER.info("RelNode provided directly from pushedDownRelNodeTree");
}
CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode();
LOGGER.info("Calcite Logical Plan before Conversion\n {}", RelOptUtil.toString(relNode));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.stream.Stream;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.calcite.rel.RelNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.action.search.CreatePitRequest;
Expand Down Expand Up @@ -73,6 +75,17 @@ public class OpenSearchRequestBuilder {

@ToString.Exclude private final Settings settings;

/** RelNode tree for pushed-down operations (optional).
* -- SETTER --
* Set the RelNode tree for pushed-down operations.
* This allows passing the reconstructed RelNode tree to be converted to Substrait later.
*
* @param pushedDownRelNodeTree RelNode tree representing pushed-down operations
*/
@Setter
@EqualsAndHashCode.Exclude @ToString.Exclude
private org.apache.calcite.rel.RelNode pushedDownRelNodeTree = null;

public static class PushDownUnSupportedException extends RuntimeException {
public PushDownUnSupportedException(String message) {
super(message);
Expand All @@ -92,7 +105,8 @@ public OpenSearchRequestBuilder(
this.exprValueFactory = exprValueFactory;
}

/**

/**
* Build DSL request.
*
* @return query request with PIT or scroll request
Expand All @@ -112,7 +126,8 @@ public OpenSearchRequest build(
* 2. If mapping is empty. It means no data in the index. PIT search relies on `_id` fields to do sort, thus it will fail if using PIT search in this case.
*/
if (sourceBuilder.size() == 0 || isMappingEmpty) {
return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, List.of());
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, List.of(), pushedDownRelNodeTree);
}
return buildRequestWithPit(indexName, cursorKeepAlive, client);
}
Expand All @@ -129,12 +144,13 @@ private OpenSearchRequest buildRequestWithPit(
// Search with PIT request
String pitId = createPit(indexName, cursorKeepAlive, client);
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId);
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree);
} else {
sourceBuilder.from(startFrom);
sourceBuilder.size(size);
// Search with non-Pit request
return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes);
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, pushedDownRelNodeTree);
}
} else {
if (startFrom != 0) {
Expand All @@ -144,7 +160,7 @@ private OpenSearchRequest buildRequestWithPit(
// Search with PIT request
String pitId = createPit(indexName, cursorKeepAlive, client);
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId);
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.type.RelDataType;
Expand All @@ -29,6 +31,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.calcite.plan.OpenSearchRules;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.context.PushDownContext;
Expand Down Expand Up @@ -79,6 +82,10 @@ public void register(RelOptPlanner planner) {
// remove this rule otherwise opensearch can't correctly interpret approx_count_distinct()
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);

// Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH
// This is needed for Substrait compatibility which doesn't support SEARCH operations
planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS);
}

@Override
Expand Down Expand Up @@ -109,10 +116,41 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
*/
@Override
public Enumerable<@Nullable Object> scan() {
// Reconstruct pushed-down RelNode tree
RelNode pushedDownTree = null;
try {
if (pushDownContext != null && !pushDownContext.isEmpty()) {
LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode()));
LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size());
int index = 0;
for (var operation : pushDownContext) {
LOG.info(" Operation {}: type={}, relNode={}",
index++,
operation.type(),
operation.relNode() != null ? operation.relNode().toString() : "NULL");
}

// Create a base CalciteLogicalIndexScan for reconstruction
CalciteLogicalIndexScan logicalIndexScan = new CalciteLogicalIndexScan(getCluster(), getTable(), osIndex);
pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(logicalIndexScan);
LOG.info("Reconstructed pushed-down RelNode tree:\n{}", pushedDownTree.explain());
}
} catch (Exception e) {
LOG.error("Failed to reconstruct pushed-down RelNode tree", e);
throw new RuntimeException("Failed to reconstruct pushed-down RelNode tree", e);
}

// Pass pushedDownTree to OpenSearchRequest via RequestBuilder
final RelNode finalPushedDownTree = pushedDownTree;

return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder();
// Set the RelNode tree on the request builder
if (finalPushedDownTree != null) {
requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree);
}
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
Expand Down Expand Up @@ -133,6 +134,10 @@ public void register(RelOptPlanner planner) {
} else {
planner.addRule(OpenSearchIndexRules.RELEVANCE_FUNCTION_PUSHDOWN);
}

// Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH
// This is needed for Substrait compatibility which doesn't support SEARCH operations
planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS);
}

public AbstractRelNode pushDownFilter(Filter filter) {
Expand All @@ -148,6 +153,10 @@ public AbstractRelNode pushDownFilter(Filter filter) {
filter.getCondition(), schema, fieldTypes, rowType, getCluster());
// TODO: handle the case where condition contains a score function
CalciteLogicalIndexScan newScan = this.copy();

// Log the filter condition being stored to check if SEARCH optimization already happened
LOG.info("Filter condition being stored: {}", filter.getCondition());

newScan.pushDownContext.add(
queryExpression.getScriptCount() > 0 ? PushDownType.SCRIPT : PushDownType.FILTER,
new FilterDigest(
Expand All @@ -157,7 +166,8 @@ public AbstractRelNode pushDownFilter(Filter filter) {
queryExpression.getAnalyzedNodes(), getCluster().getRexBuilder())
: filter.getCondition()),
(OSRequestBuilderAction)
requestBuilder -> requestBuilder.pushDownFilter(queryExpression.builder()));
requestBuilder -> requestBuilder.pushDownFilter(queryExpression.builder()),
filter); // Store the Filter RelNode for Substrait conversion

// If the query expression is partial, we need to replace the input of the filter with the
// partial pushed scan and the filter condition with non-pushed-down conditions.
Expand Down Expand Up @@ -268,7 +278,17 @@ public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
(OSRequestBuilderAction)
requestBuilder -> requestBuilder.pushDownProjectStream(projectedFields.stream());
}
newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action);
// Create a Project RelNode for Substrait conversion
org.apache.calcite.rex.RexBuilder rexBuilder = getCluster().getRexBuilder();
List<org.apache.calcite.rex.RexNode> projects = new java.util.ArrayList<>();
for (int columnIndex : selectedColumns) {
projects.add(rexBuilder.makeInputRef(this, columnIndex));
}
org.apache.calcite.rel.core.Project projectRelNode =
org.apache.calcite.rel.logical.LogicalProject.create(
this, java.util.Collections.emptyList(), projects, newSchema);

newScan.pushDownContext.add(PushDownType.PROJECT, newSchema.getFieldNames(), action, projectRelNode);
return newScan;
}

Expand Down Expand Up @@ -383,7 +403,20 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) {
aggregationBuilder,
extendedTypeMapping,
outputFields.subList(0, aggregate.getGroupSet().cardinality()));
newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action);

// Store the input Project node BEFORE the Aggregate
// This Project comes between the Aggregate and the Filter in the pattern: Agg → Project → Filter
if (project != null) {
LOG.info("Project to add: {}", project);
// Create a no-op OSRequestBuilderAction (not AggregationBuilderAction!)
// This ensures the Project is added to operationsForRequestBuilder, not operationsForAgg
// no-op since we don't need to modify the OpenSearch query, we only need RelNode for Substrait conversion
// We're using OSRequestBuilderAction for its routing behavior, It ensures the operation goes to
// operationsForRequestBuilder (not operationsForAgg)
OSRequestBuilderAction projectAction = requestBuilder -> {};
newScan.pushDownContext.add(PushDownType.PROJECT, project.getRowType().getFieldNames(), projectAction, project);
}
newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action, aggregate);
return newScan;
} catch (Exception e) {
LOG.info("Cannot pushdown the aggregate {}", aggregate, e);
Expand Down Expand Up @@ -414,7 +447,8 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of
newScan.pushDownContext.add(
PushDownType.LIMIT,
new LimitDigest(limit, offset),
(OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownLimit(limit, offset));
(OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownLimit(limit, offset),
sort); // Store the Sort RelNode
return newScan;
}
} catch (Exception e) {
Expand Down
Loading
Loading