Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,6 @@ private RelNode buildStreamWindowJoinPlan(
context.relBuilder.push(leftWithHelpers);
context.relBuilder.variable(v::set);

context.relBuilder.push(leftWithHelpers);
RexNode rightSeq = context.relBuilder.field(seqCol);
RexNode outerSeq = context.relBuilder.field(v.get(), seqCol);

Expand All @@ -1829,8 +1828,7 @@ private RelNode buildStreamWindowJoinPlan(
context.relBuilder.filter(filter);

// aggregate all window functions on right side
List<AggCall> aggCalls = buildAggCallsForWindowFunctions(node.getWindowFunctionList(), context);
context.relBuilder.aggregate(context.relBuilder.groupKey(), aggCalls);
Comment on lines -1832 to -1833
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuancu @songkant-aws we'd better not to call context.relBuilder.aggregate directly, instead, use aggregateWithTrimming to build aggregation in stack. I see some codes in visitChart, rankByColumnSplit and visitPatterns. Can you create a issue to replace?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or co-auther with @ishaoxy to address them in this PR.

Copy link
Collaborator

@yuancu yuancu Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did so because I needed to build multiple aggregations for the chart command. aggregateWithTrimming works on PPL's AST (composed of UnresolvedPlan) and creates RexNode AST, but I have already gone through this in the first aggregation. Therefore, I had to call relBuilder.aggregate the second time I created an aggregation to aggregate on RexNode.

aggregateWithTrimming(List.of(), node.getWindowFunctionList(), context, false);
RelNode rightAgg = context.relBuilder.build();

// correlate LEFT with RIGHT using seq + group fields
Expand Down Expand Up @@ -2016,27 +2014,6 @@ private String extractGroupFieldName(UnresolvedExpression groupExpr) {
}
}

private List<AggCall> buildAggCallsForWindowFunctions(
List<UnresolvedExpression> windowExprs, CalcitePlanContext context) {
List<AggCall> aggCalls = new ArrayList<>();
for (UnresolvedExpression expr : windowExprs) {
if (expr instanceof Alias a && a.getDelegated() instanceof WindowFunction wf) {
Function func = (Function) wf.getFunction();
List<UnresolvedExpression> args = func.getFuncArgs();
// first argument is the input field, others are function params
UnresolvedExpression field = args.isEmpty() ? null : args.get(0);
List<UnresolvedExpression> rest =
args.size() <= 1 ? List.of() : args.subList(1, args.size());
AggregateFunction aggFunc = new AggregateFunction(func.getFuncName(), field, rest);
AggCall call = aggVisitor.analyze(new Alias(a.getName(), aggFunc), context);
aggCalls.add(call);
} else {
throw new IllegalArgumentException("Unsupported window function in streamstats");
}
}
return aggCalls;
}

private List<RexNode> buildRequiredLeft(
CalcitePlanContext context, String seqCol, List<UnresolvedExpression> groupList) {
List<RexNode> requiredLeft = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ calcite:
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
EnumerableLimit(fetch=[10000])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ calcite:
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
EnumerableLimit(fetch=[10000])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ calcite:
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
EnumerableLimit(fetch=[10000])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ calcite:
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
EnumerableLimit(fetch=[10000])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ calcite:
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
EnumerableLimit(fetch=[10000])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ calcite:
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
LogicalProject(age=[$8])
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
EnumerableLimit(fetch=[10000])
Expand Down
Loading
Loading