diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 49e80b6096d..39401db606f 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -70,6 +70,7 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Values; +import org.opensearch.sql.ast.tree.Window; /** AST nodes visitor Defines the traverse path. */ public abstract class AbstractNodeVisitor { @@ -347,6 +348,10 @@ public T visitPatterns(Patterns patterns, C context) { return visitChildren(patterns, context); } + public T visitWindow(Window window, C context) { + return visitChildren(window, context); + } + public T visitJoin(Join node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/expression/WindowBound.java b/core/src/main/java/org/opensearch/sql/ast/expression/WindowBound.java new file mode 100644 index 00000000000..d4241165cbc --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/expression/WindowBound.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.expression; + +import lombok.Getter; + +public abstract class WindowBound { + private WindowBound() {} + + @Getter + public static class OffSetWindowBound extends WindowBound { + private final long offset; + private final boolean isPreceding; + + OffSetWindowBound(long offset, boolean isPreceding) { + this.offset = offset; + this.isPreceding = isPreceding; + } + + public boolean isPreceding() { + return isPreceding; + } + } + + public static class CurrentRowWindowBound extends WindowBound { + CurrentRowWindowBound() {} + + @Override + public String toString() { + return "CURRENT ROW"; + } + } + + public static class 
UnboundedWindowBound extends WindowBound { + private final boolean isPreceding; + + UnboundedWindowBound(boolean isPreceding) { + this.isPreceding = isPreceding; + } + + public boolean isPreceding() { + return isPreceding; + } + + @Override + public boolean equals(Object o) { + return this == o + || o instanceof UnboundedWindowBound + && isPreceding == ((UnboundedWindowBound) o).isPreceding; + } + + @Override + public String toString() { + return isPreceding ? "UNBOUNDED PRECEDING" : "UNBOUNDED FOLLOWING"; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/ast/expression/WindowFrame.java b/core/src/main/java/org/opensearch/sql/ast/expression/WindowFrame.java new file mode 100644 index 00000000000..7d46c6a0c74 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/expression/WindowFrame.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.expression; + +import java.util.Locale; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +@EqualsAndHashCode(callSuper = false) +@Getter +@RequiredArgsConstructor +@ToString +public class WindowFrame extends UnresolvedExpression { + private final FrameType type; + private final WindowBound lower; + private final WindowBound upper; + + public enum FrameType { + RANGE, + ROWS + } + + public static WindowFrame defaultFrame() { + return new WindowFrame( + FrameType.ROWS, createBound("UNBOUNDED PRECEDING"), createBound("UNBOUNDED FOLLOWING")); + } + + public static WindowFrame create(FrameType type, Literal lower, Literal upper) { + WindowBound lowerBound = null; + WindowBound upperBound = null; + if (lower != null) { + if (lower.getType() == DataType.STRING) { + lowerBound = createBound(lower.getValue().toString()); + } else { + throw new IllegalArgumentException( + String.format("Unsupported bound type: %s", lower.getType())); + } + } + if (upper != null) 
{ + if (upper.getType() == DataType.STRING) { + upperBound = createBound(upper.getValue().toString()); + } else { + throw new IllegalArgumentException( + String.format("Unsupported bound type: %s", upper.getType())); + } + } + return new WindowFrame(type, lowerBound, upperBound); + } + + private static WindowBound createBound(String boundType) { + boundType = boundType.trim().toUpperCase(Locale.ROOT); + if ("CURRENT ROW".equals(boundType)) { + return new WindowBound.CurrentRowWindowBound(); + } else if ("UNBOUNDED PRECEDING".equals(boundType)) { + return new WindowBound.UnboundedWindowBound(true); + } else if ("UNBOUNDED FOLLOWING".equals(boundType)) { + return new WindowBound.UnboundedWindowBound(false); + } else if (boundType.endsWith(" PRECEDING")) { + long number = Long.parseLong(boundType.split(" PRECEDING")[0]); + return new WindowBound.OffSetWindowBound(number, true); + } else if (boundType.endsWith(" FOLLOWING")) { + long number = Long.parseLong(boundType.split(" FOLLOWING")[0]); + return new WindowBound.OffSetWindowBound(number, false); + } else { + throw new IllegalArgumentException(String.format("Unsupported bound type: %s", boundType)); + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/ast/expression/WindowFunction.java b/core/src/main/java/org/opensearch/sql/ast/expression/WindowFunction.java index 47f52657659..e750f15564d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/expression/WindowFunction.java +++ b/core/src/main/java/org/opensearch/sql/ast/expression/WindowFunction.java @@ -5,36 +5,41 @@ package org.opensearch.sql.ast.expression; -import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; -import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.ast.AbstractNodeVisitor; import 
org.opensearch.sql.ast.Node; import org.opensearch.sql.ast.tree.Sort.SortOption; -@AllArgsConstructor +@RequiredArgsConstructor @EqualsAndHashCode(callSuper = false) @Getter -@RequiredArgsConstructor @ToString public class WindowFunction extends UnresolvedExpression { private final UnresolvedExpression function; - private List partitionByList; - private List> sortList; + @Setter private List partitionByList = new ArrayList<>(); + @Setter private List> sortList = new ArrayList<>(); + @Setter private WindowFrame windowFrame = WindowFrame.defaultFrame(); + + public WindowFunction( + UnresolvedExpression function, + List partitionByList, + List> sortList) { + this.function = function; + this.partitionByList = partitionByList; + this.sortList = sortList; + } @Override public List getChild() { - ImmutableList.Builder children = ImmutableList.builder(); - children.add(function); - children.addAll(partitionByList); - sortList.forEach(pair -> children.add(pair.getRight())); - return children.build(); + return List.of(function); } @Override diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Patterns.java b/core/src/main/java/org/opensearch/sql/ast/tree/Patterns.java index a885a368f45..4e7b896fe1e 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Patterns.java @@ -29,7 +29,6 @@ public class Patterns extends UnresolvedPlan { private final UnresolvedExpression windowFunction; - private UnresolvedPlan child; @Override diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Window.java b/core/src/main/java/org/opensearch/sql/ast/tree/Window.java new file mode 100644 index 00000000000..e34630f62fe --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Window.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; 
+import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +public class Window extends UnresolvedPlan { + + private final List windowFunctionList; + private UnresolvedPlan child; + + @Override + public Window attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitWindow(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteAggCallVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteAggCallVisitor.java index 0dc2ea6c116..4a4ac8ea2b8 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteAggCallVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteAggCallVisitor.java @@ -14,7 +14,8 @@ import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.UnresolvedExpression; -import org.opensearch.sql.calcite.utils.AggregateUtils; +import org.opensearch.sql.calcite.utils.PlanUtils; +import org.opensearch.sql.expression.function.BuiltinFunctionName; public class CalciteAggCallVisitor extends AbstractNodeVisitor { private final CalciteRexNodeVisitor rexNodeVisitor; @@ -41,6 +42,14 @@ public AggCall visitAggregateFunction(AggregateFunction node, CalcitePlanContext for (UnresolvedExpression arg : node.getArgList()) { argList.add(rexNodeVisitor.analyze(arg, context)); } - return AggregateUtils.translate(node, field, context, argList); + return BuiltinFunctionName.ofAggregation(node.getFuncName()) + .map( + functionName -> { + return 
PlanUtils.makeAggCall( + context, functionName, node.getDistinct(), field, argList); + }) + .orElseThrow( + () -> + new UnsupportedOperationException("Unexpected aggregation: " + node.getFuncName())); } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 189db7d03b1..8d6f09b83f0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -8,6 +8,7 @@ import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY; import java.sql.Connection; +import java.util.List; import java.util.Optional; import java.util.Stack; import java.util.function.BiFunction; @@ -43,6 +44,7 @@ public class CalcitePlanContext { @Getter @Setter private boolean isProjectVisited = false; private final Stack correlVar = new Stack<>(); + private final Stack> windowPartitions = new Stack<>(); private CalcitePlanContext(FrameworkConfig config, QueryType queryType) { this.config = config; diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 0b4b90ece00..f16ec86d792 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -75,6 +75,7 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; import org.opensearch.sql.exception.CalciteUnsupportedException; @@ -629,6 +630,15 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) { return context.relBuilder.peek(); } + @Override + public 
RelNode visitWindow(Window node, CalcitePlanContext context) { + visitChildren(node, context); + List overExpressions = + node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList(); + context.relBuilder.projectPlus(overExpressions); + return context.relBuilder.peek(); + } + /* * Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta */ diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index 7c6c4f1724c..65c46feee0b 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.calcite; +import static java.util.Objects.requireNonNull; +import static org.apache.calcite.sql.SqlKind.AS; import static org.opensearch.sql.ast.expression.SpanUnit.NONE; import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN; import static org.opensearch.sql.calcite.utils.BuiltinFunctionUtils.VARCHAR_FORCE_NULLABLE; @@ -12,6 +14,7 @@ import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; @@ -19,6 +22,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.SqlOperator; @@ -49,6 +53,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.expression.When; +import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; 
import org.opensearch.sql.ast.expression.subquery.InSubquery; @@ -368,6 +373,43 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) { return context.rexBuilder.makeCall(returnType, operator, translatedArguments); } + @Override + public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) { + Function windowFunction = (Function) node.getFunction(); + List arguments = + windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList(); + List partitions = + node.getPartitionByList().stream() + .map(arg -> analyze(arg, context)) + .map(this::extractRexNodeFromAlias) + .toList(); + return BuiltinFunctionName.ofWindowFunction(windowFunction.getFuncName()) + .map( + functionName -> { + RexNode field = arguments.isEmpty() ? null : arguments.getFirst(); + List args = + (arguments.isEmpty() || arguments.size() == 1) + ? Collections.emptyList() + : arguments.subList(1, arguments.size()); + return PlanUtils.makeOver( + context, functionName, field, args, partitions, node.getWindowFrame()); + }) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unexpected window function: " + windowFunction.getFuncName())); + } + + /** extract the expression of Alias from a node */ + private RexNode extractRexNodeFromAlias(RexNode node) { + requireNonNull(node); + if (node.getKind() == AS) { + return ((RexCall) node).getOperands().get(0); + } else { + return node; + } + } + @Override public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) { List nodes = node.getChild().stream().map(child -> analyze(child, context)).toList(); diff --git a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/NullableSqlSumAggFunction.java b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/NullableSqlSumAggFunction.java deleted file mode 100644 index e250f808e6d..00000000000 --- a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/NullableSqlSumAggFunction.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * 
Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * This file contains code from the Apache Calcite project (original license below). - * It contains modifications, which are licensed as above: - */ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.opensearch.sql.calcite.udf.udaf; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlSplittableAggFunction; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlTypeTransforms; -import org.apache.calcite.util.Optionality; -import org.checkerframework.checker.nullness.qual.Nullable; - -public class NullableSqlSumAggFunction extends SqlAggFunction { - - // ~ Instance fields -------------------------------------------------------- - - @Deprecated // to be removed before 2.0 - private final RelDataType type; - - // ~ Constructors ----------------------------------------------------------- - - public NullableSqlSumAggFunction(RelDataType type) { - super( - "SUM", - null, - SqlKind.SUM, - ReturnTypes.AGG_SUM.andThen(SqlTypeTransforms.FORCE_NULLABLE), // modified here - null, - OperandTypes.NUMERIC, - SqlFunctionCategory.NUMERIC, - false, - false, - Optionality.FORBIDDEN); - this.type = type; - } - - // ~ Methods ---------------------------------------------------------------- - - @SuppressWarnings("deprecation") - @Override - public List getParameterTypes(RelDataTypeFactory typeFactory) { - return ImmutableList.of(type); - } - - @Deprecated // to be removed before 2.0 - public RelDataType getType() { - return type; - } - - @SuppressWarnings("deprecation") - @Override - public RelDataType getReturnType(RelDataTypeFactory typeFactory) { - return type; - } - - @Override - public @Nullable T unwrap(Class clazz) { - if (clazz.isInstance(SqlSplittableAggFunction.SumSplitter.INSTANCE)) { - return clazz.cast(SqlSplittableAggFunction.SumSplitter.INSTANCE); - } - return 
super.unwrap(clazz); - } - - @Override - public SqlAggFunction getRollup() { - return this; - } -} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/AggregateUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/AggregateUtils.java deleted file mode 100644 index ce8f1cc7618..00000000000 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/AggregateUtils.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.calcite.utils; - -import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE; -import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_SAMP_NULLABLE; -import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_POP_NULLABLE; -import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_SAMP_NULLABLE; -import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction; - -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; -import org.apache.calcite.rel.RelCollations; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.tools.RelBuilder; -import org.opensearch.sql.ast.expression.AggregateFunction; -import org.opensearch.sql.calcite.CalcitePlanContext; -import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction; -import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction; -import org.opensearch.sql.expression.function.BuiltinFunctionName; - -public interface AggregateUtils { - - static RelBuilder.AggCall translate( - AggregateFunction agg, RexNode field, CalcitePlanContext context, List argList) { - if (BuiltinFunctionName.ofAggregation(agg.getFuncName()).isEmpty()) - throw new 
IllegalStateException("Unexpected value: " + agg.getFuncName()); - - // Additional aggregation function operators will be added here - BuiltinFunctionName functionName = BuiltinFunctionName.ofAggregation(agg.getFuncName()).get(); - switch (functionName) { - case MAX: - return context.relBuilder.max(field); - case MIN: - return context.relBuilder.min(field); - case AVG: - return context.relBuilder.avg(agg.getDistinct(), null, field); - case COUNT: - return context.relBuilder.count( - agg.getDistinct(), null, field == null ? ImmutableList.of() : ImmutableList.of(field)); - case SUM: - return context.relBuilder.sum(agg.getDistinct(), null, field); - // case MEAN: - // throw new UnsupportedOperationException("MEAN is not supported in PPL"); - // case STDDEV: - // return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV, - // field); - case VARSAMP: - return context.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field); - case VARPOP: - return context.relBuilder.aggregateCall(VAR_POP_NULLABLE, field); - case STDDEV_POP: - return context.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field); - case STDDEV_SAMP: - return context.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field); - // case PERCENTILE_APPROX: - // return - // context.relBuilder.aggregateCall(SqlStdOperatorTable.PERCENTILE_CONT, field); - case TAKE: - return TransferUserDefinedAggFunction( - TakeAggFunction.class, - "TAKE", - UserDefinedFunctionUtils.getReturnTypeInferenceForArray(), - List.of(field), - argList, - context.relBuilder); - case PERCENTILE_APPROX: - List newArgList = new ArrayList<>(argList); - newArgList.add(context.rexBuilder.makeFlag(field.getType().getSqlTypeName())); - return TransferUserDefinedAggFunction( - PercentileApproxFunction.class, - "percentile_approx", - ReturnTypes.ARG0_FORCE_NULLABLE, - List.of(field), - newArgList, - context.relBuilder); - } - throw new IllegalStateException("Not Supported value: " + agg.getFuncName()); - } - - static AggregateCall 
aggCreate(SqlAggFunction agg, boolean isDistinct, RexNode field) { - int index = ((RexInputRef) field).getIndex(); - return AggregateCall.create( - agg, - isDistinct, - false, - false, - ImmutableList.of(), - ImmutableList.of(index), - -1, - null, - RelCollations.EMPTY, - field.getType(), - null); - } -} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a6b4a2dde10..bed535ea7f8 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -27,8 +27,6 @@ package org.opensearch.sql.calcite.utils; -import static org.apache.calcite.linq4j.Nullness.castNonNull; - import com.google.common.collect.ImmutableList; import java.sql.Connection; import java.sql.PreparedStatement; @@ -73,7 +71,6 @@ import org.apache.calcite.util.Util; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction; -import org.opensearch.sql.calcite.udf.udaf.NullableSqlSumAggFunction; /** * Calcite Tools Helper. This class is used to create customized: 1. Connection 2. 
JavaTypeFactory @@ -156,21 +153,6 @@ public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema super(context, cluster, relOptSchema); } - @Override - public AggCall sum(boolean distinct, String alias, RexNode operand) { - return aggregateCall( - SUM_NULLABLE, - distinct, - false, - false, - null, - null, - ImmutableList.of(), - alias, - ImmutableList.of(), - ImmutableList.of(operand)); - } - @Override public AggCall avg(boolean distinct, String alias, RexNode operand) { return aggregateCall( @@ -188,8 +170,6 @@ public AggCall avg(boolean distinct, String alias, RexNode operand) { } } - public static final SqlAggFunction SUM_NULLABLE = - new NullableSqlSumAggFunction(castNonNull(null)); public static final SqlAggFunction AVG_NULLABLE = new NullableSqlAvgAggFunction(SqlKind.AVG); public static final SqlAggFunction STDDEV_POP_NULLABLE = new NullableSqlAvgAggFunction(SqlKind.STDDEV_POP); diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index 32b17c5c99e..bfab5e78788 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -5,8 +5,35 @@ package org.opensearch.sql.calcite.utils; +import static org.apache.calcite.rex.RexWindowBounds.CURRENT_ROW; +import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_FOLLOWING; +import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING; +import static org.apache.calcite.rex.RexWindowBounds.following; +import static org.apache.calcite.rex.RexWindowBounds.preceding; +import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE; +import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_SAMP_NULLABLE; +import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_POP_NULLABLE; +import static 
org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_SAMP_NULLABLE; +import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.RelBuilder; import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.ast.expression.SpanUnit; +import org.opensearch.sql.ast.expression.WindowBound; +import org.opensearch.sql.ast.expression.WindowFrame; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction; +import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction; +import org.opensearch.sql.expression.function.BuiltinFunctionName; public interface PlanUtils { @@ -25,4 +52,211 @@ static SpanUnit intervalUnitToSpanUnit(IntervalUnit unit) { default -> throw new UnsupportedOperationException("Unsupported interval unit: " + unit); }; } + + static RexNode makeOver( + CalcitePlanContext context, + BuiltinFunctionName functionName, + RexNode field, + List argList, + List partitions, + @Nullable WindowFrame windowFrame) { + if (windowFrame == null) { + windowFrame = WindowFrame.defaultFrame(); + } + boolean rows = windowFrame.getType() == WindowFrame.FrameType.ROWS; + RexWindowBound lowerBound = convert(context, windowFrame.getLower()); + RexWindowBound upperBound = convert(context, windowFrame.getUpper()); + switch (functionName) { + // There is no "avg" AggImplementor in Calcite, we have to change avg window + // function to `sum over(...).toRex / count over(...).toRex` + case AVG: + // avg(x) ==> + // sum(x) / count(x) + return 
context.relBuilder.call( + SqlStdOperatorTable.DIVIDE, + sumOver(context, field, partitions, rows, lowerBound, upperBound), + context.relBuilder.cast( + countOver(context, field, partitions, rows, lowerBound, upperBound), + SqlTypeName.DOUBLE)); + // stddev_pop(x) ==> + // power((sum(x * x) - sum(x) * sum(x) / count(x)) / count(x), 0.5) + // + // stddev_samp(x) ==> + // power((sum(x * x) - sum(x) * sum(x) / count(x)) / (count(x) - 1), 0.5) + // + // var_pop(x) ==> + // (sum(x * x) - sum(x) * sum(x) / count(x)) / count(x) + // + // var_samp(x) ==> + // (sum(x * x) - sum(x) * sum(x) / count(x)) / (count(x) - 1) + case STDDEV_POP: + return variance(context, field, partitions, rows, lowerBound, upperBound, true, true); + case STDDEV_SAMP: + return variance(context, field, partitions, rows, lowerBound, upperBound, false, true); + case VARPOP: + return variance(context, field, partitions, rows, lowerBound, upperBound, true, false); + case VARSAMP: + return variance(context, field, partitions, rows, lowerBound, upperBound, false, false); + default: + return withOver( + makeAggCall(context, functionName, false, field, argList), + partitions, + rows, + lowerBound, + upperBound); + } + } + + private static RexNode sumOver( + CalcitePlanContext ctx, + RexNode operation, + List partitions, + boolean rows, + RexWindowBound lowerBound, + RexWindowBound upperBound) { + return withOver(ctx.relBuilder.sum(operation), partitions, rows, lowerBound, upperBound); + } + + private static RexNode countOver( + CalcitePlanContext ctx, + RexNode operation, + List partitions, + boolean rows, + RexWindowBound lowerBound, + RexWindowBound upperBound) { + return withOver( + ctx.relBuilder.count(ImmutableList.of(operation)), + partitions, + rows, + lowerBound, + upperBound); + } + + private static RexNode withOver( + RelBuilder.AggCall aggCall, + List partitions, + boolean rows, + RexWindowBound lowerBound, + RexWindowBound upperBound) { + return aggCall + .over() + .partitionBy(partitions) + 
.let( + c -> + rows + ? c.rowsBetween(lowerBound, upperBound) + : c.rangeBetween(lowerBound, upperBound)) + .toRex(); + } + + private static RexNode variance( + CalcitePlanContext ctx, + RexNode operator, + List partitions, + boolean rows, + RexWindowBound lowerBound, + RexWindowBound upperBound, + boolean biased, + boolean sqrt) { + RexNode argSquared = ctx.relBuilder.call(SqlStdOperatorTable.MULTIPLY, operator, operator); + RexNode sumArgSquared = sumOver(ctx, argSquared, partitions, rows, lowerBound, upperBound); + RexNode sum = sumOver(ctx, operator, partitions, rows, lowerBound, upperBound); + RexNode sumSquared = ctx.relBuilder.call(SqlStdOperatorTable.MULTIPLY, sum, sum); + RexNode count = countOver(ctx, operator, partitions, rows, lowerBound, upperBound); + RexNode countCast = ctx.relBuilder.cast(count, SqlTypeName.DOUBLE); + RexNode avgSumSquared = ctx.relBuilder.call(SqlStdOperatorTable.DIVIDE, sumSquared, countCast); + RexNode diff = ctx.relBuilder.call(SqlStdOperatorTable.MINUS, sumArgSquared, avgSumSquared); + RexNode denominator; + if (biased) { + denominator = countCast; + } else { + RexNode one = ctx.relBuilder.literal(1); + denominator = ctx.relBuilder.call(SqlStdOperatorTable.MINUS, countCast, one); + } + RexNode div = ctx.relBuilder.call(SqlStdOperatorTable.DIVIDE, diff, denominator); + RexNode result = div; + if (sqrt) { + RexNode half = ctx.relBuilder.literal(0.5); + result = ctx.relBuilder.call(SqlStdOperatorTable.POWER, div, half); + } + return result; + } + + static RexWindowBound convert(CalcitePlanContext context, WindowBound windowBound) { + if (windowBound instanceof WindowBound.UnboundedWindowBound unbounded) { + if (unbounded.isPreceding()) { + return UNBOUNDED_PRECEDING; + } else { + return UNBOUNDED_FOLLOWING; + } + } else if (windowBound instanceof WindowBound.CurrentRowWindowBound current) { + return CURRENT_ROW; + } else if (windowBound instanceof WindowBound.OffSetWindowBound offset) { + if (offset.isPreceding()) { + return 
preceding(context.relBuilder.literal(offset.getOffset())); + } else { + return following(context.relBuilder.literal(offset.getOffset())); + } + } else { + throw new UnsupportedOperationException("Unexpected window bound: " + windowBound); + } + } + + static RelBuilder.AggCall makeAggCall( + CalcitePlanContext context, + BuiltinFunctionName functionName, + boolean distinct, + RexNode field, + List argList) { + switch (functionName) { + case MAX: + return context.relBuilder.max(field); + case MIN: + return context.relBuilder.min(field); + case AVG: + return context.relBuilder.avg(distinct, null, field); + case COUNT: + return context.relBuilder.count( + distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field)); + case SUM: + return context.relBuilder.sum(distinct, null, field); + // case MEAN: + // throw new UnsupportedOperationException("MEAN is not supported in PPL"); + // case STDDEV: + // return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV, + // field); + case VARSAMP: + return context.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field); + case VARPOP: + return context.relBuilder.aggregateCall(VAR_POP_NULLABLE, field); + case STDDEV_POP: + return context.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field); + case STDDEV_SAMP: + return context.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field); + // case PERCENTILE_APPROX: + // return + // context.relBuilder.aggregateCall(SqlStdOperatorTable.PERCENTILE_CONT, field); + case TAKE: + return TransferUserDefinedAggFunction( + TakeAggFunction.class, + "TAKE", + UserDefinedFunctionUtils.getReturnTypeInferenceForArray(), + List.of(field), + argList, + context.relBuilder); + case PERCENTILE_APPROX: + List newArgList = new ArrayList<>(argList); + newArgList.add(context.rexBuilder.makeFlag(field.getType().getSqlTypeName())); + return TransferUserDefinedAggFunction( + PercentileApproxFunction.class, + "percentile_approx", + ReturnTypes.ARG0_FORCE_NULLABLE, + List.of(field), + newArgList, 
+ context.relBuilder); + default: + throw new UnsupportedOperationException( + "Unexpected aggregation: " + functionName.getName().getFunctionName()); + } + } } diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index db5f88f26bf..f15b689fc74 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -307,6 +307,22 @@ public enum BuiltinFunctionName { .put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX) .build(); + private static final Map WINDOW_FUNC_MAPPING = + new ImmutableMap.Builder() + .put("max", BuiltinFunctionName.MAX) + .put("min", BuiltinFunctionName.MIN) + .put("avg", BuiltinFunctionName.AVG) + .put("count", BuiltinFunctionName.COUNT) + .put("sum", BuiltinFunctionName.SUM) + .put("var_pop", BuiltinFunctionName.VARPOP) + .put("var_samp", BuiltinFunctionName.VARSAMP) + .put("variance", BuiltinFunctionName.VARPOP) + .put("std", BuiltinFunctionName.STDDEV_POP) + .put("stddev", BuiltinFunctionName.STDDEV_POP) + .put("stddev_pop", BuiltinFunctionName.STDDEV_POP) + .put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP) + .build(); + public static Optional of(String str) { return Optional.ofNullable(ALL_NATIVE_FUNCTIONS.getOrDefault(FunctionName.of(str), null)); } @@ -315,4 +331,9 @@ public static Optional ofAggregation(String functionName) { return Optional.ofNullable( AGGREGATION_FUNC_MAPPING.getOrDefault(functionName.toLowerCase(Locale.ROOT), null)); } + + public static Optional ofWindowFunction(String functionName) { + return Optional.ofNullable( + WINDOW_FUNC_MAPPING.getOrDefault(functionName.toLowerCase(Locale.ROOT), null)); + } } diff --git a/docs/user/ppl/cmd/eventstats.rst b/docs/user/ppl/cmd/eventstats.rst new file mode 100644 index 00000000000..134b0ea7a9b --- /dev/null +++ 
b/docs/user/ppl/cmd/eventstats.rst @@ -0,0 +1,353 @@ +============= +eventstats +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| (Experimental) +| (From 3.1.0) +| The ``eventstats`` command enriches your event data with calculated summary statistics. It operates by analyzing specified fields within your events, computing various statistical measures, and then appending these results as new fields to each original event. + +| Key aspects of ``eventstats``: + +1. It performs calculations across the entire result set or within defined groups. +2. The original events remain intact, with new fields added to contain the statistical results. +3. The command is particularly useful for comparative analysis, identifying outliers, or providing additional context to individual events. + +| Difference between ``stats`` and ``eventstats`` +The ``stats`` and ``eventstats`` commands are both used for calculating statistics, but they have some key differences in how they operate and what they produce: + +* Output Format: + * ``stats``: Produces a summary table with only the calculated statistics. + * ``eventstats``: Adds the calculated statistics as new fields to the existing events, preserving the original data. +* Event Retention: + * ``stats``: Reduces the result set to only the statistical summary, discarding individual events. + * ``eventstats``: Retains all original events and adds new fields with the calculated statistics. +* Use Cases: + * ``stats``: Best for creating summary reports or dashboards. Often used as a final command to summarize results. + * ``eventstats``: Useful when you need to enrich events with statistical context for further analysis or filtering. Can be used mid-search to add statistics that can be used in subsequent commands. + + +Version +======= +3.1.0 + + +Syntax +====== +eventstats ... [by-clause] + + +* function: mandatory. An aggregation function or window function. 
+ +* by-clause: optional. + + * Syntax: by [span-expression,] [field,]... + * Description: The by clause could be the fields and expressions like scalar functions and aggregation functions. Besides, the span clause can be used to split a specific field into buckets of the same interval; eventstats then performs the aggregation over these span buckets. + * Default: If no by-clause is specified, the eventstats command aggregates over the entire result set and appends the result to every event. + +* span-expression: optional, at most one. + + * Syntax: span(field_expr, interval_expr) + * Description: The unit of the interval expression is the natural unit by default. If the field is a date and time type field, and the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``. + +* Available time unit: ++----------------------------+ +| Span Interval Units | ++============================+ +| millisecond (ms) | ++----------------------------+ +| second (s) | ++----------------------------+ +| minute (m, case sensitive) | ++----------------------------+ +| hour (h) | ++----------------------------+ +| day (d) | ++----------------------------+ +| week (w) | ++----------------------------+ +| month (M, case sensitive) | ++----------------------------+ +| quarter (q) | ++----------------------------+ +| year (y) | ++----------------------------+ + +Aggregation Functions +===================== +COUNT +----- + +Description +>>>>>>>>>>> + +Usage: Returns a count of the number of expr in the rows retrieved by a SELECT statement. 
+ +Example:: + + PPL> source=accounts | eventstats count(); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | count() | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------| + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 4 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 4 | + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 4 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 4 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------+ + +SUM +--- + +Description +>>>>>>>>>>> + +Usage: SUM(expr). Returns the sum of expr. 
+ +Example:: + + PPL> source=accounts | eventstats sum(age) by gender; + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | sum(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 101 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 101 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 101 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + +AVG +--- + +Description +>>>>>>>>>>> + +Usage: AVG(expr). Returns the average value of expr. 
+ +Example:: + + PPL> source=accounts | eventstats avg(age) by gender; + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | avg(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28.0 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 33.666666666666664 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 33.666666666666664 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 33.666666666666664 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + +MAX +--- + +Description +>>>>>>>>>>> + +Usage: MAX(expr). Returns the maximum value of expr. 
+ +Example:: + + PPL> source=accounts | eventstats max(age); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | max(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 36 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 36 | + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 36 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 36 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + +MIN +--- + +Description +>>>>>>>>>>> + +Usage: MIN(expr). Returns the minimum value of expr. 
+ +Example:: + + PPL> source=accounts | eventstats min(age) by gender; + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | min(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 32 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 32 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 32 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + + +VAR_SAMP +-------- + +Description +>>>>>>>>>>> + +Usage: VAR_SAMP(expr). Returns the sample variance of expr. 
+ +Example:: + + PPL> source=accounts | eventstats var_samp(age); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | var_samp(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 10.916666666666666 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 10.916666666666666 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 10.916666666666666 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 10.916666666666666 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + + +VAR_POP +------- + +Description +>>>>>>>>>>> + +Usage: VAR_POP(expr). Returns the population variance of expr. 
+ +Example:: + + PPL> source=accounts | eventstats var_pop(age); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | var_pop(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 8.1875 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 8.1875 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 8.1875 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 8.1875 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------+ + + +STDDEV_SAMP +----------- + +Description +>>>>>>>>>>> + +Usage: STDDEV_SAMP(expr). Return the sample standard deviation of expr. 
+ +Example:: + + PPL> source=accounts | eventstats stddev_samp(age); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | stddev_samp(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 3.304037933599835 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3.304037933599835 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3.304037933599835 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3.304037933599835 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------+ + + +STDDEV_POP +---------- + +Description +>>>>>>>>>>> + +Usage: STDDEV_POP(expr). Return the population standard deviation of expr. 
+ +Example:: + + PPL> source=accounts | eventstats stddev_pop(age); + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | stddev_pop(age) | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 2.8613807855648994 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 2.8613807855648994 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 2.8613807855648994 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 2.8613807855648994 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + + +Configuration +============= +This command requires Calcite enabled. 
+ +Enable Calcite:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{ + "transient" : { + "plugins.calcite.enabled" : true + } + }' + +Result set:: + + { + "acknowledged": true, + "persistent": { + "plugins": { + "calcite": { + "enabled": "true" + } + } + }, + "transient": {} + } + +Usage +===== + +Eventstats:: + + source = table | eventstats avg(a) + source = table | where a < 50 | eventstats count(c) + source = table | eventstats min(c), max(c) by b + source = table | eventstats count(c) as count_by by b | where count_by > 1000 + + +Example 1: Calculate the average, sum and count of a field by group +================================================================== + +The example shows how to calculate the average age, the sum of ages and the count of events for all the accounts, grouped by gender. + +PPL query:: + + PPL> source=accounts | eventstats avg(age), sum(age), count() by gender; + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | avg(age) | sum(age) | count() | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------| + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28.0 | 28 | 1 | + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 33.666666666666664 | 101 | 3 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 33.666666666666664 | 101 | 3 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 33.666666666666664 | 101 | 
3 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------+ + + +Example 2: Calculate the count by a gender and span +=================================================== + +The example gets the count of events by age in intervals of 5 years, grouped by gender. + +PPL query:: + + PPL> source=accounts | eventstats count() as cnt by span(age, 5) as age_span, gender + fetched rows / total rows = 4/4 + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----+ + | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | cnt | + |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----| + | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 2 | + | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 2 | + | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | + | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 1 | + +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----+ + diff --git a/docs/user/ppl/cmd/join.rst b/docs/user/ppl/cmd/join.rst index b2dcc0bfc82..058b438aad4 100644 --- a/docs/user/ppl/cmd/join.rst +++ b/docs/user/ppl/cmd/join.rst @@ -12,11 +12,12 @@ join Description ============ | (Experimental) +| (From 3.0.0) | Using ``join`` command to combines two datasets together. The left side could be an index or results from a piped commands, the right side could be either an index or a subquery. 
Version ======= -3.0.0-beta+ +3.0.0 Syntax ====== diff --git a/docs/user/ppl/cmd/lookup.rst b/docs/user/ppl/cmd/lookup.rst index 5f5be3f83ec..dfa093c117b 100644 --- a/docs/user/ppl/cmd/lookup.rst +++ b/docs/user/ppl/cmd/lookup.rst @@ -12,13 +12,14 @@ lookup Description ============ | (Experimental) +| (From 3.0.0) | Lookup command enriches your search data by adding or replacing data from a lookup index (dimension table). You can extend fields of an index with values from a dimension table, append or replace values when lookup condition is matched. As an alternative of join command, lookup command is more suitable for enriching the source data with a static dataset. Version ======= -3.0.0-beta+ +3.0.0 Syntax ====== diff --git a/docs/user/ppl/cmd/subquery.rst b/docs/user/ppl/cmd/subquery.rst index de56fcfafb7..534db112b7e 100644 --- a/docs/user/ppl/cmd/subquery.rst +++ b/docs/user/ppl/cmd/subquery.rst @@ -12,11 +12,12 @@ subquery (aka subsearch) Description ============ | (Experimental) +| (From 3.0.0) | The subquery (aka subsearch) commands contain 4 types: ``InSubquery``, ``ExistsSubquery``, ``ScalarSubquery`` and ``RelationSubquery``. The first three are expressions, they are used in WHERE clause (``where ``) and search filter(``search source=* ``). ``RelationSubquery`` is not an expression, it is a statement. 
Version ======= -3.0.0-beta+ +3.0.0 Syntax ====== diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index ed3b019192c..bcdbdad3e94 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -86,11 +86,13 @@ The query start with search command and then flowing a set of command delimited - `metadata commands `_ - - `(Experimental) join command `_ + - `(Experimental)(From 3.0.0) join command `_ - - `(Experimental) lookup command `_ + - `(Experimental)(From 3.0.0) lookup command `_ - - `(Experimental) subquery (aka subsearch) command `_ + - `(Experimental)(From 3.0.0) subquery (aka subsearch) command `_ + + - `(Experimental)(From 3.1.0) eventstats command `_ * **Functions** diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBuiltinFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBuiltinFunctionIT.java index 6b1b33e09cc..fdf0f51b05a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBuiltinFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBuiltinFunctionIT.java @@ -26,7 +26,6 @@ public class CalcitePPLBuiltinFunctionIT extends CalcitePPLIntegTestCase { public void init() throws IOException { super.init(); loadIndex(Index.STATE_COUNTRY); - loadIndex(Index.STATE_COUNTRY_WITH_NULL); loadIndex(Index.DATA_TYPE_NUMERIC); loadIndex(Index.DOG); loadIndex(Index.NULL_MISSING); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCastFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCastFunctionIT.java index 48ed021e356..8869ba04e9f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCastFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCastFunctionIT.java @@ -110,10 +110,13 @@ public void testCastNullValues() { + " ],\n" + " [\n" + " null\n" + + " 
],\n" + + " [\n" + + " null\n" + " ]\n" + " ],\n" - + " \"total\": 5,\n" - + " \"size\": 5\n" + + " \"total\": 6,\n" + + " \"size\": 6\n" + "}", actual); } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLConditionBuiltinFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLConditionBuiltinFunctionIT.java index 4b986f15a60..e1c35f78dbe 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLConditionBuiltinFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLConditionBuiltinFunctionIT.java @@ -44,7 +44,7 @@ public void testIsNotNull() { verifySchema(actual, schema("name", "string")); - verifyDataRows(actual, rows("John"), rows("Jane"), rows("Jake"), rows("Hello")); + verifyDataRows(actual, rows("John"), rows("Jane"), rows("Jake"), rows("Hello"), rows("Kevin")); } @Test @@ -63,6 +63,7 @@ public void testNullIf() { rows("Jane", null), rows(null, 10), rows("Jake", 70), + rows("Kevin", null), rows("Hello", 30)); } @@ -83,6 +84,7 @@ public void testNullIfWithExpression() { rows("Jane", "HJane"), rows(null, null), rows("Jake", "HJake"), + rows("Kevin", "HKevin"), rows("Hello", null)); } @@ -102,6 +104,7 @@ public void testIfNull() { rows("Jane", 20), rows("Unknown", 10), rows("Jake", 70), + rows("Kevin", null), rows("Hello", 30)); } @@ -110,7 +113,8 @@ public void testIf() { JSONObject actual = executeQuery( String.format( - "source=%s | eval judge = if(age>50, 'old', 'young') | fields judge, age", + "source=%s | where isnotnull(age) | eval judge = if(age>50, 'old', 'young') |" + + " fields judge, age", TEST_INDEX_STATE_COUNTRY_WITH_NULL)); verifySchema(actual, schema("judge", "string"), schema("age", "integer")); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDateTimeBuiltinFunctionIT.java 
b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDateTimeBuiltinFunctionIT.java index 06a1f05c702..f6b5295d95a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDateTimeBuiltinFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDateTimeBuiltinFunctionIT.java @@ -39,7 +39,6 @@ public class CalcitePPLDateTimeBuiltinFunctionIT extends CalcitePPLIntegTestCase public void init() throws IOException { super.init(); loadIndex(Index.STATE_COUNTRY); - loadIndex(Index.STATE_COUNTRY_WITH_NULL); loadIndex(Index.DATE_FORMATS); loadIndex(Index.BANK_WITH_NULL_VALUES); loadIndex(Index.DATE); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java index 1dbf13d4133..69c05164f53 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java @@ -105,7 +105,7 @@ public void testDedupMultipleFieldsKeepEmpty() { @Test public void testConsecutiveThrowException() { assertThrows( - IllegalStateException.class, + UnsupportedOperationException.class, () -> executeQuery( String.format( @@ -115,7 +115,7 @@ public void testConsecutiveThrowException() { TEST_INDEX_DUPLICATION_NULLABLE))); assertThrows( - IllegalStateException.class, + UnsupportedOperationException.class, () -> executeQuery( String.format( @@ -125,7 +125,7 @@ public void testConsecutiveThrowException() { TEST_INDEX_DUPLICATION_NULLABLE))); assertThrows( - IllegalStateException.class, + UnsupportedOperationException.class, () -> executeQuery( String.format( @@ -135,7 +135,7 @@ public void testConsecutiveThrowException() { TEST_INDEX_DUPLICATION_NULLABLE))); assertThrows( - IllegalStateException.class, + UnsupportedOperationException.class, () -> executeQuery( String.format( 
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsIT.java new file mode 100644 index 00000000000..2e068825faa --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsIT.java @@ -0,0 +1,603 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.standalone; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY_WITH_NULL; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; + +import java.io.IOException; +import java.util.List; +import org.json.JSONObject; +import org.junit.Ignore; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.sql.legacy.TestsConstants; + +public class CalcitePPLEventstatsIT extends CalcitePPLIntegTestCase { + @Override + public void init() throws IOException { + super.init(); + loadIndex(Index.STATE_COUNTRY); + loadIndex(Index.STATE_COUNTRY_WITH_NULL); + } + + @Test + public void testEventstat() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max", + TEST_INDEX_STATE_COUNTRY)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + 
schema("cnt", "long"), + schema("avg", "double"), + schema("min", "integer"), + schema("max", "integer")); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 4, 36.25, 20, 70), + rows("Jake", "USA", "California", 4, 2023, 70, 4, 36.25, 20, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4, 36.25, 20, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 4, 36.25, 20, 70)); + } + + @Test + public void testEventstatWithNull() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + schema("cnt", "long"), + schema("avg", "double"), + schema("min", "integer"), + schema("max", "integer")); + + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 6, 31.0, 10, 70), + rows("Kevin", null, null, 4, 2023, null, 6, 31, 10, 70), + rows("John", "Canada", "Ontario", 4, 2023, 25, 6, 31.0, 10, 70), + rows("Jake", "USA", "California", 4, 2023, 70, 6, 31.0, 10, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 6, 31.0, 10, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 6, 31.0, 10, 70)); + } + + @Test + public void testEventstatBy() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by country", + TEST_INDEX_STATE_COUNTRY)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + schema("cnt", "long"), + schema("avg", "double"), + schema("min", "integer"), + schema("max", "integer")); + + verifyDataRows( + actual, + 
rows("John", "Canada", "Ontario", 4, 2023, 25, 2, 22.5, 20, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 2, 50, 30, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows("Hello", "USA", "New York", 4, 2023, 30, 2, 50, 30, 70)); + } + + @Test + public void testEventstatByWithNull() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + schema("cnt", "long"), + schema("avg", "double"), + schema("min", "integer"), + schema("max", "integer")); + + verifyDataRows( + actual, + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null), + rows(null, "Canada", null, 4, 2023, 10, 3, 18.333333333333332, 10, 25), + rows("John", "Canada", "Ontario", 4, 2023, 25, 3, 18.333333333333332, 10, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3, 18.333333333333332, 10, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 2, 50, 30, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 2, 50, 30, 70)); + + actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by state", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 2, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 2, 10, 10, 10), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatBySpan() { + JSONObject actual = + executeQuery( + 
String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as age_span", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 2, 22.5, 20, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatBySpanWithNull() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as age_span", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null), + rows("John", "Canada", "Ontario", 4, 2023, 25, 2, 22.5, 20, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatByMultiplePartitions1() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as age_span, country", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 2, 22.5, 20, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatByMultiplePartitions2() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as 
age_span, state", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatByMultiplePartitionsWithNull1() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as age_span, country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null), + rows("John", "Canada", "Ontario", 4, 2023, 25, 2, 22.5, 20, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2, 22.5, 20, 25), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testEventstatByMultiplePartitionsWithNull2() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats count() as cnt, avg(age) as avg, min(age) as min, max(age)" + + " as max by span(age, 10) as age_span, state", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 1, 10, 10, 10), + rows("Kevin", null, null, 4, 2023, null, 1, null, null, null), + rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25), + rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20), + rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30)); + } + + @Test + public void testUnsupportedWindowFunctions() { + List unsupported = List.of("PERCENTILE_APPROX", "PERCENTILE"); + for (String u : unsupported) { + UnsupportedOperationException e = + assertThrows( + 
UnsupportedOperationException.class, + () -> + executeQuery( + String.format( + "source=%s | eventstats %s(age)", TEST_INDEX_STATE_COUNTRY, u))); + verifyErrorMessageContains(e, "Unexpected window function: " + u); + } + } + + @Ignore("DC should fail in window function") + public void testDistinctCountShouldFail() throws IOException { + Request request1 = + new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true"); + request1.setJsonEntity( + "{\"name\":\"Jim\",\"age\":27,\"state\":\"Ontario\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); + client().performRequest(request1); + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats distinct_count(state) by country", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 3), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3), + rows("Jim", "Canada", "Ontario", 4, 2023, 27, 3), + rows("Jake", "USA", "California", 4, 2023, 70, 2), + rows("Hello", "USA", "New York", 4, 2023, 30, 2)); + } + + @Test + public void testMultipleEventstat() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats avg(age) as avg_age by state, country | eventstats" + + " avg(avg_age) as avg_state_age by country", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20.0, 22.5), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25.0, 22.5), + rows("Jake", "USA", "California", 4, 2023, 70, 70.0, 50.0), + rows("Hello", "USA", "New York", 4, 2023, 30, 30.0, 50.0)); + } + + @Test + public void testMultipleEventstatWithNull() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats avg(age) as avg_age by state, country | eventstats" + + " avg(avg_age) as avg_state_age by country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows("Kevin", null, null, 4, 2023, null, null, null), + rows(null, "Canada", 
null, 4, 2023, 10, 10, 18.333333333333332), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20.0, 18.333333333333332), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25.0, 18.333333333333332), + rows("Jake", "USA", "California", 4, 2023, 70, 70.0, 50.0), + rows("Hello", "USA", "New York", 4, 2023, 30, 30.0, 50.0)); + } + + @Test + public void testMultipleEventstatWithEval() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats avg(age) as avg_age by country, state, name | eval" + + " avg_age_divide_20 = avg_age - 20 | eventstats avg(avg_age_divide_20) as" + + " avg_state_age by country, state | where avg_state_age > 0 | eventstats" + + " count(avg_state_age) as count_country_age_greater_20 by country", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 5, 5, 1), + rows("Jake", "USA", "California", 4, 2023, 70, 70, 50, 50, 2), + rows("Hello", "USA", "New York", 4, 2023, 30, 30, 10, 10, 2)); + } + + @Test + public void testEventstatEmptyRows() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | where name = 'non-existed' | eventstats count(), avg(age), min(age)," + + " max(age), stddev_pop(age), stddev_samp(age), var_pop(age), var_samp(age)", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + verifyNumOfRows(actual, 0); + + JSONObject actual2 = + executeQuery( + String.format( + "source=%s | where name = 'non-existed' | eventstats count(), avg(age), min(age)," + + " max(age), stddev_pop(age), stddev_samp(age), var_pop(age), var_samp(age) by" + + " country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + verifyNumOfRows(actual2, 0); + } + + @Test + public void testEventstatVariance() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats stddev_pop(age), stddev_samp(age), var_pop(age)," + + " var_samp(age)", + TEST_INDEX_STATE_COUNTRY)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + 
schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + schema("stddev_pop(age)", "double"), + schema("stddev_samp(age)", "double"), + schema("var_pop(age)", "double"), + schema("var_samp(age)", "double")); + + verifyDataRows( + actual, + rows( + "John", + "Canada", + "Ontario", + 4, + 2023, + 25, + 19.803724397193573, + 22.86737122335374, + 392.1875, + 522.9166666666666), + rows( + "Jake", + "USA", + "California", + 4, + 2023, + 70, + 19.803724397193573, + 22.86737122335374, + 392.1875, + 522.9166666666666), + rows( + "Jane", + "Canada", + "Quebec", + 4, + 2023, + 20, + 19.803724397193573, + 22.86737122335374, + 392.1875, + 522.9166666666666), + rows( + "Hello", + "USA", + "New York", + 4, + 2023, + 30, + 19.803724397193573, + 22.86737122335374, + 392.1875, + 522.9166666666666)); + } + + @Test + public void testEventstatVarianceWithNull() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats stddev_pop(age), stddev_samp(age), var_pop(age)," + + " var_samp(age)", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "integer"), + schema("year", "integer"), + schema("age", "integer"), + schema("stddev_pop(age)", "double"), + schema("stddev_samp(age)", "double"), + schema("var_pop(age)", "double"), + schema("var_samp(age)", "double")); + + verifyDataRows( + actual, + rows(null, "Canada", null, 4, 2023, 10, 20.591260281974, 23.021728866442675, 424, 530), + rows("Kevin", null, null, 4, 2023, null, 20.591260281974, 23.021728866442675, 424, 530), + rows( + "John", + "Canada", + "Ontario", + 4, + 2023, + 25, + 20.591260281974, + 23.021728866442675, + 424, + 530), + rows( + "Jake", + "USA", + "California", + 4, + 2023, + 70, + 20.591260281974, + 23.021728866442675, + 424, + 530), + rows( + "Jane", "Canada", "Quebec", 4, 2023, 20, 20.591260281974, 
23.021728866442675, 424, 530), + rows( + "Hello", + "USA", + "New York", + 4, + 2023, + 30, + 20.591260281974, + 23.021728866442675, + 424, + 530)); + } + + @Test + public void testEventstatVarianceBy() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats stddev_pop(age), stddev_samp(age), var_pop(age)," + + " var_samp(age) by country", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 2.5, 3.5355339059327378, 6.25, 12.5), + rows("Jake", "USA", "California", 4, 2023, 70, 20, 28.284271247461902, 400, 800), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2.5, 3.5355339059327378, 6.25, 12.5), + rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800)); + } + + @Test + public void testEventstatVarianceBySpan() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | where country != 'USA' | eventstats stddev_samp(age) by span(age, 10)", + TEST_INDEX_STATE_COUNTRY)); + + verifyDataRows( + actual, + rows("John", "Canada", "Ontario", 4, 2023, 25, 3.5355339059327378), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3.5355339059327378)); + } + + @Test + public void testEventstatVarianceWithNullBy() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eventstats stddev_pop(age), stddev_samp(age), var_pop(age)," + + " var_samp(age) by country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifyDataRows( + actual, + rows( + null, + "Canada", + null, + 4, + 2023, + 10, + 6.2360956446232345, + 7.6376261582597325, + 38.88888888888888, + 58.333333333333314), + rows("Kevin", null, null, 4, 2023, null, null, null, null, null), + rows( + "John", + "Canada", + "Ontario", + 4, + 2023, + 25, + 6.2360956446232345, + 7.6376261582597325, + 38.88888888888888, + 58.333333333333314), + rows("Jake", "USA", "California", 4, 2023, 70, 20, 28.284271247461902, 400, 800), + rows( + "Jane", + "Canada", + "Quebec", + 4, + 2023, + 20, + 6.2360956446232345, + 
7.6376261582597325, + 38.88888888888888, + 58.333333333333314), + rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsPushdownIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsPushdownIT.java new file mode 100644 index 00000000000..290e8f9163d --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsPushdownIT.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.standalone; + +import org.opensearch.sql.common.setting.Settings; + +public class CalcitePPLEventstatsPushdownIT extends CalcitePPLEventstatsIT { + @Override + protected Settings getSettings() { + return enablePushdown(); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLExistsSubqueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLExistsSubqueryIT.java index a179dc88567..158525cb26d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLExistsSubqueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLExistsSubqueryIT.java @@ -63,6 +63,23 @@ public void testSimpleExistsSubquery() { rows(1006, "Tommy", 30000)); } + @Test + public void testExistsSubqueryAndAggregation() { + JSONObject result = + executeQuery( + String.format( + """ + source = %s + | where exists [ + source = %s | where id = uid + ] + | stats count() by country + """, + TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION)); + verifySchema(result, schema("country", "string"), schema("count()", "long")); + verifyDataRows(result, rows(1, null), rows(2, "Canada"), rows(1, "USA"), rows(1, "England")); + } + @Test public void testSimpleExistsSubqueryInFilter() { JSONObject result = diff --git 
a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java index a6851ae6305..2f2857f8521 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java @@ -215,6 +215,8 @@ public void onFailure(Exception e) { throw (UnsupportedCursorRequestException) e; } else if (e instanceof NoCursorException) { throw (NoCursorException) e; + } else if (e instanceof UnsupportedOperationException) { + throw (UnsupportedOperationException) e; } else if (e instanceof IllegalArgumentException) { // most exceptions thrown by Calcite when resolve a plan. throw (IllegalArgumentException) e; diff --git a/integ-test/src/test/resources/state_country_with_null.json b/integ-test/src/test/resources/state_country_with_null.json index 498161ca356..1141d43eef3 100644 --- a/integ-test/src/test/resources/state_country_with_null.json +++ b/integ-test/src/test/resources/state_country_with_null.json @@ -8,3 +8,5 @@ {"name":"Jane","age":20,"state":"Quebec","country":"Canada","year":2023,"month":4} {"index":{"_id":"5"}} {"name":null,"age":10,"state":null,"country":"Canada","year":2023,"month":4} +{"index":{"_id":"6"}} +{"name":"Kevin","year":2023,"month":4} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 4dd27b20921..ca928ad668b 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -19,6 +19,7 @@ WHERE: 'WHERE'; FIELDS: 'FIELDS'; RENAME: 'RENAME'; STATS: 'STATS'; +EVENTSTATS: 'EVENTSTATS'; DEDUP: 'DEDUP'; SORT: 'SORT'; EVAL: 'EVAL'; @@ -221,8 +222,6 @@ STDDEV_POP: 'STDDEV_POP'; PERCENTILE: 'PERCENTILE'; PERCENTILE_APPROX: 'PERCENTILE_APPROX'; TAKE: 'TAKE'; -FIRST: 'FIRST'; -LAST: 'LAST'; LIST: 'LIST'; VALUES: 'VALUES'; PER_DAY: 
'PER_DAY'; @@ -234,6 +233,17 @@ SPARKLINE: 'SPARKLINE'; C: 'C'; DC: 'DC'; +// SCALAR WINDOW FUNCTIONS +ROW_NUMBER: 'ROW_NUMBER'; +RANK: 'RANK'; +DENSE_RANK: 'DENSE_RANK'; +PERCENT_RANK: 'PERCENT_RANK'; +CUME_DIST: 'CUME_DIST'; +FIRST: 'FIRST'; +LAST: 'LAST'; +NTH: 'NTH'; +NTILE: 'NTILE'; + // BASIC FUNCTIONS ABS: 'ABS'; CBRT: 'CBRT'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 93098478ea0..4d523bb426a 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -54,6 +54,7 @@ commands | joinCommand | renameCommand | statsCommand + | eventstatsCommand | dedupCommand | sortCommand | evalCommand @@ -80,6 +81,7 @@ commandName | JOIN | RENAME | STATS + | EVENTSTATS | DEDUP | SORT | EVAL @@ -128,6 +130,10 @@ statsCommand : STATS (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)? ; +eventstatsCommand + : EVENTSTATS eventstatsAggTerm (COMMA eventstatsAggTerm)* (statsByClause)? + ; + dedupCommand : DEDUP (number = integerLiteral)? fieldList (KEEPEMPTY EQUAL keepempty = booleanLiteral)? (CONSECUTIVE EQUAL consecutive = booleanLiteral)? ; @@ -348,6 +354,31 @@ evalClause : fieldExpression EQUAL expression ; +eventstatsAggTerm + : windowFunction (AS alias = wcFieldExpression)? + ; + +windowFunction + : windowFunctionName LT_PRTHS functionArgs RT_PRTHS + ; + +windowFunctionName + : statsFunctionName + | scalarWindowFunctionName + ; + +scalarWindowFunctionName + : ROW_NUMBER + | RANK + | DENSE_RANK + | PERCENT_RANK + | CUME_DIST + | FIRST + | LAST + | NTH + | NTILE + ; + // aggregation terms statsAggTerm : statsFunction (AS alias = wcFieldExpression)? 
@@ -1054,8 +1085,9 @@ keywordsCanBeId | TIME_ZONE | TRAINING_DATA_SIZE | ANOMALY_SCORE_THRESHOLD - // AGGREGATIONS + // AGGREGATIONS AND WINDOW | statsFunctionName + | windowFunctionName | DISTINCT_COUNT | ESTDC | ESTDC_ERROR @@ -1069,8 +1101,6 @@ keywordsCanBeId | VAR_SAMP | VAR_POP | TAKE - | FIRST - | LAST | LIST | VALUES | PER_DAY diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 181d5692bdb..5d1bfc97df0 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -78,6 +78,7 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.common.utils.StringUtils; @@ -328,6 +329,46 @@ public UnresolvedPlan visitStatsCommand(StatsCommandContext ctx) { return aggregation; } + public UnresolvedPlan visitEventstatsCommand(OpenSearchPPLParser.EventstatsCommandContext ctx) { + ImmutableList.Builder partExprListBuilder = new ImmutableList.Builder<>(); + Optional.ofNullable(ctx.statsByClause()) + .map(OpenSearchPPLParser.StatsByClauseContext::bySpanClause) + .map(this::internalVisitExpression) + .ifPresent(partExprListBuilder::add); + + Optional.ofNullable(ctx.statsByClause()) + .map(OpenSearchPPLParser.StatsByClauseContext::fieldList) + .map( + expr -> + expr.fieldExpression().stream() + .map( + groupCtx -> + (UnresolvedExpression) + new Alias( + StringUtils.unquoteIdentifier(getTextInQuery(groupCtx)), + internalVisitExpression(groupCtx))) + .collect(Collectors.toList())) + .ifPresent(partExprListBuilder::addAll); + + ImmutableList.Builder windownFunctionListBuilder = + new ImmutableList.Builder<>(); + for 
(OpenSearchPPLParser.EventstatsAggTermContext aggCtx : ctx.eventstatsAggTerm()) { + UnresolvedExpression windowFunction = internalVisitExpression(aggCtx.windowFunction()); + // set partition by list for window function + if (windowFunction instanceof WindowFunction) { + ((WindowFunction) windowFunction).setPartitionByList(partExprListBuilder.build()); + } + String name = + aggCtx.alias == null + ? getTextInQuery(aggCtx) + : StringUtils.unquoteIdentifier(aggCtx.alias.getText()); + Alias alias = new Alias(name, windowFunction); + windownFunctionListBuilder.add(alias); + } + + return new Window(windownFunctionListBuilder.build()); + } + /** Dedup command. */ @Override public UnresolvedPlan visitDedupCommand(DedupCommandContext ctx) { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 0755d035de8..679565e22f6 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -474,6 +474,14 @@ public UnresolvedExpression visitBetween(OpenSearchPPLParser.BetweenContext ctx) return ctx.NOT() != null ? new Not(betweenExpr) : betweenExpr; } + @Override + public UnresolvedExpression visitWindowFunction(OpenSearchPPLParser.WindowFunctionContext ctx) { + Function f = + buildFunction(ctx.windowFunctionName().getText(), ctx.functionArgs().functionArg()); + // In PPL eventstats command, all window functions have the same partition and order spec. 
+ return new WindowFunction(f); + } + private QualifiedName visitIdentifiers(List ctx) { return new QualifiedName( ctx.stream() diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index bcec849846b..6f90dd4d9ca 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -40,6 +40,7 @@ import org.opensearch.sql.ast.expression.Span; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.expression.When; +import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; import org.opensearch.sql.ast.expression.subquery.InSubquery; @@ -65,6 +66,7 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.planner.logical.LogicalAggregation; import org.opensearch.sql.planner.logical.LogicalDedupe; @@ -223,6 +225,14 @@ public String visitAggregation(Aggregation node, String context) { child, String.join(" ", visitExpressionList(node.getAggExprList()), groupBy(group)).trim()); } + @Override + public String visitWindow(Window node, String context) { + String child = node.getChild().get(0).accept(this, context); + return StringUtils.format( + "%s | eventstats %s", + child, String.join(" ", visitExpressionList(node.getWindowFunctionList())).trim()); + } + /** Build {@link LogicalRareTopN}. 
*/ @Override public String visitRareTopN(RareTopN node, String context) { @@ -460,6 +470,20 @@ public String visitFunction(Function node, String context) { return StringUtils.format("%s(%s)", node.getFuncName(), arguments); } + @Override + public String visitWindowFunction(WindowFunction node, String context) { + String function = analyze(node.getFunction(), context); + String partitions = + node.getPartitionByList().stream() + .map(p -> analyze(p, context)) + .collect(Collectors.joining(",")); + if (partitions.isEmpty()) { + return StringUtils.format("%s", function); + } else { + return StringUtils.format("%s by %s", function, partitions); + } + } + @Override public String visitCompare(Compare node, String context) { String left = analyze(node.getLeft(), context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTest.java new file mode 100644 index 00000000000..cd808621407 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTest.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLEventstatsTest extends CalcitePPLAbstractTest { + + public CalcitePPLEventstatsTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testEventstatsCount() { + String ppl = "source=EMP | eventstats count()"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + + " COMM=[$6], DEPTNO=[$7], count()=[COUNT() OVER ()])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `JOB`, 
`MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, COUNT(*) OVER" + + " (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) `count()`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsBy() { + String ppl = "source=EMP | eventstats max(SAL) by DEPTNO"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + + " COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (PARTITION BY $7)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" + + " OVER (PARTITION BY `DEPTNO` RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED" + + " FOLLOWING) `max(SAL)`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsAvg() { + String ppl = "source=EMP | eventstats avg(SAL)"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + + " COMM=[$6], DEPTNO=[$7], avg(SAL)=[/(SUM($5) OVER (), CAST(COUNT($5) OVER ()):DOUBLE" + + " NOT NULL)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + // Bug of Calcite, should be OVER (ROWS ...) 
+ String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, (SUM(`SAL`)" + + " OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) /" + + " CAST(COUNT(`SAL`) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " AS DOUBLE) `avg(SAL)`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 51fa1ea1bc7..0a1308b070a 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -94,6 +94,24 @@ public void testStatsCommandWithSpanFunction() { anonymize("source=t | stats count(a) by span(b, 1d), c")); } + @Test + public void testEventstatsCommandWithByClause() { + assertEquals( + "source=t | eventstats count(a) by b", anonymize("source=t | eventstats count(a) by b")); + } + + @Test + public void testEventstatsCommandWithNestedFunctions() { + assertEquals("source=t | eventstats sum(+(a,b))", anonymize("source=t | eventstats sum(a+b)")); + } + + @Test + public void testEventstatsCommandWithSpanFunction() { + assertEquals( + "source=t | eventstats count(a) by span(b, *** d),c", + anonymize("source=t | eventstats count(a) by span(b, 1d), c")); + } + @Test public void testDedupCommand() { assertEquals(