Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;

/** AST nodes visitor Defines the traverse path. */
public abstract class AbstractNodeVisitor<T, C> {
Expand Down Expand Up @@ -347,6 +348,10 @@ public T visitPatterns(Patterns patterns, C context) {
return visitChildren(patterns, context);
}

public T visitWindow(Window window, C context) {
return visitChildren(window, context);
}

public T visitJoin(Join node, C context) {
return visitChildren(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.expression;

import lombok.Getter;

public abstract class WindowBound {
private WindowBound() {}

@Getter
public static class OffSetWindowBound extends WindowBound {
private final long offset;
private final boolean isPreceding;

OffSetWindowBound(long offset, boolean isPreceding) {
this.offset = offset;
this.isPreceding = isPreceding;
}

public boolean isPreceding() {
return isPreceding;
}
}

public static class CurrentRowWindowBound extends WindowBound {
CurrentRowWindowBound() {}

@Override
public String toString() {
return "CURRENT ROW";
}
}

public static class UnboundedWindowBound extends WindowBound {
private final boolean isPreceding;

UnboundedWindowBound(boolean isPreceding) {
this.isPreceding = isPreceding;
}

public boolean isPreceding() {
return isPreceding;
}

@Override
public boolean equals(Object o) {
return this == o
|| o instanceof UnboundedWindowBound
&& isPreceding == ((UnboundedWindowBound) o).isPreceding;
}

@Override
public String toString() {
return isPreceding ? "UNBOUNDED PRECEDING" : "UNBOUNDED FOLLOWING";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.expression;

import java.util.Locale;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

@EqualsAndHashCode(callSuper = false)
@Getter
@RequiredArgsConstructor
@ToString
public class WindowFrame extends UnresolvedExpression {
private final FrameType type;
private final WindowBound lower;
private final WindowBound upper;

public enum FrameType {
RANGE,
ROWS
}

public static WindowFrame defaultFrame() {
return new WindowFrame(
FrameType.ROWS, createBound("UNBOUNDED PRECEDING"), createBound("UNBOUNDED FOLLOWING"));
}

public static WindowFrame create(FrameType type, Literal lower, Literal upper) {
WindowBound lowerBound = null;
WindowBound upperBound = null;
if (lower != null) {
if (lower.getType() == DataType.STRING) {
lowerBound = createBound(lower.getValue().toString());
} else {
throw new IllegalArgumentException(
String.format("Unsupported bound type: %s", lower.getType()));
}
}
if (upper != null) {
if (upper.getType() == DataType.STRING) {
upperBound = createBound(upper.getValue().toString());
} else {
throw new IllegalArgumentException(
String.format("Unsupported bound type: %s", upper.getType()));
}
}
return new WindowFrame(type, lowerBound, upperBound);
}

private static WindowBound createBound(String boundType) {
boundType = boundType.trim().toUpperCase(Locale.ROOT);
if ("CURRENT ROW".equals(boundType)) {
return new WindowBound.CurrentRowWindowBound();
} else if ("UNBOUNDED PRECEDING".equals(boundType)) {
return new WindowBound.UnboundedWindowBound(true);
} else if ("UNBOUNDED FOLLOWING".equals(boundType)) {
return new WindowBound.UnboundedWindowBound(false);
} else if (boundType.endsWith(" PRECEDING")) {
long number = Long.parseLong(boundType.split(" PRECEDING")[0]);
return new WindowBound.OffSetWindowBound(number, true);
} else if (boundType.endsWith(" FOLLOWING")) {
long number = Long.parseLong(boundType.split(" FOLLOWING")[0]);
return new WindowBound.OffSetWindowBound(number, false);
} else {
throw new IllegalArgumentException(String.format("Unsupported bound type: %s", boundType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@

package org.opensearch.sql.ast.expression;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.tree.Sort.SortOption;

@AllArgsConstructor
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Getter
@RequiredArgsConstructor
@ToString
public class WindowFunction extends UnresolvedExpression {

private final UnresolvedExpression function;
private List<UnresolvedExpression> partitionByList;
private List<Pair<SortOption, UnresolvedExpression>> sortList;
@Setter private List<UnresolvedExpression> partitionByList = new ArrayList<>();
@Setter private List<Pair<SortOption, UnresolvedExpression>> sortList = new ArrayList<>();
@Setter private WindowFrame windowFrame = WindowFrame.defaultFrame();

public WindowFunction(
UnresolvedExpression function,
List<UnresolvedExpression> partitionByList,
List<Pair<SortOption, UnresolvedExpression>> sortList) {
this.function = function;
this.partitionByList = partitionByList;
this.sortList = sortList;
}

@Override
public List<? extends Node> getChild() {
ImmutableList.Builder<UnresolvedExpression> children = ImmutableList.builder();
children.add(function);
children.addAll(partitionByList);
sortList.forEach(pair -> children.add(pair.getRight()));
return children.build();
return List.of(function);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
public class Patterns extends UnresolvedPlan {

private final UnresolvedExpression windowFunction;

private UnresolvedPlan child;

@Override
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Window.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Window extends UnresolvedPlan {

private final List<UnresolvedExpression> windowFunctionList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add window boundary? For example, I want to execute a ppl equal to a sql like
select AVG(SUM(total_amount)) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg_3day (this is one of the moving avg, which we plan to support for t2visbuilder).

Copy link
Member Author

@LantaoJin LantaoJin Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easy to support it. but not this PR of eventstats command. We can extend this syntax after PPL lang unified. e.g support boundary when we implement streamstats command.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a WindowFrame for further using.

private UnresolvedPlan child;

@Override
public Window attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitWindow(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.AggregateUtils;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

public class CalciteAggCallVisitor extends AbstractNodeVisitor<AggCall, CalcitePlanContext> {
private final CalciteRexNodeVisitor rexNodeVisitor;
Expand All @@ -41,6 +42,14 @@ public AggCall visitAggregateFunction(AggregateFunction node, CalcitePlanContext
for (UnresolvedExpression arg : node.getArgList()) {
argList.add(rexNodeVisitor.analyze(arg, context));
}
return AggregateUtils.translate(node, field, context, argList);
return BuiltinFunctionName.ofAggregation(node.getFuncName())
.map(
functionName -> {
return PlanUtils.makeAggCall(
context, functionName, node.getDistinct(), field, argList);
})
.orElseThrow(
() ->
new UnsupportedOperationException("Unexpected aggregation: " + node.getFuncName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class CalcitePlanContext {
@Getter @Setter private boolean isProjectVisited = false;

private final Stack<RexCorrelVariable> correlVar = new Stack<>();
private final Stack<List<RexNode>> windowPartitions = new Stack<>();

private CalcitePlanContext(FrameworkConfig config, QueryType queryType) {
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
import org.opensearch.sql.exception.CalciteUnsupportedException;
Expand Down Expand Up @@ -629,6 +630,15 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitWindow(Window node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
context.relBuilder.projectPlus(overExpressions);
return context.relBuilder.peek();
}

/*
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@

package org.opensearch.sql.calcite;

import static java.util.Objects.requireNonNull;
import static org.apache.calcite.sql.SqlKind.AS;
import static org.opensearch.sql.ast.expression.SpanUnit.NONE;
import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN;
import static org.opensearch.sql.calcite.utils.BuiltinFunctionUtils.VARCHAR_FORCE_NULLABLE;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedFunction;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlOperator;
Expand Down Expand Up @@ -49,6 +53,7 @@
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.When;
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
Expand Down Expand Up @@ -368,6 +373,43 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
return context.rexBuilder.makeCall(returnType, operator, translatedArguments);
}

@Override
public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) {
Function windowFunction = (Function) node.getFunction();
List<RexNode> arguments =
windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
List<RexNode> partitions =
node.getPartitionByList().stream()
.map(arg -> analyze(arg, context))
.map(this::extractRexNodeFromAlias)
.toList();
return BuiltinFunctionName.ofWindowFunction(windowFunction.getFuncName())
.map(
functionName -> {
RexNode field = arguments.isEmpty() ? null : arguments.getFirst();
List<RexNode> args =
(arguments.isEmpty() || arguments.size() == 1)
? Collections.emptyList()
: arguments.subList(1, arguments.size());
return PlanUtils.makeOver(
context, functionName, field, args, partitions, node.getWindowFrame());
})
.orElseThrow(
() ->
new UnsupportedOperationException(
"Unexpected window function: " + windowFunction.getFuncName()));
}

/** extract the expression of Alias from a node */
private RexNode extractRexNodeFromAlias(RexNode node) {
requireNonNull(node);
if (node.getKind() == AS) {
return ((RexCall) node).getOperands().get(0);
} else {
return node;
}
}

@Override
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
Expand Down
Loading
Loading