From 8362b7c525844ac969a5234314e550bfe13a4820 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 28 Oct 2022 15:48:14 -0700 Subject: [PATCH 1/8] Make span collector not special Signed-off-by: Chen Dai --- .../planner/physical/AggregationOperator.java | 21 ++----------------- .../planner/physical/collector/Collector.java | 20 +++++++++++------- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java index 3cf823d0e2..1d9523464b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java @@ -34,8 +34,7 @@ public class AggregationOperator extends PhysicalPlan { private final List aggregatorList; @Getter private final List groupByExprList; - @Getter - private final NamedExpression span; + /** * {@link BindingTuple} Collector. */ @@ -56,18 +55,7 @@ public AggregationOperator(PhysicalPlan input, List aggregatorL this.input = input; this.aggregatorList = aggregatorList; this.groupByExprList = groupByExprList; - if (hasSpan(groupByExprList)) { - // span expression is always the first expression in group list if exist. - this.span = groupByExprList.get(0); - this.collector = - Collector.Builder.build( - this.span, groupByExprList.subList(1, groupByExprList.size()), this.aggregatorList); - - } else { - this.span = null; - this.collector = - Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList); - } + this.collector = Collector.Builder.build(groupByExprList, this.aggregatorList); } @Override @@ -99,9 +87,4 @@ public void open() { } iterator = collector.results().iterator(); } - - private boolean hasSpan(List namedExpressionList) { - return !namedExpressionList.isEmpty() - && namedExpressionList.get(0).getDelegated() instanceof SpanExpression; - } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java index 66eba7440b..d7186a1bc2 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java @@ -11,6 +11,7 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.expression.span.SpanExpression; import org.opensearch.sql.storage.bindingtuple.BindingTuple; /** @@ -40,18 +41,23 @@ class Builder { /** * build {@link Collector}. */ - public static Collector build( - NamedExpression span, List buckets, List aggregators) { - if (span == null && buckets.isEmpty()) { + public static Collector build(List buckets, + List aggregators) { + if (buckets.isEmpty()) { return new MetricCollector(aggregators); - } else if (span != null) { - return new SpanCollector(span, () -> build(null, buckets, aggregators)); + } else if (isWindowExpression(buckets.get(0))) { + return new SpanCollector( + buckets.get(0), + () -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); } else { return new BucketCollector( buckets.get(0), - () -> - build(null, ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); + () -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); } } + + private static boolean isWindowExpression(NamedExpression expr) { + return expr.getDelegated() instanceof SpanExpression; + } } } From 55eebccd1e6109b524d4c2c499ca02bbaaffb5de Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 28 Oct 2022 16:35:44 -0700 Subject: [PATCH 2/8] Add more UT for span Signed-off-by: Chen Dai --- .../physical/AggregationOperatorTest.java | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java index 318499c075..46b9821752 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/AggregationOperatorTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.sql.data.model.ExprDateValue; import org.opensearch.sql.data.model.ExprDatetimeValue; +import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimeValue; import org.opensearch.sql.data.model.ExprTimestampValue; import org.opensearch.sql.data.model.ExprValue; @@ -496,6 +497,96 @@ public void twoBucketsSpanAndLong() { )); } + @Test + public void aggregate_with_two_groups_with_windowing() { + PhysicalPlan plan = new AggregationOperator(testScan(compoundInputs), + Collections.singletonList(DSL.named("sum", dsl.sum(DSL.ref("errors", INTEGER)))), + Arrays.asList( + DSL.named("host", DSL.ref("host", STRING)), + DSL.named("span", DSL.span(DSL.ref("day", DATE), DSL.literal(1), "d")))); + + List result = execute(plan); + assertEquals(7, result.size()); + assertThat(result, containsInRelativeOrder( + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-03"), + "sum", 2)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-04"), + "sum", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-06"), + "sum", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-07"), + "sum", 6)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-03"), + "sum", 3)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-04"), + "sum", 10)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-07"), + "sum", 8)))); + } + + @Test + public void aggregate_with_three_groups_with_windowing() { + PhysicalPlan plan = new AggregationOperator(testScan(compoundInputs), + Collections.singletonList(DSL.named("sum", dsl.sum(DSL.ref("errors", INTEGER)))), + Arrays.asList( + DSL.named("host", DSL.ref("host", STRING)), + DSL.named("span", DSL.span(DSL.ref("day", DATE), DSL.literal(1), "d")), + DSL.named("region", DSL.ref("region", STRING)))); + + List result = execute(plan); + assertEquals(7, result.size()); + assertThat(result, containsInRelativeOrder( + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-03"), + "region", new ExprStringValue("iad"), + "sum", 2)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-04"), + "region", new ExprStringValue("iad"), + "sum", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-06"), + "region", new ExprStringValue("iad"), + "sum", 1)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h1"), + "span", new ExprDateValue("2021-01-07"), + "region", new ExprStringValue("iad"), + "sum", 6)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-03"), + "region", new ExprStringValue("iad"), + "sum", 3)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-04"), + "region", new ExprStringValue("iad"), + "sum", 10)), + ExprValueUtils.tupleValue(ImmutableMap.of( + "host", new ExprStringValue("h2"), + "span", new ExprDateValue("2021-01-07"), + "region", new ExprStringValue("iad"), + "sum", 8)))); + } + @Test public void copyOfAggregationOperatorShouldSame() { AggregationOperator plan = new AggregationOperator(testScan(datetimeInputs), From 39c7a33ec41c02307ad9bd89c2774393976f8a98 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 3 Nov 2022 10:23:40 -0700 Subject: [PATCH 3/8] Prepare for removing span collector Signed-off-by: Chen Dai --- .../sql/expression/span/SpanExpression.java | 20 ++++++++++++++++--- .../physical/collector/BucketCollector.java | 4 ++-- .../planner/physical/collector/Collector.java | 9 --------- .../physical/collector/SpanCollector.java | 6 ++++-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java index 715c911b13..e9cd0803f6 100644 --- a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java +++ b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java @@ -7,7 +7,6 @@ import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.ToString; import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.data.model.ExprValue; @@ -15,8 +14,8 @@ import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.ExpressionNodeVisitor; import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.planner.physical.collector.Rounding; -@RequiredArgsConstructor @Getter @ToString @EqualsAndHashCode @@ -25,9 +24,24 @@ public class SpanExpression implements Expression { private final Expression value; private final SpanUnit unit; + /** + * Rounding. + */ + private final Rounding rounding; + + /** + * Construct a span expression by field and span interval expression. + */ + public SpanExpression(Expression field, Expression value, SpanUnit unit) { + this.field = field; + this.value = value; + this.unit = unit; + this.rounding = Rounding.createRounding(this); + } + @Override public ExprValue valueOf(Environment valueEnv) { - return value.valueOf(valueEnv); + return rounding.round(field.valueOf(valueEnv)); } /** diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java index c3a803ca9e..1f49457a78 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java @@ -7,11 +7,11 @@ import com.google.common.collect.ImmutableList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; @@ -40,7 +40,7 @@ public class BucketCollector implements Collector { /** * Map between bucketKey and collector in the bucket. */ - private final Map collectorMap = new HashMap<>(); + private final Map collectorMap = new TreeMap<>(); /** * Bucket Index. diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java index d7186a1bc2..a2b3a41a27 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Collector.java @@ -11,7 +11,6 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.aggregation.NamedAggregator; -import org.opensearch.sql.expression.span.SpanExpression; import org.opensearch.sql.storage.bindingtuple.BindingTuple; /** @@ -45,19 +44,11 @@ public static Collector build(List buckets, List aggregators) { if (buckets.isEmpty()) { return new MetricCollector(aggregators); - } else if (isWindowExpression(buckets.get(0))) { - return new SpanCollector( - buckets.get(0), - () -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); } else { return new BucketCollector( buckets.get(0), () -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators)); } } - - private static boolean isWindowExpression(NamedExpression expr) { - return expr.getDelegated() instanceof SpanExpression; - } } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java index 092d79bd81..b074bc78a6 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java @@ -53,7 +53,8 @@ protected ExprValue bucketKey(BindingTuple tuple) { */ @Override protected ExprValue[] allocateBuckets() { - return rounding.createBuckets(); + //return rounding.createBuckets(); + return super.allocateBuckets(); } /** @@ -64,6 +65,7 @@ protected ExprValue[] allocateBuckets() { */ @Override protected int locateBucket(ExprValue value) { - return rounding.locate(value); + //return rounding.locate(value); + return super.locateBucket(value); } } From cec10291a56af51bc4deb5f1077b18433a3d0224 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 3 Nov 2022 14:49:43 -0700 Subject: [PATCH 4/8] Delete span collector Signed-off-by: Chen Dai --- .../physical/collector/SpanCollector.java | 71 ------------------- 1 file changed, 71 deletions(-) delete mode 100644 core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java deleted file mode 100644 index b074bc78a6..0000000000 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/SpanCollector.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.planner.physical.collector; - -import java.util.function.Supplier; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.span.SpanExpression; -import org.opensearch.sql.storage.bindingtuple.BindingTuple; - -/** - * Span Collector. - */ -public class SpanCollector extends BucketCollector { - - /** - * Span Expression. - */ - private final SpanExpression spanExpr; - - /** - * Rounding. - */ - private final Rounding rounding; - - /** - * Constructor. - */ - public SpanCollector(NamedExpression bucketExpr, Supplier supplier) { - super(bucketExpr, supplier); - this.spanExpr = (SpanExpression) bucketExpr.getDelegated(); - this.rounding = Rounding.createRounding(spanExpr); - } - - /** - * Rounding bucket value. - * - * @param tuple {@link BindingTuple}. - * @return {@link ExprValue}. - */ - @Override - protected ExprValue bucketKey(BindingTuple tuple) { - return rounding.round(spanExpr.getField().valueOf(tuple)); - } - - /** - * Allocates Buckets for building results. - * - * @return buckets. - */ - @Override - protected ExprValue[] allocateBuckets() { - //return rounding.createBuckets(); - return super.allocateBuckets(); - } - - /** - * Current Bucket index in allocated buckets. - * - * @param value bucket key. - * @return index. - */ - @Override - protected int locateBucket(ExprValue value) { - //return rounding.locate(value); - return super.locateBucket(value); - } -} From 5ade36648f4050590bf3a575435d2b4fdf8943e2 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 3 Nov 2022 15:11:51 -0700 Subject: [PATCH 5/8] Delete unused allocate and locate code in Rounding class Signed-off-by: Chen Dai --- .../planner/physical/collector/Rounding.java | 222 ------------------ .../physical/collector/RoundingTest.java | 5 - 2 files changed, 227 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java index 2934b44e95..4eaafea59b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java @@ -17,7 +17,6 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; -import java.time.ZonedDateTime; import java.time.temporal.ChronoField; import java.util.Arrays; import java.util.concurrent.TimeUnit; @@ -40,10 +39,6 @@ */ @EqualsAndHashCode public abstract class Rounding { - @Getter - protected T maxRounded; - @Getter - protected T minRounded; /** * Create Rounding instance. @@ -75,10 +70,6 @@ public static Rounding createRounding(SpanExpression span) { public abstract ExprValue round(ExprValue value); - public abstract Integer locate(ExprValue value); - - public abstract ExprValue[] createBuckets(); - static class TimestampRounding extends Rounding { private final ExprValue interval; @@ -93,50 +84,8 @@ public TimestampRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli(dateTimeUnit.round(var.timestampValue() .toEpochMilli(), interval.integerValue())); - updateRounded(instant); return new ExprTimestampValue(instant); } - - @Override - public ExprValue[] createBuckets() { - if (dateTimeUnit.isMillisBased) { - int size = (int) ((maxRounded.toEpochMilli() - minRounded.toEpochMilli()) / (interval - .integerValue() * dateTimeUnit.ratio)) + 1; - return new ExprValue[size]; - } else { - ZonedDateTime maxZonedDateTime = maxRounded.atZone(ZoneId.of("UTC")); - ZonedDateTime minZonedDateTime = minRounded.atZone(ZoneId.of("UTC")); - int monthDiff = (maxZonedDateTime.getYear() - minZonedDateTime - .getYear()) * 12 + maxZonedDateTime.getMonthValue() - minZonedDateTime.getMonthValue(); - int size = monthDiff / ((int) dateTimeUnit.ratio * interval.integerValue()) + 1; - return new ExprValue[size]; - } - } - - @Override - public Integer locate(ExprValue value) { - if (dateTimeUnit.isMillisBased) { - long intervalInEpochMillis = dateTimeUnit.ratio; - return Long.valueOf((value.timestampValue() - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli() - minRounded - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()) / (intervalInEpochMillis - * interval.integerValue())).intValue(); - } else { - int monthDiff = (value.dateValue().getYear() - minRounded.atZone(ZoneId.of("UTC")) - .getYear()) * 12 + value.dateValue().getMonthValue() - minRounded - .atZone(ZoneId.of("UTC")).getMonthValue(); - return (int) (monthDiff / (dateTimeUnit.ratio * interval.integerValue())); - } - } - - private void updateRounded(Instant value) { - if (maxRounded == null || value.isAfter(maxRounded)) { - maxRounded = value; - } - if (minRounded == null || value.isBefore(minRounded)) { - minRounded = value; - } - } } @@ -153,52 +102,8 @@ public DatetimeRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli(dateTimeUnit.round(var.datetimeValue() .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli(), interval.integerValue())); - updateRounded(instant); return new ExprDatetimeValue(instant.atZone(ZoneId.of("UTC")).toLocalDateTime()); } - - @Override - public ExprValue[] createBuckets() { - if (dateTimeUnit.isMillisBased) { - int size = (int) ((maxRounded.atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli() - minRounded.atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli()) / (interval.integerValue() * dateTimeUnit.ratio)) + 1; - return new ExprValue[size]; - } else { - ZonedDateTime maxZonedDateTime = maxRounded.atZone(ZoneId.of("UTC")); - ZonedDateTime minZonedDateTime = minRounded.atZone(ZoneId.of("UTC")); - int monthDiff = (maxZonedDateTime.getYear() - minZonedDateTime - .getYear()) * 12 + maxZonedDateTime.getMonthValue() - minZonedDateTime.getMonthValue(); - int size = monthDiff / ((int) dateTimeUnit.ratio * interval.integerValue()) + 1; - return new ExprValue[size]; - } - } - - @Override - public Integer locate(ExprValue value) { - if (dateTimeUnit.isMillisBased) { - long intervalInEpochMillis = dateTimeUnit.ratio; - return Long.valueOf((value.datetimeValue() - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli() - minRounded - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()) / (intervalInEpochMillis - * interval.integerValue())).intValue(); - } else { - int monthDiff = (value.datetimeValue().getYear() - minRounded.getYear()) * 12 - + value.dateValue().getMonthValue() - minRounded.getMonthValue(); - return (int) (monthDiff / (dateTimeUnit.ratio * interval.integerValue())); - } - } - - private void updateRounded(Instant value) { - if (maxRounded == null || value.isAfter(maxRounded - .atZone(ZoneId.of("UTC")).toInstant())) { - maxRounded = value.atZone(ZoneId.of("UTC")).toLocalDateTime(); - } - if (minRounded == null || value.isBefore(minRounded - .atZone(ZoneId.of("UTC")).toInstant())) { - minRounded = value.atZone(ZoneId.of("UTC")).toLocalDateTime(); - } - } } @@ -215,52 +120,8 @@ public DateRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli(dateTimeUnit.round(var.dateValue().atStartOfDay() .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli(), interval.integerValue())); - updateRounded(instant); return new ExprDateValue(instant.atZone(ZoneId.of("UTC")).toLocalDate()); } - - @Override - public ExprValue[] createBuckets() { - if (dateTimeUnit.isMillisBased) { - int size = (int) ((maxRounded.atStartOfDay().atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli() - minRounded.atStartOfDay().atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli()) / (interval.integerValue() * dateTimeUnit.ratio)) + 1; - return new ExprValue[size]; - } else { - ZonedDateTime maxZonedDateTime = maxRounded.atStartOfDay().atZone(ZoneId.of("UTC")); - ZonedDateTime minZonedDateTime = minRounded.atStartOfDay().atZone(ZoneId.of("UTC")); - int monthDiff = (maxZonedDateTime.getYear() - minZonedDateTime - .getYear()) * 12 + maxZonedDateTime.getMonthValue() - minZonedDateTime.getMonthValue(); - int size = monthDiff / ((int) dateTimeUnit.ratio * interval.integerValue()) + 1; - return new ExprValue[size]; - } - } - - @Override - public Integer locate(ExprValue value) { - if (dateTimeUnit.isMillisBased) { - long intervalInEpochMillis = dateTimeUnit.ratio; - return Long.valueOf((value.dateValue().atStartOfDay() - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli() - minRounded.atStartOfDay() - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()) / (intervalInEpochMillis - * interval.integerValue())).intValue(); - } else { - int monthDiff = (value.dateValue().getYear() - minRounded.getYear()) * 12 - + value.dateValue().getMonthValue() - minRounded.getMonthValue(); - return (int) (monthDiff / (dateTimeUnit.ratio * interval.integerValue())); - } - } - - private void updateRounded(Instant value) { - if (maxRounded == null || value.isAfter(maxRounded.atStartOfDay() - .atZone(ZoneId.of("UTC")).toInstant())) { - maxRounded = value.atZone(ZoneId.of("UTC")).toLocalDate(); - } - if (minRounded == null || value.isBefore(minRounded.atStartOfDay() - .atZone(ZoneId.of("UTC")).toInstant())) { - minRounded = value.atZone(ZoneId.of("UTC")).toLocalDate(); - } - } } static class TimeRounding extends Rounding { @@ -281,39 +142,8 @@ public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli(dateTimeUnit.round(var.timeValue().getLong( ChronoField.MILLI_OF_DAY), interval.integerValue())); - updateRounded(instant); return new ExprTimeValue(instant.atZone(ZoneId.of("UTC")).toLocalTime()); } - - @Override - public ExprValue[] createBuckets() { - // local time is converted to timestamp on 1970-01-01 for aggregations - int size = (int) ((maxRounded.atDate(LocalDate.of(1970, 1, 1)) - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli() - minRounded - .atDate(LocalDate.of(1970, 1, 1)).atZone(ZoneId.of("UTC")).toInstant() - .toEpochMilli()) / (interval.integerValue() * dateTimeUnit.ratio)) + 1; - return new ExprValue[size]; - } - - @Override - public Integer locate(ExprValue value) { - long intervalInEpochMillis = dateTimeUnit.ratio; - return Long.valueOf((value.timeValue().atDate(LocalDate.of(1970, 1, 1)) - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli() - minRounded - .atDate(LocalDate.of(1970, 1, 1)) - .atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()) / (intervalInEpochMillis * interval - .integerValue())).intValue(); - } - - private void updateRounded(Instant value) { - if (maxRounded == null || value.isAfter(maxRounded.atDate(LocalDate.of(1970, 1, 1)) - .atZone(ZoneId.of("UTC")).toInstant())) { - maxRounded = value.atZone(ZoneId.of("UTC")).toLocalTime(); - } - if (minRounded == null) { - minRounded = value.atZone(ZoneId.of("UTC")).toLocalTime(); - } - } } @@ -327,29 +157,8 @@ protected LongRounding(ExprValue interval) { @Override public ExprValue round(ExprValue value) { long rounded = Math.floorDiv(value.longValue(), longInterval) * longInterval; - updateRounded(rounded); return ExprValueUtils.longValue(rounded); } - - @Override - public Integer locate(ExprValue value) { - return Long.valueOf((value.longValue() - minRounded) / longInterval).intValue(); - } - - @Override - public ExprValue[] createBuckets() { - int size = Long.valueOf((maxRounded - minRounded) / longInterval).intValue() + 1; - return new ExprValue[size]; - } - - private void updateRounded(Long value) { - if (maxRounded == null || value > maxRounded) { - maxRounded = value; - } - if (minRounded == null || value < minRounded) { - minRounded = value; - } - } } @@ -364,29 +173,8 @@ protected DoubleRounding(ExprValue interval) { public ExprValue round(ExprValue value) { double rounded = Double .valueOf(value.doubleValue() / doubleInterval).intValue() * doubleInterval; - updateRounded(rounded); return ExprValueUtils.doubleValue(rounded); } - - @Override - public Integer locate(ExprValue value) { - return Double.valueOf((value.doubleValue() - minRounded) / doubleInterval).intValue(); - } - - @Override - public ExprValue[] createBuckets() { - int size = Double.valueOf((maxRounded - minRounded) / doubleInterval).intValue() + 1; - return new ExprValue[size]; - } - - private void updateRounded(Double value) { - if (maxRounded == null || value > maxRounded) { - maxRounded = value; - } - if (minRounded == null || value < minRounded) { - minRounded = value; - } - } } @@ -396,16 +184,6 @@ static class UnknownRounding extends Rounding { public ExprValue round(ExprValue var) { return null; } - - @Override - public Integer locate(ExprValue value) { - return null; - } - - @Override - public ExprValue[] createBuckets() { - return new ExprValue[0]; - } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java index 41b3ea5d6b..f40e5c058b 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/collector/RoundingTest.java @@ -5,16 +5,13 @@ package org.opensearch.sql.planner.physical.collector; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.opensearch.sql.data.type.ExprCoreType.STRING; import static org.opensearch.sql.data.type.ExprCoreType.TIME; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import org.opensearch.sql.data.model.ExprTimeValue; -import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.expression.DSL; @@ -34,8 +31,6 @@ void round_unknown_type() { SpanExpression span = DSL.span(DSL.ref("unknown", STRING), DSL.literal(1), ""); Rounding rounding = Rounding.createRounding(span); assertNull(rounding.round(ExprValueUtils.integerValue(1))); - assertNull(rounding.locate(ExprValueUtils.integerValue(1))); - assertEquals(0, rounding.createBuckets().length); } @Test From ebba041697620fc11a81671953ba22bc402c9cd9 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 3 Nov 2022 15:36:08 -0700 Subject: [PATCH 6/8] Fix broken UT Signed-off-by: Chen Dai --- .../opensearch/sql/expression/span/SpanExpression.java | 3 ++- .../sql/expression/span/SpanExpressionTest.java | 10 ++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java index e9cd0803f6..f7d9561e4f 100644 --- a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java +++ b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java @@ -27,6 +27,7 @@ public class SpanExpression implements Expression { /** * Rounding. */ + @ToString.Exclude private final Rounding rounding; /** @@ -36,7 +37,7 @@ public SpanExpression(Expression field, Expression value, SpanUnit unit) { this.field = field; this.value = value; this.unit = unit; - this.rounding = Rounding.createRounding(this); + this.rounding = Rounding.createRounding(this); // TODO: will integrate with WindowAssigner } @Override diff --git a/core/src/test/java/org/opensearch/sql/expression/span/SpanExpressionTest.java b/core/src/test/java/org/opensearch/sql/expression/span/SpanExpressionTest.java index a554a93153..f311c0147b 100644 --- a/core/src/test/java/org/opensearch/sql/expression/span/SpanExpressionTest.java +++ b/core/src/test/java/org/opensearch/sql/expression/span/SpanExpressionTest.java @@ -8,7 +8,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; -import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; @@ -20,22 +19,17 @@ @DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) public class SpanExpressionTest extends ExpressionTestBase { @Test - void span() { + void testSpanByNumeric() { SpanExpression span = DSL.span(DSL.ref("integer_value", INTEGER), DSL.literal(1), ""); assertEquals(INTEGER, span.type()); assertEquals(ExprValueUtils.integerValue(1), span.valueOf(valueEnv())); span = DSL.span(DSL.ref("integer_value", INTEGER), DSL.literal(1.5), ""); assertEquals(DOUBLE, span.type()); - assertEquals(ExprValueUtils.doubleValue(1.5), span.valueOf(valueEnv())); + assertEquals(ExprValueUtils.doubleValue(0.0), span.valueOf(valueEnv())); span = DSL.span(DSL.ref("double_value", DOUBLE), DSL.literal(1), ""); assertEquals(DOUBLE, span.type()); assertEquals(ExprValueUtils.doubleValue(1.0), span.valueOf(valueEnv())); - - span = DSL.span(DSL.ref("timestamp_value", TIMESTAMP), DSL.literal(1), "d"); - assertEquals(TIMESTAMP, span.type()); - assertEquals(ExprValueUtils.integerValue(1), span.valueOf(valueEnv())); } - } From 805a8c6994e0e7be681883ab4cd96e5cc016f1e1 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 3 Nov 2022 16:16:03 -0700 Subject: [PATCH 7/8] Update javadoc Signed-off-by: Chen Dai --- .../org/opensearch/sql/expression/span/SpanExpression.java | 2 +- .../sql/planner/physical/collector/BucketCollector.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java index f7d9561e4f..3c5a2dc14f 100644 --- a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java +++ b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java @@ -25,7 +25,7 @@ public class SpanExpression implements Expression { private final SpanUnit unit; /** - * Rounding. + * Rounding that generates span starting point. */ @ToString.Exclude private final Rounding rounding; diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java index 1f49457a78..a00fd3cdf3 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/BucketCollector.java @@ -38,7 +38,8 @@ public class BucketCollector implements Collector { private final Supplier supplier; /** - * Map between bucketKey and collector in the bucket. + * Map from bucketKey to nested collector sorted by key to make sure + * final result is in order after traversal. */ private final Map collectorMap = new TreeMap<>(); From 6511f613b3189f07861f997adc6e5f514e86e876 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 7 Nov 2022 13:12:05 -0800 Subject: [PATCH 8/8] Fix broken UT after merge Signed-off-by: Chen Dai --- .../opensearch/sql/expression/span/SpanExpression.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java index 3c5a2dc14f..aff114145e 100644 --- a/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java +++ b/core/src/main/java/org/opensearch/sql/expression/span/SpanExpression.java @@ -24,12 +24,6 @@ public class SpanExpression implements Expression { private final Expression value; private final SpanUnit unit; - /** - * Rounding that generates span starting point. - */ - @ToString.Exclude - private final Rounding rounding; - /** * Construct a span expression by field and span interval expression. */ @@ -37,11 +31,11 @@ public SpanExpression(Expression field, Expression value, SpanUnit unit) { this.field = field; this.value = value; this.unit = unit; - this.rounding = Rounding.createRounding(this); // TODO: will integrate with WindowAssigner } @Override public ExprValue valueOf(Environment valueEnv) { + Rounding rounding = Rounding.createRounding(this); //TODO: will integrate with WindowAssigner return rounding.round(field.valueOf(valueEnv)); }