diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index ea8658e4d5f0..5c6534f2dc2b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -110,6 +110,7 @@ import org.codehaus.janino.ScriptEvaluator; import org.joda.time.DateTime; import org.joda.time.Instant; +import org.joda.time.base.AbstractInstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -632,8 +633,10 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) case BOOLEAN: return Expressions.convert_(value, Boolean.class); case DATETIME: + // AbstractInstant handles both joda Instant and DateTime return nullOr( - value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis")); + value, + Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis")); case BYTES: return nullOr( value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java index a98732cba282..a5f78f715293 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java @@ -23,6 +23,7 @@ import java.time.LocalTime; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Ignore; @@ -349,6 +351,44 @@ public void testNestedArrayOfBytes() { pipeline.run(); } + @Test + public void testNestedDatetime() { + List dateTimes = + ImmutableList.of(Instant.EPOCH, Instant.ofEpochSecond(10000), Instant.now()); + List nullDateTimes = Lists.newArrayList(Instant.EPOCH, null, Instant.now()); + + Schema nestedInputSchema = + Schema.of( + Schema.Field.of("c_dts", Schema.FieldType.array(Schema.FieldType.DATETIME)), + Schema.Field.of( + "c_null_dts", + Schema.FieldType.array(Schema.FieldType.DATETIME.withNullable(true)))); + Schema inputSchema = + Schema.of(Schema.Field.of("nested", Schema.FieldType.row(nestedInputSchema))); + + Schema outputSchema = + Schema.of( + Schema.Field.of("f0", Schema.FieldType.DATETIME), + Schema.Field.of("f1", Schema.FieldType.DATETIME.withNullable(true))); + + Row nestedRow = + Row.withSchema(nestedInputSchema).addValue(dateTimes).addValue(nullDateTimes).build(); + Row row = Row.withSchema(inputSchema).addValue(nestedRow).build(); + Row expected = + Row.withSchema(outputSchema).addValues(dateTimes.get(1), nullDateTimes.get(1)).build(); + + PCollection result = + pipeline + .apply(Create.of(row).withRowSchema(inputSchema)) + .apply( + SqlTransform.query( + "SELECT t.nested.c_dts[2], t.nested.c_null_dts[2] AS f0 FROM PCOLLECTION t")); + + PAssert.that(result).containsInAnyOrder(expected); + + pipeline.run(); + } + @Test public void testRowConstructor() { BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);