Skip to content

Commit

Permalink
Merge pull request #144 from Bit-Quill/dev-datetime-aggreg
Browse files Browse the repository at this point in the history
Datetime aggregation fixes
  • Loading branch information
Yury-Fridlyand authored Nov 9, 2022
2 parents b56edc7 + 31a72d4 commit 4a9dfda
Show file tree
Hide file tree
Showing 15 changed files with 733 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
Expand Down Expand Up @@ -67,7 +66,7 @@ public LocalDateTime datetimeValue() {

@Override
public Instant timestampValue() {
return ZonedDateTime.of(date, timeValue(), ZoneId.systemDefault()).toInstant();
return ZonedDateTime.of(date, timeValue(), ExprTimestampValue.ZONE).toInstant();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
Expand Down Expand Up @@ -71,7 +70,7 @@ public LocalTime timeValue() {

@Override
public Instant timestampValue() {
return ZonedDateTime.of(datetime, ZoneId.of("UTC")).toInstant();
return ZonedDateTime.of(datetime, ExprTimestampValue.ZONE).toInstant();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ExprTimestampValue extends AbstractExprValue {
/**
* todo. only support UTC now.
*/
private static final ZoneId ZONE = ZoneId.of("UTC");
public static final ZoneId ZONE = ZoneId.of("UTC");

private final Instant timestamp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.data.model;

import static java.time.temporal.ChronoUnit.MILLIS;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -177,4 +181,50 @@ public static Map<String, ExprValue> getTupleValue(ExprValue exprValue) {
public static Boolean getBooleanValue(ExprValue exprValue) {
return exprValue.booleanValue();
}

/**
* Convert a datetime value to milliseconds since Epoch.
* @param value A value.
* @return Milliseconds since Epoch.
*/
public static long extractEpochMilliFromAnyDateTimeType(ExprValue value) {
switch ((ExprCoreType)value.type()) {
case TIME:
// workaround for session context issue
// TODO remove once fixed
return MILLIS.between(LocalTime.MIN, value.timeValue());
case DATE:
case DATETIME:
case TIMESTAMP:
return value.timestampValue().toEpochMilli();
default:
throw new IllegalArgumentException(
String.format("Not a datetime type: %s", value.type()));
}
}

/**
* Convert milliseconds since Epoch to a datetime value of the given type.
* @param value Milliseconds since Epoch.
* @param type A type of the resulting value requested.
* @return A datetime value.
*/
public static ExprValue convertEpochMilliToDateTimeType(long value, ExprCoreType type) {
// Construct value the same way it is extracted
var ts = new ExprTimestampValue(Instant.ofEpochMilli(value));
switch (type) {
case DATE:
return new ExprDateValue(ts.dateValue());
case DATETIME:
return new ExprDatetimeValue(ts.datetimeValue());
case TIMESTAMP:
return ts;
case TIME:
// TODO update once session context issue fixed
return new ExprTimeValue(LocalTime.MIN.plus(value, MILLIS));
default:
throw new IllegalArgumentException(
String.format("Not a datetime type: %s", type));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ private static DefaultFunctionResolver avg() {
new ImmutableMap.Builder<FunctionSignature, FunctionBuilder>()
.put(new FunctionSignature(functionName, Collections.singletonList(DOUBLE)),
arguments -> new AvgAggregator(arguments, DOUBLE))
.put(new FunctionSignature(functionName, Collections.singletonList(DATE)),
arguments -> new AvgAggregator(arguments, DATE))
.put(new FunctionSignature(functionName, Collections.singletonList(DATETIME)),
arguments -> new AvgAggregator(arguments, DATETIME))
.put(new FunctionSignature(functionName, Collections.singletonList(TIME)),
arguments -> new AvgAggregator(arguments, TIME))
.put(new FunctionSignature(functionName, Collections.singletonList(TIMESTAMP)),
arguments -> new AvgAggregator(arguments, TIMESTAMP))
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.util.List;
import java.util.Locale;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprNullValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
Expand All @@ -23,20 +24,36 @@
*/
public class AvgAggregator extends Aggregator<AvgAggregator.AvgState> {

/**
* To process by different ways different data types, we need to store the type.
* Input data has the same type as the result.
*/
private final ExprCoreType dataType;

public AvgAggregator(List<Expression> arguments, ExprCoreType returnType) {
super(BuiltinFunctionName.AVG.getName(), arguments, returnType);
dataType = returnType;
}

@Override
public AvgState create() {
return new AvgState();
switch (dataType) {
case DATE:
case DATETIME:
case TIMESTAMP:
case TIME:
return new DateTimeAvgState(dataType);
case DOUBLE:
return new DoubleAvgState();
default: //unreachable code - we don't expose signatures for unsupported types
throw new IllegalArgumentException(
String.format("avg aggregation over %s type is not supported", dataType));
}
}

@Override
protected AvgState iterate(ExprValue value, AvgState state) {
state.count++;
state.total += ExprValueUtils.getDoubleValue(value);
return state;
return state.iterate(value);
}

@Override
Expand All @@ -47,18 +64,56 @@ public String toString() {
/**
* Average State.
*/
protected static class AvgState implements AggregationState {
private int count;
private double total;
protected abstract static class AvgState implements AggregationState {
protected int count;
protected double total;

AvgState() {
this.count = 0;
this.total = 0d;
}

@Override
public abstract ExprValue result();

protected AvgState iterate(ExprValue value) {
count++;
return this;
}
}

protected static class DoubleAvgState extends AvgState {
@Override
public ExprValue result() {
return count == 0 ? ExprNullValue.of() : ExprValueUtils.doubleValue(total / count);
if (count == 0) {
return ExprNullValue.of();
}
return ExprValueUtils.doubleValue(total / count);
}

@Override
protected AvgState iterate(ExprValue value) {
total += ExprValueUtils.getDoubleValue(value);
return super.iterate(value);
}
}

@RequiredArgsConstructor
protected static class DateTimeAvgState extends AvgState {
private final ExprCoreType dataType;

@Override
public ExprValue result() {
if (count == 0) {
return ExprNullValue.of();
}
return ExprValueUtils.convertEpochMilliToDateTimeType(Math.round(total / count), dataType);
}

@Override
protected AvgState iterate(ExprValue value) {
total += ExprValueUtils.extractEpochMilliFromAnyDateTimeType(value);
return super.iterate(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.exception.ExpressionEvaluationException;
Expand Down Expand Up @@ -43,7 +42,7 @@ public void timestampValueInterfaceTest() {

assertEquals(TIMESTAMP, timestampValue.type());
assertEquals(ZonedDateTime.of(LocalDateTime.parse("2020-07-07T01:01:01"),
ZoneId.of("UTC")).toInstant(), timestampValue.timestampValue());
ExprTimestampValue.ZONE).toInstant(), timestampValue.timestampValue());
assertEquals("2020-07-07 01:01:01", timestampValue.value());
assertEquals("TIMESTAMP '2020-07-07 01:01:01'", timestampValue.toString());
assertEquals(LocalDate.parse("2020-07-07"), timestampValue.dateValue());
Expand All @@ -61,7 +60,7 @@ public void dateValueInterfaceTest() {
assertEquals(LocalTime.parse("00:00:00"), dateValue.timeValue());
assertEquals(LocalDateTime.parse("2012-07-07T00:00:00"), dateValue.datetimeValue());
assertEquals(ZonedDateTime.of(LocalDateTime.parse("2012-07-07T00:00:00"),
ZoneId.systemDefault()).toInstant(), dateValue.timestampValue());
ExprTimestampValue.ZONE).toInstant(), dateValue.timestampValue());
ExpressionEvaluationException exception =
assertThrows(ExpressionEvaluationException.class, () -> integerValue(1).dateValue());
assertEquals("invalid to get dateValue from value of type INTEGER",
Expand All @@ -76,7 +75,7 @@ public void datetimeValueInterfaceTest() {
assertEquals(LocalDate.parse("2020-08-17"), datetimeValue.dateValue());
assertEquals(LocalTime.parse("19:44:00"), datetimeValue.timeValue());
assertEquals(ZonedDateTime.of(LocalDateTime.parse("2020-08-17T19:44:00"),
ZoneId.of("UTC")).toInstant(), datetimeValue.timestampValue());
ExprTimestampValue.ZONE).toInstant(), datetimeValue.timestampValue());
assertEquals("DATETIME '2020-08-17 19:44:00'", datetimeValue.toString());
assertThrows(ExpressionEvaluationException.class, () -> integerValue(1).datetimeValue(),
"invalid to get datetimeValue from value of type INTEGER");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.sql.data.model.ExprValueUtils.convertEpochMilliToDateTimeType;
import static org.opensearch.sql.data.model.ExprValueUtils.extractEpochMilliFromAnyDateTimeType;
import static org.opensearch.sql.data.model.ExprValueUtils.integerValue;
import static org.opensearch.sql.data.type.ExprCoreType.ARRAY;
import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN;
Expand Down Expand Up @@ -257,4 +259,101 @@ public void hashCodeTest() {
assertEquals(new ExprTimestampValue("2012-08-07 18:00:00").hashCode(),
new ExprTimestampValue("2012-08-07 18:00:00").hashCode());
}

private static Stream<Arguments> getMillisForConversionTest() {
return Stream.of(
Arguments.of(42L),
Arguments.of(-12345442000L),
Arguments.of(100500L),
Arguments.of(123456789L)
);
}

/**
* Check that `DATETIME` and `TIMESTAMP` could be converted to and from milliseconds since Epoch.
* @param sample A test value (milliseconds since Epoch).
*/
@ParameterizedTest
@MethodSource("getMillisForConversionTest")
public void checkDateTimeConversionToMillisAndBack(long sample) {
for (var type : List.of(DATETIME, TIMESTAMP)) {
var value = convertEpochMilliToDateTimeType(sample, type);
assertEquals(type, value.type());
var extracted = extractEpochMilliFromAnyDateTimeType(value);
assertEquals(sample, extracted, type.toString());
}
}

private final long millisInDay = 24 * 60 * 60 * 1000;

/**
* Check that `TIME` could be converted to and from milliseconds since Epoch.
* @param sample A test value (milliseconds since Epoch).
*/
@ParameterizedTest
@MethodSource("getMillisForConversionTest")
public void checkTimeConversionToMillisAndBack(long sample) {
var value = convertEpochMilliToDateTimeType(sample, TIME);
assertEquals(TIME, value.type());
var extracted = extractEpochMilliFromAnyDateTimeType(value);
// time value goes around 24h, for negative (pre-epoch) values we need to shift down one day.
if (sample < 0) {
assertEquals((sample % millisInDay) + millisInDay, extracted, TIME.toString());
} else {
assertEquals(sample % millisInDay, extracted, TIME.toString());
}
}

/**
* Check that `DATE` could be converted to and from milliseconds since Epoch.
* @param sample A test value (milliseconds since Epoch).
*/
@ParameterizedTest
@MethodSource("getMillisForConversionTest")
public void checkDateConversionToMillisAndBack(long sample) {
var value = convertEpochMilliToDateTimeType(sample, DATE);
assertEquals(DATE, value.type());
var extracted = extractEpochMilliFromAnyDateTimeType(value);
// date value floored by 24h, for negative (pre-epoch) values we need to shift down one day.
if (sample < 0) {
assertEquals((sample - millisInDay) / millisInDay * millisInDay, extracted, DATE.toString());
} else {
assertEquals((sample / millisInDay) * millisInDay, extracted, DATE.toString());
}
}

/**
* Check that conversion function reject all non-datetime types.
* @param sample A test value (milliseconds since Epoch).
*/
@ParameterizedTest
@MethodSource("getMillisForConversionTest")
public void checkExceptionThrownOnUnsupportedTypeConversion(long sample) {
var types = ExprCoreType.coreTypes();
types.removeAll(List.of(DATE, DATETIME, TIMESTAMP, TIME));
for (var type : types) {
var exception = assertThrows(IllegalArgumentException.class,
() -> convertEpochMilliToDateTimeType(sample, type));
assertEquals(String.format("Not a datetime type: %s", type), exception.getMessage());
}
}

private static Stream<Arguments> getNonDateTimeValues() {
var types = List.of(DATE, DATETIME, TIMESTAMP, TIME);
return getValueTestArgumentStream()
.filter(args -> !types.contains(((ExprValue)args.get()[0]).type()))
.map(args -> Arguments.of(args.get()[0]));
}

/**
* Check that conversion function reject all non-datetime types.
* @param value A test value.
*/
@ParameterizedTest(name = "the value of ExprValue:{0} is: {2} ")
@MethodSource("getNonDateTimeValues")
public void checkExceptionThrownOnUnsupportedTypeExtraction(ExprValue value) {
var exception = assertThrows(IllegalArgumentException.class,
() -> extractEpochMilliFromAnyDateTimeType(value));
assertEquals(String.format("Not a datetime type: %s", value.type()), exception.getMessage());
}
}
Loading

0 comments on commit 4a9dfda

Please sign in to comment.