From 31a72d408962bc903964997c2e19a1fb5592a624 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 25 Oct 2022 17:28:31 -0700 Subject: [PATCH] Update aggregation to support datetime types. * Add aggregator fixes and some useless unit tests. * Add `avg` aggregation on datetime types. * Rework in-memory `AVG`. Fix parsing value returned from the OpenSearch node. Signed-off-by: Yury-Fridlyand --- .../sql/data/model/ExprDateValue.java | 3 +- .../sql/data/model/ExprDatetimeValue.java | 3 +- .../sql/data/model/ExprTimestampValue.java | 2 +- .../sql/data/model/ExprValueUtils.java | 52 ++- .../aggregation/AggregatorFunction.java | 8 + .../expression/aggregation/AvgAggregator.java | 71 +++- .../sql/data/model/DateTimeValueTest.java | 7 +- .../sql/data/model/ExprValueUtilsTest.java | 99 +++++ .../aggregation/AvgAggregatorTest.java | 52 +++ docs/user/dql/aggregations.rst | 8 +- .../org/opensearch/sql/sql/AggregationIT.java | 354 +++++++++++++++++- .../value/OpenSearchExprValueFactory.java | 33 +- .../ExpressionAggregationScript.java | 24 +- .../value/OpenSearchExprValueFactoryTest.java | 3 + .../ExpressionAggregationScriptTest.java | 55 +++ 15 files changed, 733 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/data/model/ExprDateValue.java b/core/src/main/java/org/opensearch/sql/data/model/ExprDateValue.java index 09b2e56b44..5707987797 100644 --- a/core/src/main/java/org/opensearch/sql/data/model/ExprDateValue.java +++ b/core/src/main/java/org/opensearch/sql/data/model/ExprDateValue.java @@ -11,7 +11,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; -import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; @@ -67,7 +66,7 @@ public LocalDateTime datetimeValue() { @Override public Instant timestampValue() { - return ZonedDateTime.of(date, timeValue(), ZoneId.systemDefault()).toInstant(); + return ZonedDateTime.of(date, timeValue(), ExprTimestampValue.ZONE).toInstant(); } @Override diff --git a/core/src/main/java/org/opensearch/sql/data/model/ExprDatetimeValue.java b/core/src/main/java/org/opensearch/sql/data/model/ExprDatetimeValue.java index f5f80f133f..628106b048 100644 --- a/core/src/main/java/org/opensearch/sql/data/model/ExprDatetimeValue.java +++ b/core/src/main/java/org/opensearch/sql/data/model/ExprDatetimeValue.java @@ -11,7 +11,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; -import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -71,7 +70,7 @@ public LocalTime timeValue() { @Override public Instant timestampValue() { - return ZonedDateTime.of(datetime, ZoneId.of("UTC")).toInstant(); + return ZonedDateTime.of(datetime, ExprTimestampValue.ZONE).toInstant(); } @Override diff --git a/core/src/main/java/org/opensearch/sql/data/model/ExprTimestampValue.java b/core/src/main/java/org/opensearch/sql/data/model/ExprTimestampValue.java index 219a4c2663..a7ae605a7f 100644 --- a/core/src/main/java/org/opensearch/sql/data/model/ExprTimestampValue.java +++ b/core/src/main/java/org/opensearch/sql/data/model/ExprTimestampValue.java @@ -30,7 +30,7 @@ public class ExprTimestampValue extends AbstractExprValue { /** * todo. only support UTC now. */ - private static final ZoneId ZONE = ZoneId.of("UTC"); + public static final ZoneId ZONE = ZoneId.of("UTC"); private final Instant timestamp; diff --git a/core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java b/core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java index 407b6df5b3..9e00ba1e5f 100644 --- a/core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java +++ b/core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java @@ -3,9 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.sql.data.model; +import static java.time.temporal.ChronoUnit.MILLIS; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.temporal.TemporalAmount; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -177,4 +181,50 @@ public static Map getTupleValue(ExprValue exprValue) { public static Boolean getBooleanValue(ExprValue exprValue) { return exprValue.booleanValue(); } + + /** + * Convert a datetime value to milliseconds since Epoch. + * @param value A value. + * @return Milliseconds since Epoch. + */ + public static long extractEpochMilliFromAnyDateTimeType(ExprValue value) { + switch ((ExprCoreType)value.type()) { + case TIME: + // workaround for session context issue + // TODO remove once fixed + return MILLIS.between(LocalTime.MIN, value.timeValue()); + case DATE: + case DATETIME: + case TIMESTAMP: + return value.timestampValue().toEpochMilli(); + default: + throw new IllegalArgumentException( + String.format("Not a datetime type: %s", value.type())); + } + } + + /** + * Convert milliseconds since Epoch to a datetime value of the given type. + * @param value Milliseconds since Epoch. + * @param type A type of the resulting value requested. + * @return A datetime value. + */ + public static ExprValue convertEpochMilliToDateTimeType(long value, ExprCoreType type) { + // Construct value the same way it is extracted + var ts = new ExprTimestampValue(Instant.ofEpochMilli(value)); + switch (type) { + case DATE: + return new ExprDateValue(ts.dateValue()); + case DATETIME: + return new ExprDatetimeValue(ts.datetimeValue()); + case TIMESTAMP: + return ts; + case TIME: + // TODO update once session context issue fixed + return new ExprTimeValue(LocalTime.MIN.plus(value, MILLIS)); + default: + throw new IllegalArgumentException( + String.format("Not a datetime type: %s", type)); + } + } } diff --git a/core/src/main/java/org/opensearch/sql/expression/aggregation/AggregatorFunction.java b/core/src/main/java/org/opensearch/sql/expression/aggregation/AggregatorFunction.java index 9fbf1557aa..1f743de95e 100644 --- a/core/src/main/java/org/opensearch/sql/expression/aggregation/AggregatorFunction.java +++ b/core/src/main/java/org/opensearch/sql/expression/aggregation/AggregatorFunction.java @@ -69,6 +69,14 @@ private static DefaultFunctionResolver avg() { new ImmutableMap.Builder() .put(new FunctionSignature(functionName, Collections.singletonList(DOUBLE)), arguments -> new AvgAggregator(arguments, DOUBLE)) + .put(new FunctionSignature(functionName, Collections.singletonList(DATE)), + arguments -> new AvgAggregator(arguments, DATE)) + .put(new FunctionSignature(functionName, Collections.singletonList(DATETIME)), + arguments -> new AvgAggregator(arguments, DATETIME)) + .put(new FunctionSignature(functionName, Collections.singletonList(TIME)), + arguments -> new AvgAggregator(arguments, TIME)) + .put(new FunctionSignature(functionName, Collections.singletonList(TIMESTAMP)), + arguments -> new AvgAggregator(arguments, TIMESTAMP)) .build() ); } diff --git a/core/src/main/java/org/opensearch/sql/expression/aggregation/AvgAggregator.java b/core/src/main/java/org/opensearch/sql/expression/aggregation/AvgAggregator.java index cadfdee87d..4678aa256c 100644 --- a/core/src/main/java/org/opensearch/sql/expression/aggregation/AvgAggregator.java +++ b/core/src/main/java/org/opensearch/sql/expression/aggregation/AvgAggregator.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.Locale; +import lombok.RequiredArgsConstructor; import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -23,20 +24,36 @@ */ public class AvgAggregator extends Aggregator { + /** + * To process by different ways different data types, we need to store the type. + * Input data has the same type as the result. + */ + private final ExprCoreType dataType; + public AvgAggregator(List arguments, ExprCoreType returnType) { super(BuiltinFunctionName.AVG.getName(), arguments, returnType); + dataType = returnType; } @Override public AvgState create() { - return new AvgState(); + switch (dataType) { + case DATE: + case DATETIME: + case TIMESTAMP: + case TIME: + return new DateTimeAvgState(dataType); + case DOUBLE: + return new DoubleAvgState(); + default: //unreachable code - we don't expose signatures for unsupported types + throw new IllegalArgumentException( + String.format("avg aggregation over %s type is not supported", dataType)); + } } @Override protected AvgState iterate(ExprValue value, AvgState state) { - state.count++; - state.total += ExprValueUtils.getDoubleValue(value); - return state; + return state.iterate(value); } @Override @@ -47,18 +64,56 @@ public String toString() { /** * Average State. */ - protected static class AvgState implements AggregationState { - private int count; - private double total; + protected abstract static class AvgState implements AggregationState { + protected int count; + protected double total; AvgState() { this.count = 0; this.total = 0d; } + @Override + public abstract ExprValue result(); + + protected AvgState iterate(ExprValue value) { + count++; + return this; + } + } + + protected static class DoubleAvgState extends AvgState { @Override public ExprValue result() { - return count == 0 ? ExprNullValue.of() : ExprValueUtils.doubleValue(total / count); + if (count == 0) { + return ExprNullValue.of(); + } + return ExprValueUtils.doubleValue(total / count); + } + + @Override + protected AvgState iterate(ExprValue value) { + total += ExprValueUtils.getDoubleValue(value); + return super.iterate(value); + } + } + + @RequiredArgsConstructor + protected static class DateTimeAvgState extends AvgState { + private final ExprCoreType dataType; + + @Override + public ExprValue result() { + if (count == 0) { + return ExprNullValue.of(); + } + return ExprValueUtils.convertEpochMilliToDateTimeType(Math.round(total / count), dataType); + } + + @Override + protected AvgState iterate(ExprValue value) { + total += ExprValueUtils.extractEpochMilliFromAnyDateTimeType(value); + return super.iterate(value); } } } diff --git a/core/src/test/java/org/opensearch/sql/data/model/DateTimeValueTest.java b/core/src/test/java/org/opensearch/sql/data/model/DateTimeValueTest.java index 3a7df17d90..dd541dd69b 100644 --- a/core/src/test/java/org/opensearch/sql/data/model/DateTimeValueTest.java +++ b/core/src/test/java/org/opensearch/sql/data/model/DateTimeValueTest.java @@ -15,7 +15,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; -import java.time.ZoneId; import java.time.ZonedDateTime; import org.junit.jupiter.api.Test; import org.opensearch.sql.exception.ExpressionEvaluationException; @@ -43,7 +42,7 @@ public void timestampValueInterfaceTest() { assertEquals(TIMESTAMP, timestampValue.type()); assertEquals(ZonedDateTime.of(LocalDateTime.parse("2020-07-07T01:01:01"), - ZoneId.of("UTC")).toInstant(), timestampValue.timestampValue()); + ExprTimestampValue.ZONE).toInstant(), timestampValue.timestampValue()); assertEquals("2020-07-07 01:01:01", timestampValue.value()); assertEquals("TIMESTAMP '2020-07-07 01:01:01'", timestampValue.toString()); assertEquals(LocalDate.parse("2020-07-07"), timestampValue.dateValue()); @@ -61,7 +60,7 @@ public void dateValueInterfaceTest() { assertEquals(LocalTime.parse("00:00:00"), dateValue.timeValue()); assertEquals(LocalDateTime.parse("2012-07-07T00:00:00"), dateValue.datetimeValue()); assertEquals(ZonedDateTime.of(LocalDateTime.parse("2012-07-07T00:00:00"), - ZoneId.systemDefault()).toInstant(), dateValue.timestampValue()); + ExprTimestampValue.ZONE).toInstant(), dateValue.timestampValue()); ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, () -> integerValue(1).dateValue()); assertEquals("invalid to get dateValue from value of type INTEGER", @@ -76,7 +75,7 @@ public void datetimeValueInterfaceTest() { assertEquals(LocalDate.parse("2020-08-17"), datetimeValue.dateValue()); assertEquals(LocalTime.parse("19:44:00"), datetimeValue.timeValue()); assertEquals(ZonedDateTime.of(LocalDateTime.parse("2020-08-17T19:44:00"), - ZoneId.of("UTC")).toInstant(), datetimeValue.timestampValue()); + ExprTimestampValue.ZONE).toInstant(), datetimeValue.timestampValue()); assertEquals("DATETIME '2020-08-17 19:44:00'", datetimeValue.toString()); assertThrows(ExpressionEvaluationException.class, () -> integerValue(1).datetimeValue(), "invalid to get datetimeValue from value of type INTEGER"); diff --git a/core/src/test/java/org/opensearch/sql/data/model/ExprValueUtilsTest.java b/core/src/test/java/org/opensearch/sql/data/model/ExprValueUtilsTest.java index 3c3f5a2c24..18be998d78 100644 --- a/core/src/test/java/org/opensearch/sql/data/model/ExprValueUtilsTest.java +++ b/core/src/test/java/org/opensearch/sql/data/model/ExprValueUtilsTest.java @@ -10,6 +10,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.data.model.ExprValueUtils.convertEpochMilliToDateTimeType; +import static org.opensearch.sql.data.model.ExprValueUtils.extractEpochMilliFromAnyDateTimeType; import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; import static org.opensearch.sql.data.type.ExprCoreType.ARRAY; import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; @@ -257,4 +259,101 @@ public void hashCodeTest() { assertEquals(new ExprTimestampValue("2012-08-07 18:00:00").hashCode(), new ExprTimestampValue("2012-08-07 18:00:00").hashCode()); } + + private static Stream getMillisForConversionTest() { + return Stream.of( + Arguments.of(42L), + Arguments.of(-12345442000L), + Arguments.of(100500L), + Arguments.of(123456789L) + ); + } + + /** + * Check that `DATETIME` and `TIMESTAMP` could be converted to and from milliseconds since Epoch. + * @param sample A test value (milliseconds since Epoch). + */ + @ParameterizedTest + @MethodSource("getMillisForConversionTest") + public void checkDateTimeConversionToMillisAndBack(long sample) { + for (var type : List.of(DATETIME, TIMESTAMP)) { + var value = convertEpochMilliToDateTimeType(sample, type); + assertEquals(type, value.type()); + var extracted = extractEpochMilliFromAnyDateTimeType(value); + assertEquals(sample, extracted, type.toString()); + } + } + + private final long millisInDay = 24 * 60 * 60 * 1000; + + /** + * Check that `TIME` could be converted to and from milliseconds since Epoch. + * @param sample A test value (milliseconds since Epoch). + */ + @ParameterizedTest + @MethodSource("getMillisForConversionTest") + public void checkTimeConversionToMillisAndBack(long sample) { + var value = convertEpochMilliToDateTimeType(sample, TIME); + assertEquals(TIME, value.type()); + var extracted = extractEpochMilliFromAnyDateTimeType(value); + // time value goes around 24h, for negative (pre-epoch) values we need to shift down one day. + if (sample < 0) { + assertEquals((sample % millisInDay) + millisInDay, extracted, TIME.toString()); + } else { + assertEquals(sample % millisInDay, extracted, TIME.toString()); + } + } + + /** + * Check that `DATE` could be converted to and from milliseconds since Epoch. + * @param sample A test value (milliseconds since Epoch). + */ + @ParameterizedTest + @MethodSource("getMillisForConversionTest") + public void checkDateConversionToMillisAndBack(long sample) { + var value = convertEpochMilliToDateTimeType(sample, DATE); + assertEquals(DATE, value.type()); + var extracted = extractEpochMilliFromAnyDateTimeType(value); + // date value floored by 24h, for negative (pre-epoch) values we need to shift down one day. + if (sample < 0) { + assertEquals((sample - millisInDay) / millisInDay * millisInDay, extracted, DATE.toString()); + } else { + assertEquals((sample / millisInDay) * millisInDay, extracted, DATE.toString()); + } + } + + /** + * Check that conversion function reject all non-datetime types. + * @param sample A test value (milliseconds since Epoch). + */ + @ParameterizedTest + @MethodSource("getMillisForConversionTest") + public void checkExceptionThrownOnUnsupportedTypeConversion(long sample) { + var types = ExprCoreType.coreTypes(); + types.removeAll(List.of(DATE, DATETIME, TIMESTAMP, TIME)); + for (var type : types) { + var exception = assertThrows(IllegalArgumentException.class, + () -> convertEpochMilliToDateTimeType(sample, type)); + assertEquals(String.format("Not a datetime type: %s", type), exception.getMessage()); + } + } + + private static Stream getNonDateTimeValues() { + var types = List.of(DATE, DATETIME, TIMESTAMP, TIME); + return getValueTestArgumentStream() + .filter(args -> !types.contains(((ExprValue)args.get()[0]).type())) + .map(args -> Arguments.of(args.get()[0])); + } + + /** + * Check that conversion function reject all non-datetime types. + * @param value A test value. + */ + @ParameterizedTest(name = "the value of ExprValue:{0} is: {2} ") + @MethodSource("getNonDateTimeValues") + public void checkExceptionThrownOnUnsupportedTypeExtraction(ExprValue value) { + var exception = assertThrows(IllegalArgumentException.class, + () -> extractEpochMilliFromAnyDateTimeType(value)); + assertEquals(String.format("Not a datetime type: %s", value.type()), exception.getMessage()); + } } diff --git a/core/src/test/java/org/opensearch/sql/expression/aggregation/AvgAggregatorTest.java b/core/src/test/java/org/opensearch/sql/expression/aggregation/AvgAggregatorTest.java index 93d327257f..27b965a204 100644 --- a/core/src/test/java/org/opensearch/sql/expression/aggregation/AvgAggregatorTest.java +++ b/core/src/test/java/org/opensearch/sql/expression/aggregation/AvgAggregatorTest.java @@ -9,9 +9,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.DATETIME; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.List; import org.junit.jupiter.api.Test; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -62,6 +69,43 @@ public void avg_with_all_missing_or_null() { assertTrue(result.isNull()); } + @Test + public void avg_numeric_no_values() { + ExprValue result = aggregation(dsl.avg(DSL.ref("dummy", INTEGER)), List.of()); + assertTrue(result.isNull()); + } + + @Test + public void avg_datetime_no_values() { + ExprValue result = aggregation(dsl.avg(DSL.ref("dummy", DATETIME)), List.of()); + assertTrue(result.isNull()); + } + + @Test + public void avg_date() { + ExprValue result = aggregation(dsl.avg(dsl.date(DSL.ref("date_value", STRING))), tuples); + assertEquals(LocalDate.of(2007, 7, 2), result.dateValue()); + } + + @Test + public void avg_datetime() { + var result = aggregation(dsl.avg(dsl.datetime(DSL.ref("datetime_value", STRING))), tuples); + assertEquals(LocalDateTime.of(2012, 7, 2, 3, 30), result.datetimeValue()); + } + + @Test + public void avg_time() { + ExprValue result = aggregation(dsl.avg(dsl.time(DSL.ref("time_value", STRING))), tuples); + assertEquals(LocalTime.of(9, 30), result.timeValue()); + } + + @Test + public void avg_timestamp() { + var result = aggregation(dsl.avg(dsl.timestamp(DSL.ref("timestamp_value", STRING))), tuples); + assertEquals(TIMESTAMP, result.type()); + assertEquals(LocalDateTime.of(2012, 7, 2, 3, 30), result.datetimeValue()); + } + @Test public void valueOf() { ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, @@ -69,6 +113,14 @@ public void valueOf() { assertEquals("can't evaluate on aggregator: avg", exception.getMessage()); } + @Test + public void avg_on_unsupported_type() { + var aggregator = new AvgAggregator(List.of(DSL.ref("string", STRING)), STRING); + var exception = assertThrows(IllegalArgumentException.class, + () -> aggregator.create()); + assertEquals("avg aggregation over STRING type is not supported", exception.getMessage()); + } + @Test public void test_to_string() { Aggregator avgAggregator = dsl.avg(DSL.ref("integer_value", INTEGER)); diff --git a/docs/user/dql/aggregations.rst b/docs/user/dql/aggregations.rst index 275666e7ba..3c97db80d0 100644 --- a/docs/user/dql/aggregations.rst +++ b/docs/user/dql/aggregations.rst @@ -163,7 +163,7 @@ SUM Description >>>>>>>>>>> -Usage: SUM(expr). Returns the sum of expr. +Usage: SUM(expr). Returns the sum of `expr`. `expr` could be of any of the numeric data types. Example:: @@ -182,7 +182,7 @@ AVG Description >>>>>>>>>>> -Usage: AVG(expr). Returns the average value of expr. +Usage: AVG(expr). Returns the average value of `expr`. `expr` could be of any of the numeric and datetime data types. Datetime aggregation perfomed with milliseconds precision. Example:: @@ -201,7 +201,7 @@ MAX Description >>>>>>>>>>> -Usage: MAX(expr). Returns the maximum value of expr. +Usage: MAX(expr). Returns the maximum value of `expr`. `expr` could be of any of the numeric and datetime data types. Datetime aggregation perfomed with milliseconds precision. Example:: @@ -219,7 +219,7 @@ MIN Description >>>>>>>>>>> -Usage: MIN(expr). Returns the minimum value of expr. +Usage: MIN(expr). Returns the minimum value of `expr`. `expr` could be of any of the numeric and datetime data types. Datetime aggregation perfomed with milliseconds precision. Example:: diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AggregationIT.java index 95f5b5e3e4..32170c578f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/AggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/AggregationIT.java @@ -162,7 +162,7 @@ public void testInMemoryAggregationOnAllValuesAndOnNotNullReturnsSameResult() th } @Test - public void testPushDownAggregationOnNullValuesReturnsNull() throws IOException { + public void testPushDownAggregationOnNullNumericValuesReturnsNull() throws IOException { var response = executeQuery(String.format("SELECT " + "max(int0), min(int0), avg(int0) from %s where int0 IS NULL;", TEST_INDEX_CALCS)); verifySchema(response, @@ -172,6 +172,61 @@ public void testPushDownAggregationOnNullValuesReturnsNull() throws IOException verifyDataRows(response, rows(null, null, null)); } + @Test + public void testPushDownAggregationOnNullDateTimeValuesFromTableReturnsNull() throws IOException { + var response = executeQuery(String.format("SELECT " + + "max(datetime1), min(datetime1), avg(datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(datetime1)", null, "timestamp"), + schema("min(datetime1)", null, "timestamp"), + schema("avg(datetime1)", null, "timestamp")); + verifyDataRows(response, rows(null, null, null)); + } + + @Test + public void testPushDownAggregationOnNullDateValuesReturnsNull() throws IOException { + var response = executeQuery(String.format("SELECT " + + "max(CAST(NULL AS date)), min(CAST(NULL AS date)), avg(CAST(NULL AS date)) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(NULL AS date))", null, "date"), + schema("min(CAST(NULL AS date))", null, "date"), + schema("avg(CAST(NULL AS date))", null, "date")); + verifyDataRows(response, rows(null, null, null)); + } + + @Test + public void testPushDownAggregationOnNullTimeValuesReturnsNull() throws IOException { + var response = executeQuery(String.format("SELECT " + + "max(CAST(NULL AS time)), min(CAST(NULL AS time)), avg(CAST(NULL AS time)) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(NULL AS time))", null, "time"), + schema("min(CAST(NULL AS time))", null, "time"), + schema("avg(CAST(NULL AS time))", null, "time")); + verifyDataRows(response, rows(null, null, null)); + } + + @Test + public void testPushDownAggregationOnNullTimeStampValuesReturnsNull() throws IOException { + var response = executeQuery(String.format("SELECT " + + "max(CAST(NULL AS timestamp)), min(CAST(NULL AS timestamp)), avg(CAST(NULL AS timestamp)) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(NULL AS timestamp))", null, "timestamp"), + schema("min(CAST(NULL AS timestamp))", null, "timestamp"), + schema("avg(CAST(NULL AS timestamp))", null, "timestamp")); + verifyDataRows(response, rows(null, null, null)); + } + + @Test + public void testPushDownAggregationOnNullDateTimeValuesReturnsNull() throws IOException { + var response = executeQuery(String.format("SELECT " + + "max(datetime(NULL)), min(datetime(NULL)), avg(datetime(NULL)) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(datetime(NULL))", null, "datetime"), + schema("min(datetime(NULL))", null, "datetime"), + schema("avg(datetime(NULL))", null, "datetime")); + verifyDataRows(response, rows(null, null, null)); + } + @Test public void testPushDownAggregationOnAllValuesAndOnNotNullReturnsSameResult() throws IOException { var responseNotNulls = executeQuery(String.format("SELECT " @@ -225,6 +280,303 @@ public void testPushDownAndInMemoryAggregationReturnTheSameResult() throws IOExc } } + public void testMinIntegerPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT min(int2)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("min(int2)", null, "integer")); + verifyDataRows(response, rows(-9)); + } + + @Test + public void testMaxIntegerPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(int2)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(int2)", null, "integer")); + verifyDataRows(response, rows(9)); + } + + @Test + public void testAvgIntegerPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(int2)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(int2)", null, "double")); + verifyDataRows(response, rows(-0.8235294117647058D)); + } + + @Test + public void testMinDoublePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT min(num3)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("min(num3)", null, "double")); + verifyDataRows(response, rows(-19.96D)); + } + + @Test + public void testMaxDoublePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(num3)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(num3)", null, "double")); + verifyDataRows(response, rows(12.93D)); + } + + @Test + public void testAvgDoublePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(num3)" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(num3)", null, "double")); + verifyDataRows(response, rows(-6.12D)); + } + + @Test + public void testMinIntegerInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(int2)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(int2) OVER(PARTITION BY datetime1)", null, "integer")); + verifySome(response.getJSONArray("datarows"), rows(-9)); + } + + @Test + public void testMaxIntegerInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(int2)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(int2) OVER(PARTITION BY datetime1)", null, "integer")); + verifySome(response.getJSONArray("datarows"), rows(9)); + } + + @Test + public void testAvgIntegerInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(int2)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(int2) OVER(PARTITION BY datetime1)", null, "double")); + verifySome(response.getJSONArray("datarows"), rows(-0.8235294117647058D)); + } + + @Test + public void testMinDoubleInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(num3)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(num3) OVER(PARTITION BY datetime1)", null, "double")); + verifySome(response.getJSONArray("datarows"), rows(-19.96D)); + } + + @Test + public void testMaxDoubleInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(num3)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(num3) OVER(PARTITION BY datetime1)", null, "double")); + verifySome(response.getJSONArray("datarows"), rows(12.93D)); + } + + @Test + public void testAvgDoubleInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(num3)" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(num3) OVER(PARTITION BY datetime1)", null, "double")); + verifySome(response.getJSONArray("datarows"), rows(-6.12D)); + } + + @Test + public void testMaxDatePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(date0 AS date))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(CAST(date0 AS date))", null, "date")); + verifyDataRows(response, rows("2004-06-19")); + } + + @Test + public void testAvgDatePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(date0 AS date))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(CAST(date0 AS date))", null, "date")); + verifyDataRows(response, rows("1992-04-23")); + } + + @Test + public void testMinDateTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT min(datetime(CAST(time0 AS STRING)))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("min(datetime(CAST(time0 AS STRING)))", null, "datetime")); + verifyDataRows(response, rows("1899-12-30 21:07:32")); + } + + @Test + public void testMaxDateTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(datetime(CAST(time0 AS STRING)))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(datetime(CAST(time0 AS STRING)))", null, "datetime")); + verifyDataRows(response, rows("1900-01-01 20:36:00")); + } + + @Test + public void testAvgDateTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(datetime(CAST(time0 AS STRING)))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(datetime(CAST(time0 AS STRING)))", null, "datetime")); + verifyDataRows(response, rows("1900-01-01 03:35:00.236")); + } + + @Test + public void testMinTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT min(CAST(time1 AS time))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("min(CAST(time1 AS time))", null, "time")); + verifyDataRows(response, rows("00:05:57")); + } + + @Test + public void testMaxTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(time1 AS time))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(CAST(time1 AS time))", null, "time")); + verifyDataRows(response, rows("22:50:16")); + } + + @Test + public void testAvgTimePushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(time1 AS time))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(CAST(time1 AS time))", null, "time")); + verifyDataRows(response, rows("13:06:36.25")); + } + + @Test + public void testMinTimeStampPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT min(CAST(datetime0 AS timestamp))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("min(CAST(datetime0 AS timestamp))", null, "timestamp")); + verifyDataRows(response, rows("2004-07-04 22:49:28")); + } + + @Test + public void testMaxTimeStampPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(datetime0 AS timestamp))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("max(CAST(datetime0 AS timestamp))", null, "timestamp")); + verifyDataRows(response, rows("2004-08-02 07:59:23")); + } + + @Test + public void testAvgTimeStampPushedDown() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(datetime0 AS timestamp))" + + " from %s", TEST_INDEX_CALCS)); + verifySchema(response, schema("avg(CAST(datetime0 AS timestamp))", null, "timestamp")); + verifyDataRows(response, rows("2004-07-20 10:38:09.705")); + } + + @Test + public void testMinDateInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(CAST(date0 AS date))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(CAST(date0 AS date)) OVER(PARTITION BY datetime1)", null, "date")); + verifySome(response.getJSONArray("datarows"), rows("1972-07-04")); + } + + @Test + public void testMaxDateInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(date0 AS date))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(date0 AS date)) OVER(PARTITION BY datetime1)", null, "date")); + verifySome(response.getJSONArray("datarows"), rows("2004-06-19")); + } + + @Test + public void testAvgDateInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(date0 AS date))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(CAST(date0 AS date)) OVER(PARTITION BY datetime1)", null, "date")); + verifySome(response.getJSONArray("datarows"), rows("1992-04-23")); + } + + @Test + public void testMinDateTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(datetime(CAST(time0 AS STRING)))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(datetime(CAST(time0 AS STRING))) OVER(PARTITION BY datetime1)", null, "datetime")); + verifySome(response.getJSONArray("datarows"), rows("1899-12-30 21:07:32")); + } + + @Test + public void testMaxDateTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(datetime(CAST(time0 AS STRING)))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(datetime(CAST(time0 AS STRING))) OVER(PARTITION BY datetime1)", null, "datetime")); + verifySome(response.getJSONArray("datarows"), rows("1900-01-01 20:36:00")); + } + + @Test + public void testAvgDateTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(datetime(CAST(time0 AS STRING)))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(datetime(CAST(time0 AS STRING))) OVER(PARTITION BY datetime1)", null, "datetime")); + verifySome(response.getJSONArray("datarows"), rows("1900-01-01 03:35:00.235")); + } + + @Test + public void testMinTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(CAST(time1 AS time))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(CAST(time1 AS time)) OVER(PARTITION BY datetime1)", null, "time")); + verifySome(response.getJSONArray("datarows"), rows("00:05:57")); + } + + @Test + public void testMaxTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(time1 AS time))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(time1 AS time)) OVER(PARTITION BY datetime1)", null, "time")); + verifySome(response.getJSONArray("datarows"), rows("22:50:16")); + } + + @Test + public void testAvgTimeInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(time1 AS time))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(CAST(time1 AS time)) OVER(PARTITION BY datetime1)", null, "time")); + verifySome(response.getJSONArray("datarows"), rows("13:06:36.25")); + } + + @Test + public void testMinTimeStampInMemory() throws IOException { + var response = executeQuery(String.format("SELECT min(CAST(datetime0 AS timestamp))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("min(CAST(datetime0 AS timestamp)) OVER(PARTITION BY datetime1)", null, "timestamp")); + verifySome(response.getJSONArray("datarows"), rows("2004-07-04 22:49:28")); + } + + @Test + public void testMaxTimeStampInMemory() throws IOException { + var response = executeQuery(String.format("SELECT max(CAST(datetime0 AS timestamp))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("max(CAST(datetime0 AS timestamp)) OVER(PARTITION BY datetime1)", null, "timestamp")); + verifySome(response.getJSONArray("datarows"), rows("2004-08-02 07:59:23")); + } + + @Test + public void testAvgTimeStampInMemory() throws IOException { + var response = executeQuery(String.format("SELECT avg(CAST(datetime0 AS timestamp))" + + " OVER(PARTITION BY datetime1) from %s", TEST_INDEX_CALCS)); + verifySchema(response, + schema("avg(CAST(datetime0 AS timestamp)) OVER(PARTITION BY datetime1)", null, "timestamp")); + verifySome(response.getJSONArray("datarows"), rows("2004-07-20 10:38:09.706")); + } + protected JSONObject executeQuery(String query) throws IOException { Request request = new Request("POST", QUERY_API_ENDPOINT); request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query)); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java index 2536121e91..15d2073b11 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java @@ -175,27 +175,26 @@ private Optional type(String field) { * https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html * The customized date_format is not supported. */ - private ExprValue constructTimestamp(String value) { - try { - return new ExprTimestampValue( - // Using OpenSearch DateFormatters for now. - DateFormatters.from(DATE_TIME_FORMATTER.parse(value)).toInstant()); - } catch (DateTimeParseException e) { - throw new IllegalStateException( - String.format( - "Construct ExprTimestampValue from \"%s\" failed, unsupported date format.", value), - e); - } - } - private ExprValue parseTimestamp(Content value) { + if (value.isString()) { + // value may contain epoch millis as a string, trying to extract it + try { + return parseTimestamp(new ObjectContent(Long.parseLong(value.stringValue()))); + } catch (NumberFormatException ignored) { /* nothing to do, try another format */ } + try { + return new ExprTimestampValue( + // Using OpenSearch DateFormatters for now. + DateFormatters.from(DATE_TIME_FORMATTER.parse(value.stringValue())).toInstant()); + } catch (DateTimeParseException e) { + throw new IllegalStateException(String.format( + "Construct ExprTimestampValue from \"%s\" failed, unsupported date format.", + value.stringValue()), e); + } + } if (value.isNumber()) { return new ExprTimestampValue(Instant.ofEpochMilli(value.longValue())); - } else if (value.isString()) { - return constructTimestamp(value.stringValue()); - } else { - return new ExprTimestampValue((Instant) value.objectValue()); } + return new ExprTimestampValue((Instant) value.objectValue()); } private ExprValue parseStruct(Content content, String prefix) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/ExpressionAggregationScript.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/ExpressionAggregationScript.java index 6236d7bb32..b64ec6373e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/ExpressionAggregationScript.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/ExpressionAggregationScript.java @@ -6,6 +6,11 @@ package org.opensearch.sql.opensearch.storage.script.aggregation; +import static java.time.temporal.ChronoUnit.MILLIS; + +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Map; import lombok.EqualsAndHashCode; import org.apache.lucene.index.LeafReaderContext; @@ -13,8 +18,10 @@ import org.opensearch.search.lookup.SearchLookup; import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.storage.script.core.ExpressionScript; /** @@ -42,7 +49,22 @@ public ExpressionAggregationScript( @Override public Object execute() { - return expressionScript.execute(this::getDoc, this::evaluateExpression).value(); + var expr = expressionScript.execute(this::getDoc, this::evaluateExpression); + if (expr.type() instanceof OpenSearchDataType) { + return expr.value(); + } + switch ((ExprCoreType)expr.type()) { + case TIME: + // workaround for session context issue + // TODO remove once fixed + return MILLIS.between(LocalTime.MIN, expr.timeValue()); + case DATE: + case DATETIME: + case TIMESTAMP: + return expr.timestampValue().toEpochMilli(); + default: + return expr.value(); + } } private ExprValue evaluateExpression(Expression expression, Environment