Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@
}
],
"datarows": [
[
1,
"https://example.com/page?ref=google"
],
[
1,
"https://www.google.com/search?q=test"
]
],
"total": 1,
"size": 1
}
"total": 2,
"size": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
}
],
"datarows": [
[
1,
-1,
2,
0,
"",
"https://google.com/maps"
],
[
1,
-1,
Expand Down Expand Up @@ -59,6 +67,6 @@
"https://example.com/page?ref=google"
]
],
"total": 4,
"size": 4
}
"total": 5,
"size": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
}
],
"datarows": [
[
1,
1024,
768
],
[
1,
1280,
1024
]
],
"total": 1,
"size": 1
}
"total": 2,
"size": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
}
],
"datarows": [
[
1,
"2013-07-02 16:00:00"
],
[
1,
"2013-07-05 11:00:00"
Expand All @@ -27,6 +31,6 @@
"2013-07-15 10:00:00"
]
],
"total": 4,
"size": 4
"total": 5,
"size": 5
}
2 changes: 1 addition & 1 deletion integ-test/src/test/resources/clickbench/queries/q39.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ source=hits
| where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and IsLink != 0 and IsDownload = 0
| stats bucket_nullable=false count() as PageViews by URL
| sort - PageViews
| head 10 from 1
| head 1010
2 changes: 1 addition & 1 deletion integ-test/src/test/resources/clickbench/queries/q40.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ source=hits
| eval Src=case(SearchEngineID = 0 and AdvEngineID = 0, Referer else ''), Dst=URL
| stats /*bucket_nullable=false*/ count() as PageViews by TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst
| sort - PageViews
| head 10 from 1
| head 1010
2 changes: 1 addition & 1 deletion integ-test/src/test/resources/clickbench/queries/q41.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ source=hits
| where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and TraficSourceID in (-1, 6) and RefererHash = 3594120000172545465
| stats bucket_nullable=false count() as PageViews by URLHash, EventDate
| sort - PageViews
| head 10 from 100
| head 110
2 changes: 1 addition & 1 deletion integ-test/src/test/resources/clickbench/queries/q42.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ source=hits
| where CounterID = 62 and EventDate >= '2013-07-01 00:00:00' and EventDate <= '2013-07-31 00:00:00' and IsRefresh = 0 and DontCountHits = 0 and URLHash = 2868770270353813622
| stats bucket_nullable=false count() as PageViews by WindowClientWidth, WindowClientHeight
| sort - PageViews
| head 10 from 1
| head 10010
2 changes: 1 addition & 1 deletion integ-test/src/test/resources/clickbench/queries/q43.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ source=hits
| eval M = date_format(EventTime, '%Y-%m-%d %H:00:00')
| stats /*bucket_nullable=false*/ count() as PageViews by M
| sort M
| head 10 from 1
| head 1010
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th
exprValueFactory =
new OpenSearchExprValueFactory(
index.getFieldOpenSearchTypes(), index.isFieldTypeTolerance());

// RelNode tree is not serialized/deserialized for now
// It is only used during the initial query execution
this.pushedDownRelNodeTree = null;
Expand Down Expand Up @@ -418,14 +418,6 @@ public void writeTo(StreamOutput out) throws IOException {
}

public static byte[] convertToSubstraitAndSerialize(RelNode relNode) {
relNode = null; // Setting the relNode as Null so that it always uses full relNode from the ThreadLocal
if (relNode == null) {
LOGGER.info("RelNode is null, retrieving from ThreadLocal (CalciteToolsHelper.OpenSearchRelRunners)");
relNode = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode();
} else {
LOGGER.info("RelNode provided directly from pushedDownRelNodeTree");
}
CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode();
LOGGER.info("Calcite Logical Plan before Conversion\n {}", RelOptUtil.toString(relNode));

// Preprocess the Calcite plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void register(RelOptPlanner planner) {
// remove this rule otherwise opensearch can't correctly interpret approx_count_distinct()
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);

// Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH
// This is needed for Substrait compatibility which doesn't support SEARCH operations
planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS);
Expand Down Expand Up @@ -118,18 +118,21 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
public Enumerable<@Nullable Object> scan() {
// Reconstruct pushed-down RelNode tree
RelNode pushedDownTree = null;
RelNode fullRelNodeTree = null;
try {
if (pushDownContext != null && !pushDownContext.isEmpty()) {
LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode()));
fullRelNodeTree = CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode();
CalciteToolsHelper.OpenSearchRelRunners.clearCurrentRelNode();
LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(fullRelNodeTree));
LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size());
int index = 0;
for (var operation : pushDownContext) {
LOG.info(" Operation {}: type={}, relNode={}",
index++,
operation.type(),
LOG.info(" Operation {}: type={}, relNode={}",
index++,
operation.type(),
operation.relNode() != null ? operation.relNode().toString() : "NULL");
}

// Create a base CalciteLogicalIndexScan for reconstruction
CalciteLogicalIndexScan logicalIndexScan = new CalciteLogicalIndexScan(getCluster(), getTable(), osIndex);
pushedDownTree = pushDownContext.reconstructPushedDownRelNodeTree(logicalIndexScan);
Expand All @@ -139,18 +142,21 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
LOG.error("Failed to reconstruct pushed-down RelNode tree", e);
throw new RuntimeException("Failed to reconstruct pushed-down RelNode tree", e);
}

// Pass pushedDownTree to OpenSearchRequest via RequestBuilder
final RelNode finalPushedDownTree = pushedDownTree;

// If needed for testing use finalFullRelNodeTree
final RelNode finalFullRelNodeTree = fullRelNodeTree;

return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder();
// Set the RelNode tree on the request builder
if (finalPushedDownTree != null) {
requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree);
}
requestBuilder.setPushedDownRelNodeTree(finalFullRelNodeTree);
// if (finalPushedDownTree != null) {
// requestBuilder.setPushedDownRelNodeTree(finalPushedDownTree);
// }
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void register(RelOptPlanner planner) {
} else {
planner.addRule(OpenSearchIndexRules.RELEVANCE_FUNCTION_PUSHDOWN);
}

// Remove FILTER_REDUCE_EXPRESSIONS rule to prevent conversion of range comparisons to SEARCH
// This is needed for Substrait compatibility which doesn't support SEARCH operations
planner.removeRule(CoreRules.FILTER_REDUCE_EXPRESSIONS);
Expand All @@ -158,10 +158,10 @@ public AbstractRelNode pushDownFilter(Filter filter) {
filter.getCondition(), schema, fieldTypes, rowType, getCluster());
// TODO: handle the case where condition contains a score function
CalciteLogicalIndexScan newScan = this.copy();

// Log the filter condition being stored to check if SEARCH optimization already happened
LOG.info("Filter condition being stored: {}", filter.getCondition());

newScan.pushDownContext.add(
queryExpression.getScriptCount() > 0 ? PushDownType.SCRIPT : PushDownType.FILTER,
new FilterDigest(
Expand Down Expand Up @@ -335,7 +335,7 @@ public CalciteLogicalIndexScan pushDownSortAggregateMeasure(Sort sort) {
aggAction.rePushDownSortAggMeasure(
sort.getCollation().getFieldCollations(), rowType.getFieldNames());
Object digest = sort.getCollation().getFieldCollations();
newScan.pushDownContext.add(PushDownType.SORT_AGG_METRICS, digest, newAction, sort);
newScan.pushDownContext.add(PushDownType.SORT_AGG_METRICS, digest, newAction);
return newScan;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -442,7 +442,7 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of
updated
? aggAction -> aggAction.pushDownLimitIntoBucketSize(limit + offset)
: aggAction -> {};
newScan.pushDownContext.add(PushDownType.LIMIT, new LimitDigest(limit, offset), action);
newScan.pushDownContext.add(PushDownType.LIMIT, new LimitDigest(limit, offset), action, sort);
return offset > 0 ? sort.copy(sort.getTraitSet(), List.of(newScan)) : newScan;
} else {
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());
Expand Down
Loading