From 176ea297225f44814c7d4c901e3b602a5dc4cbda Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 8 Oct 2024 02:41:36 +0300 Subject: [PATCH 01/14] support timestamp, time, date types --- .../IO_Iceberg_Integration_Tests.json | 2 +- CHANGES.md | 1 + .../io/iceberg/hive/IcebergHiveCatalogIT.java | 8 ++ .../beam/sdk/io/iceberg/IcebergUtils.java | 76 +++++++++++++++++-- .../beam/sdk/io/iceberg/IcebergIOIT.java | 8 ++ .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 69 ++++++++++++++--- 6 files changed, 146 insertions(+), 18 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 3f63c0c9975f2..bbdc3a3910ef8 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index be4e0ba4d0f6c..64abde787a7ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ## New Features / Improvements diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java index 54a4998d37fba..5745bc549af9e 100644 --- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java +++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -64,6 +65,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.thrift.TException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -100,6 +102,9 @@ public class IcebergHiveCatalogIT { .addArrayField("arr_long", Schema.FieldType.INT64) .addRowField("row", NESTED_ROW_SCHEMA) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) .build(); private static final SimpleFunction ROW_FUNC = @@ -127,6 +132,9 @@ public Row apply(Long num) { .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList())) .addValue(nestedRow) .addValue(num % 2 == 0 ? null : nestedRow) + .addValue(DateTimeUtil.timestampFromMicros(num)) + .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) + .addValue(DateTimeUtil.timeFromMicros(num)) .build(); } }; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index acd9b25a6a5e3..74194d8c93700 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -20,12 +20,16 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -34,9 +38,11 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Instant; /** Utilities for converting between Beam and Iceberg types. */ public class IcebergUtils { @@ -54,6 +60,14 @@ private IcebergUtils() {} .put(Schema.TypeName.DOUBLE, Types.DoubleType.get()) .put(Schema.TypeName.STRING, Types.StringType.get()) .put(Schema.TypeName.BYTES, Types.BinaryType.get()) + .put(Schema.TypeName.DATETIME, Types.TimestampType.withoutZone()) + .build(); + + private static final Map BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.builder() + .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) + .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) + .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) .build(); private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -69,9 +83,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { case DOUBLE: return Schema.FieldType.DOUBLE; case DATE: + return Schema.FieldType.logicalType(SqlTypes.DATE); case TIME: - case TIMESTAMP: // TODO: Logical types? - return Schema.FieldType.DATETIME; + return Schema.FieldType.logicalType(SqlTypes.TIME); + case TIMESTAMP: + // Beam has two 'DATETIME' types: SqlTypes.DATETIME and Schema.FieldType.DATETIME, but + // we have to choose one type to output. + // The former uses the `java.time` library and the latter uses the `org.joda.time` library + // Iceberg API leans towards `java.time`, so we output the SqlTypes.DATETIME logical type + return Schema.FieldType.logicalType(SqlTypes.DATETIME); case STRING: return Schema.FieldType.STRING; case UUID: @@ -151,6 +171,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( // other types. return new TypeAndMaxId( --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); + } else if (beamType.getTypeName().isLogicalType()) { + String logicalTypeIdentifier = + checkArgumentNotNull(beamType.getLogicalType()).getIdentifier(); + @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); + if (type == null) { + throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); + } + return new TypeAndMaxId(--nestedFieldId, type); } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE Schema.FieldType beamCollectionType = Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()); @@ -282,12 +310,24 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); break; case DATE: - throw new UnsupportedOperationException("Date fields not yet supported"); + Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class)) + .ifPresent(v -> rec.setField(name, v)); + break; case TIME: - throw new UnsupportedOperationException("Time fields not yet supported"); + Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class)) + .ifPresent(v -> rec.setField(name, v)); + break; case TIMESTAMP: - Optional.ofNullable(value.getDateTime(name)) - .ifPresent(v -> rec.setField(name, v.getMillis())); + Object val = value.getValue(name); + if (val instanceof Instant) { // case Schema.FieldType.DATETIME + rec.setField(name, ((Instant) val).getMillis()); + } else if (val instanceof LocalDateTime) { // case SqlTypes.DATETIME + rec.setField(name, val); + } else { + String invalidType = val == null ? "unknown" : val.getClass().toString(); + throw new IllegalStateException( + "Unexpected Beam type for Iceberg TIMESTAMP: " + invalidType); + } break; case STRING: Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); @@ -322,6 +362,15 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row } } + static final Map> LOGICAL_TYPE_CONVERTERS = + ImmutableMap.>builder() + .put(SqlTypes.DATE.getIdentifier(), val -> SqlTypes.DATE.toBaseType((LocalDate) val)) + .put(SqlTypes.TIME.getIdentifier(), val -> SqlTypes.TIME.toBaseType((LocalTime) val)) + .put( + SqlTypes.DATETIME.getIdentifier(), + val -> SqlTypes.DATETIME.toBaseType((LocalDateTime) val)) + .build(); + /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */ public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); @@ -369,8 +418,19 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord)); break; case LOGICAL_TYPE: - throw new UnsupportedOperationException( - "Cannot convert iceberg field to Beam logical type"); + Schema.LogicalType logicalType = field.getType().getLogicalType(); + if (logicalType == null) { + throw new RuntimeException("Unexpected null Beam logical type " + field.getType()); + } + @Nullable + SerializableFunction converter = + LOGICAL_TYPE_CONVERTERS.get(logicalType.getIdentifier()); + if (converter == null) { + throw new RuntimeException( + "Unsupported Beam logical type " + logicalType.getIdentifier()); + } + rowBuilder.addValue(icebergValue); + break; default: throw new UnsupportedOperationException( "Unsupported Beam type: " + field.getType().getTypeName()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 84f2146275f03..9ff506a68a215 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -35,6 +35,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -71,6 +72,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -109,6 +111,9 @@ public class IcebergIOIT implements Serializable { .addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) .addNullableInt64Field("nullable_long") + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) .build(); private static final SimpleFunction ROW_FUNC = @@ -138,6 +143,9 @@ public Row apply(Long num) { .addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList())) .addValue(num % 2 == 0 ? null : nestedRow) .addValue(num) + .addValue(DateTimeUtil.timestampFromMicros(num)) + .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) + .addValue(DateTimeUtil.timeFromMicros(num)) .build(); } }; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index a20d5b7c8f59a..aa429cba1f871 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -32,12 +32,14 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -102,10 +104,20 @@ public void testDouble() { } @Test - public void testDate() {} + public void testDate() { + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATE), + Types.DateType.get(), + DateTimeUtil.dateFromDays(12345)); + } @Test - public void testTime() {} + public void testTime() { + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.TIME), + Types.TimeType.get(), + DateTimeUtil.timeFromMicros(12345678L)); + } @Test public void testTimestamp() { @@ -117,6 +129,11 @@ public void testTimestamp() { dateTime.toInstant(), Types.TimestampType.withoutZone(), dateTime.getMillis()); + + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATETIME), + Types.TimestampType.withoutZone(), + DateTimeUtil.timestampFromMicros(123456789L)); } @Test @@ -190,7 +207,7 @@ private void checkRecordValueToRowValue( Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record); - assertThat(row.getBaseValue("v"), equalTo(destValue)); + assertThat(row.getValue("v"), equalTo(destValue)); } @Test @@ -224,10 +241,20 @@ public void testDouble() { } @Test - public void testDate() {} + public void testDate() { + checkRecordValueToRowValue( + Types.DateType.get(), + Schema.FieldType.logicalType(SqlTypes.DATE), + DateTimeUtil.dateFromDays(12345)); + } @Test - public void testTime() {} + public void testTime() { + checkRecordValueToRowValue( + Types.TimeType.get(), + Schema.FieldType.logicalType(SqlTypes.TIME), + DateTimeUtil.timeFromMicros(1234567L)); + } @Test public void testTimestamp() { @@ -537,6 +564,9 @@ public void testMapBeamFieldTypeToIcebergFieldType() { .addNullableStringField("str") .addNullableBooleanField("bool") .addByteArrayField("bytes") + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("time", SqlTypes.TIME) + .addLogicalTypeField("date", SqlTypes.DATE) .build(); static final org.apache.iceberg.Schema ICEBERG_SCHEMA_PRIMITIVE = @@ -547,16 +577,16 @@ public void testMapBeamFieldTypeToIcebergFieldType() { required(4, "long", Types.LongType.get()), optional(5, "str", Types.StringType.get()), optional(6, "bool", Types.BooleanType.get()), - required(7, "bytes", Types.BinaryType.get())); + required(7, "bytes", Types.BinaryType.get()), + required(8, "datetime", Types.TimestampType.withoutZone()), + required(9, "time", Types.TimeType.get()), + required(10, "date", Types.DateType.get())); @Test public void testPrimitiveBeamSchemaToIcebergSchema() { org.apache.iceberg.Schema convertedIcebergSchema = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_PRIMITIVE); - System.out.println(convertedIcebergSchema); - System.out.println(ICEBERG_SCHEMA_PRIMITIVE); - assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_PRIMITIVE)); } @@ -567,6 +597,27 @@ public void testPrimitiveIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_PRIMITIVE, convertedBeamSchema); } + @Test + public void testDatetimeBeamToIceberg() { + // Support converting both DATETIME types to Iceberg TIMESTAMP type + // The other way around only supports Iceberg TIMESTAMP to Beam SqlTypes.DATETIME + Schema beamSchema = + Schema.builder() + .addDateTimeField("datetime") + .addLogicalTypeField("sql_datetime", SqlTypes.DATETIME) + .build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + required(1, "datetime", Types.TimestampType.withoutZone()), + required(2, "sql_datetime", Types.TimestampType.withoutZone())); + + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(beamSchema); + + assertTrue(convertedIcebergSchema.sameSchema(icebergSchema)); + } + static final Schema BEAM_SCHEMA_LIST = Schema.builder() .addIterableField("arr_str", Schema.FieldType.STRING) From 24741879fc1c63dbc41f6302aa607139d3cfb16b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 8 Oct 2024 02:43:08 +0300 Subject: [PATCH 02/14] add to changes md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 64abde787a7ba..e7f147f420f3d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,7 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types +* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) ## New Features / Improvements From 16b15161063fde745f6072483172e6a2868d2283 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 8 Oct 2024 02:52:24 +0300 Subject: [PATCH 03/14] update java doc --- .../java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 6321f9006e2a9..c391f708d0add 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -141,7 +141,13 @@ * DOUBLE DOUBLE * * - * DATETIME STRING + * SqlTypes.DATETIME TIMESTAMP + * + * + * SqlTypes.DATE DATE + * + * + * SqlTypes.TIME TIME * * * ITERABLE LIST From 90474202b2df9e4d20b9089f733f8961a357ac2e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 8 Oct 2024 03:12:03 +0300 Subject: [PATCH 04/14] always write java time LocalDateTime for iceberg TIMESTAMP --- .../org/apache/beam/sdk/io/iceberg/IcebergUtils.java | 10 +++------- .../apache/beam/sdk/io/iceberg/IcebergUtilsTest.java | 10 ++++++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 74194d8c93700..f63b776d61930 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -38,17 +38,15 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Instant; -/** Utilities for converting between Beam and Iceberg types. */ +/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */ public class IcebergUtils { - // This is made public for users convenience, as many may have more experience working with - // Iceberg types. - private IcebergUtils() {} private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = @@ -255,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( * *

The following unsupported Beam types will be defaulted to {@link Types.StringType}: *

  • {@link Schema.TypeName.DECIMAL} - *
  • {@link Schema.TypeName.DATETIME} - *
  • {@link Schema.TypeName.LOGICAL_TYPE} */ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { List fields = new ArrayList<>(schema.getFieldCount()); @@ -320,7 +316,7 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row case TIMESTAMP: Object val = value.getValue(name); if (val instanceof Instant) { // case Schema.FieldType.DATETIME - rec.setField(name, ((Instant) val).getMillis()); + rec.setField(name, DateTimeUtil.timestampFromMicros(((Instant) val).getMillis())); } else if (val instanceof LocalDateTime) { // case SqlTypes.DATETIME rec.setField(name, val); } else { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index aa429cba1f871..6e19a5226e96c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -47,6 +47,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** Test class for {@link IcebergUtils}. */ @RunWith(Enclosed.class) public class IcebergUtilsTest { @@ -128,7 +129,7 @@ public void testTimestamp() { Schema.FieldType.DATETIME, dateTime.toInstant(), Types.TimestampType.withoutZone(), - dateTime.getMillis()); + DateTimeUtil.timestampFromMicros(dateTime.getMillis())); checkRowValueToRecordValue( Schema.FieldType.logicalType(SqlTypes.DATETIME), @@ -266,6 +267,11 @@ public void testTimestamp() { dateTime.getMillis(), Schema.FieldType.DATETIME, dateTime.toInstant()); + + checkRecordValueToRowValue( + Types.TimestampType.withoutZone(), + Schema.FieldType.logicalType(SqlTypes.DATETIME), + DateTimeUtil.timestampFromMicros(123456789L)); } @Test @@ -452,7 +458,7 @@ public void testStructBeamFieldTypeToIcebergFieldType() { new BeamFieldTypeTestCase( 1, Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE), - 7, + 10, Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())), new BeamFieldTypeTestCase( 15, From adfa84d681cc4c22105b23a39da09df2f2dbadc4 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 8 Oct 2024 14:45:30 +0300 Subject: [PATCH 05/14] update java doc --- .../main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index c391f708d0add..6a5400d7a06f0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -163,6 +163,12 @@ * * * + *

    Note: {@code SqlTypes} are Beam logical types. + * + *

    Note: Both of Beam's {@code DATETIME} and {@code SqlTypes.DATETIME} types will convert + * to Iceberg's {@code TIMESTAMP} type. In the opposite direction, Iceberg's {@code TIMESTAMP} will + * only convert to Beam's {@code SqlTypes.DATETIME}. + * *

    Dynamic Destinations

    * *

    Managed Iceberg supports writing to dynamic destinations. To do so, please provide an From d3cb183303a6015c334b06569a375c268e31ad2f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 14:09:19 +0300 Subject: [PATCH 06/14] add timezone support with Strings --- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 27 +++++- .../beam/sdk/io/iceberg/IcebergUtils.java | 97 +++++++++++++++---- .../beam/sdk/io/iceberg/IcebergIOIT.java | 6 ++ .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 58 +++++++++-- 4 files changed, 159 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 6a5400d7a06f0..ad54be0576ec6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -144,6 +144,9 @@ * SqlTypes.DATETIME TIMESTAMP * * + * STRING TIMESTAMPTZ + * + * * SqlTypes.DATE DATE * * @@ -165,9 +168,27 @@ * *

    Note: {@code SqlTypes} are Beam logical types. * - *

    Note: Both of Beam's {@code DATETIME} and {@code SqlTypes.DATETIME} types will convert - * to Iceberg's {@code TIMESTAMP} type. In the opposite direction, Iceberg's {@code TIMESTAMP} will - * only convert to Beam's {@code SqlTypes.DATETIME}. + *

    Note on timestamps

    + * + *

    Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz} (the latter cares + * about timezones). Beam's native schema extends two types for timestamps: {@code DATETIME} and + * {@code SqlTypes.DATETIME}; unfortunately when values in these two fields are processed in Beam + * Rows, timezone information is dropped and a UTC timestamp is produced. + * + *

    If your use-case requires timestamps with timezone, you can provide {@code STRING} + * timestamp representations, which will be parsed with {@link + * java.time.OffsetDateTime#parse(CharSequence)} and written to Iceberg. + * + *

    Otherwise, you may write timestamps using any of {@code DATETIME}, {@code SqlTypes.DATETIME}, + * or {@code INT64}. + * + *

    Note: Beam does not support creating a table with a timestamptz field. If the table + * does not exist, Beam will treat {@code STRING} and {@code INT64} at face-value. If you expect + * Beam to create a table with Iceberg timestamps (without tz), please use either {@code DATETIME} + * or {@code SqlTypes.DATETIME}. + * + *

    For Iceberg reads, the connector will convert Iceberg's {@code timestamp} and {@code + * timestamptz} types to Beam's {@code SqlTypes.DATETIME} and {@code STRING}, respectively. * *

    Dynamic Destinations

    * diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index f63b776d61930..233832e04d406 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -23,6 +23,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -42,7 +44,6 @@ import org.apache.iceberg.util.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.joda.time.Instant; /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */ @@ -85,11 +86,17 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { case TIME: return Schema.FieldType.logicalType(SqlTypes.TIME); case TIMESTAMP: - // Beam has two 'DATETIME' types: SqlTypes.DATETIME and Schema.FieldType.DATETIME, but - // we have to choose one type to output. - // The former uses the `java.time` library and the latter uses the `org.joda.time` library - // Iceberg API leans towards `java.time`, so we output the SqlTypes.DATETIME logical type - return Schema.FieldType.logicalType(SqlTypes.DATETIME); + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + if (ts.shouldAdjustToUTC()) { + // String timestamps can hold timezone information + return Schema.FieldType.STRING; + } else { + // Beam has two 'DATETIME' types: SqlTypes.DATETIME (uses java.time) and + // Schema.FieldType.DATETIME (uses org.joda.time). + // We choose SqlTypes.DATETIME because Iceberg API leans towards `java.time` + // Unfortunately neither of these types maintains timezone information + return Schema.FieldType.logicalType(SqlTypes.DATETIME); + } case STRING: return Schema.FieldType.STRING; case UUID: @@ -315,15 +322,41 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row break; case TIMESTAMP: Object val = value.getValue(name); - if (val instanceof Instant) { // case Schema.FieldType.DATETIME - rec.setField(name, DateTimeUtil.timestampFromMicros(((Instant) val).getMillis())); - } else if (val instanceof LocalDateTime) { // case SqlTypes.DATETIME + if (val == null) { + break; + } + Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType(); + // timestamp with timezone + if (ts.shouldAdjustToUTC()) { + // currently only string is supported because other types + // do not maintain timezone information + if (val instanceof String) { + // e.g. 2007-12-03T10:15:30+01:00 + rec.setField(name, OffsetDateTime.parse((String) val)); + break; + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + val.getClass()); + } + } + + // timestamp + // SqlTypes.DATETIME + if (val instanceof LocalDateTime) { rec.setField(name, val); + break; + } + + long micros; + if (val instanceof Instant) { // Schema.FieldType.DATETIME + micros = ((Instant) val).getMillis() * 1000L; + } else if (val instanceof Long) { // Schema.FieldType.INT64 + micros = (long) val; } else { - String invalidType = val == null ? "unknown" : val.getClass().toString(); - throw new IllegalStateException( - "Unexpected Beam type for Iceberg TIMESTAMP: " + invalidType); + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp: " + val.getClass()); } + rec.setField(name, DateTimeUtil.timestampFromMicros(micros)); break; case STRING: Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); @@ -385,21 +418,51 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case BYTE: case INT16: case INT32: - case INT64: case DECIMAL: // Iceberg and Beam both use BigDecimal case FLOAT: // Iceberg and Beam both use float case DOUBLE: // Iceberg and Beam both use double - case STRING: // Iceberg and Beam both use String case BOOLEAN: // Iceberg and Beam both use String case ARRAY: case ITERABLE: case MAP: rowBuilder.addValue(icebergValue); break; + case STRING: // Iceberg and Beam both use String + // first handle timestamp with timezone case + if (icebergValue instanceof OffsetDateTime) { + rowBuilder.addValue(((OffsetDateTime) icebergValue).toString()); + } else { + rowBuilder.addValue(icebergValue); + } + break; + case INT64: + long val; + if (icebergValue instanceof Long) { + val = (long) icebergValue; + } else if (icebergValue instanceof OffsetDateTime) { + val = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Iceberg type for Beam type INT64: " + icebergValue.getClass()); + } + rowBuilder.addValue(val); + break; case DATETIME: - // Iceberg uses a long for millis; Beam uses joda time DateTime - long millis = (long) icebergValue; - rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC)); + long millis; + if (icebergValue instanceof OffsetDateTime) { + millis = ((OffsetDateTime) icebergValue).toInstant().toEpochMilli(); + } else if (icebergValue instanceof LocalDateTime) { + millis = ((LocalDateTime) icebergValue).toInstant(ZoneOffset.UTC).toEpochMilli(); + } else if (icebergValue instanceof Long) { + // Iceberg uses a long for micros + // Beam DATETIME uses joda's DateTime, which only supports millis, + // so we do lose some precision here + millis = ((long) icebergValue) / 1000L; + } else { + throw new UnsupportedOperationException( + "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); + } + rowBuilder.addValue(new DateTime(millis)); break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 9ff506a68a215..79c56670ef426 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.Serializable; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -112,6 +113,7 @@ public class IcebergIOIT implements Serializable { .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) .addNullableInt64Field("nullable_long") .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addStringField("datetime_tz") .addLogicalTypeField("date", SqlTypes.DATE) .addLogicalTypeField("time", SqlTypes.TIME) .build(); @@ -144,6 +146,10 @@ public Row apply(Long num) { .addValue(num % 2 == 0 ? null : nestedRow) .addValue(num) .addValue(DateTimeUtil.timestampFromMicros(num)) + .addValue( + DateTimeUtil.timestamptzFromMicros(num) + .withOffsetSameInstant(ZoneOffset.ofHoursMinutes(3, 38)) + .toString()) .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) .addValue(DateTimeUtil.timeFromMicros(num)) .build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 6e19a5226e96c..6102eaa5aacea 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -28,6 +28,8 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -122,19 +124,38 @@ public void testTime() { @Test public void testTimestamp() { + // SqlTypes.DATETIME + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATETIME), + Types.TimestampType.withoutZone(), + DateTimeUtil.timestampFromMicros(123456789L)); + + // Schema.FieldType.DATETIME DateTime dateTime = new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - checkRowValueToRecordValue( Schema.FieldType.DATETIME, - dateTime.toInstant(), + dateTime, Types.TimestampType.withoutZone(), - DateTimeUtil.timestampFromMicros(dateTime.getMillis())); + DateTimeUtil.timestampFromMicros(dateTime.getMillis() * 1000L)); + // Schema.FieldType.INT64 + long micros = 1234567890L; checkRowValueToRecordValue( - Schema.FieldType.logicalType(SqlTypes.DATETIME), + Schema.FieldType.INT64, + micros, Types.TimestampType.withoutZone(), - DateTimeUtil.timestampFromMicros(123456789L)); + DateTimeUtil.timestampFromMicros(micros)); + } + + @Test + public void testTimestampWithZone() { + String val = "2024-10-08T13:18:20.053+03:27"; + OffsetDateTime offsetDateTime = + OffsetDateTime.of(2024, 10, 8, 13, 18, 20, 53_000_000, ZoneOffset.ofHoursMinutes(3, 27)); + // Schema.FieldType.String + checkRowValueToRecordValue( + Schema.FieldType.STRING, val, Types.TimestampType.withZone(), offsetDateTime); } @Test @@ -259,19 +280,38 @@ public void testTime() { @Test public void testTimestamp() { + // SqlTypes.DATETIME + checkRecordValueToRowValue( + Types.TimestampType.withoutZone(), + Schema.FieldType.logicalType(SqlTypes.DATETIME), + DateTimeUtil.timestampFromMicros(123456789L)); + + // Schema.FieldType.DATETIME DateTime dateTime = new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); checkRecordValueToRowValue( Types.TimestampType.withoutZone(), - dateTime.getMillis(), + dateTime.getMillis() * 1000L, Schema.FieldType.DATETIME, - dateTime.toInstant()); + dateTime); + // Schema.FieldType.INT64 + long micros = 1234567890L; checkRecordValueToRowValue( Types.TimestampType.withoutZone(), - Schema.FieldType.logicalType(SqlTypes.DATETIME), - DateTimeUtil.timestampFromMicros(123456789L)); + DateTimeUtil.timestamptzFromMicros(micros), + Schema.FieldType.INT64, + micros); + } + + @Test + public void testTimestampWithZone() { + String val = "2007-12-03T10:15:30+01:00"; + OffsetDateTime offsetDateTime = OffsetDateTime.parse(val); + // Schema.FieldType.String + checkRecordValueToRowValue( + Types.TimestampType.withZone(), offsetDateTime, Schema.FieldType.STRING, val); } @Test From f10e0acf70e634385b0c085b07dd5df309931927 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 17:51:39 +0300 Subject: [PATCH 07/14] clean up; reading iceberg timestamptz will return sqltype.datetime --- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 5 +- .../beam/sdk/io/iceberg/IcebergUtils.java | 112 +++++++++--------- .../beam/sdk/io/iceberg/IcebergIOIT.java | 74 +++++++++--- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 27 ++++- 4 files changed, 141 insertions(+), 77 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 43bd99a8a14c0..f0d33a607beb5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -191,13 +191,14 @@ *

    Otherwise, you may write timestamps using any of {@code DATETIME}, {@code SqlTypes.DATETIME}, * or {@code INT64}. * - *

    Note: Beam does not support creating a table with a timestamptz field. If the table + *

    Note: Beam does not support creating a table with a timestamptz field. If the table * does not exist, Beam will treat {@code STRING} and {@code INT64} at face-value. If you expect * Beam to create a table with Iceberg timestamps (without tz), please use either {@code DATETIME} * or {@code SqlTypes.DATETIME}. * *

    For Iceberg reads, the connector will convert Iceberg's {@code timestamp} and {@code - * timestamptz} types to Beam's {@code SqlTypes.DATETIME} and {@code STRING}, respectively. + * timestamptz} types to Beam's {@code SqlTypes.DATETIME}. This is sufficient because Iceberg stores + * all timestamp values as UTC and does not retain the timezone anyway. * *

    Dynamic Destinations

    * diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 233832e04d406..76b6be404c3bf 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -41,7 +41,6 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; -import org.apache.iceberg.util.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.Instant; @@ -86,17 +85,11 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { case TIME: return Schema.FieldType.logicalType(SqlTypes.TIME); case TIMESTAMP: - Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); - if (ts.shouldAdjustToUTC()) { - // String timestamps can hold timezone information - return Schema.FieldType.STRING; - } else { - // Beam has two 'DATETIME' types: SqlTypes.DATETIME (uses java.time) and - // Schema.FieldType.DATETIME (uses org.joda.time). - // We choose SqlTypes.DATETIME because Iceberg API leans towards `java.time` - // Unfortunately neither of these types maintains timezone information - return Schema.FieldType.logicalType(SqlTypes.DATETIME); - } + // Beam has two 'DATETIME' types: SqlTypes.DATETIME (uses java.time) and + // Schema.FieldType.DATETIME (uses org.joda.time). + // We choose SqlTypes.DATETIME because Iceberg API leans towards `java.time`. + // Also, Iceberg stores timestampts as UTC and does not retain timezone + return Schema.FieldType.logicalType(SqlTypes.DATETIME); case STRING: return Schema.FieldType.STRING; case UUID: @@ -391,15 +384,6 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row } } - static final Map> LOGICAL_TYPE_CONVERTERS = - ImmutableMap.>builder() - .put(SqlTypes.DATE.getIdentifier(), val -> SqlTypes.DATE.toBaseType((LocalDate) val)) - .put(SqlTypes.TIME.getIdentifier(), val -> SqlTypes.TIME.toBaseType((LocalTime) val)) - .put( - SqlTypes.DATETIME.getIdentifier(), - val -> SqlTypes.DATETIME.toBaseType((LocalDateTime) val)) - .build(); - /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */ public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); @@ -421,48 +405,40 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case DECIMAL: // Iceberg and Beam both use BigDecimal case FLOAT: // Iceberg and Beam both use float case DOUBLE: // Iceberg and Beam both use double - case BOOLEAN: // Iceberg and Beam both use String + case STRING: // Iceberg and Beam both use String + case BOOLEAN: // Iceberg and Beam both use boolean case ARRAY: case ITERABLE: case MAP: rowBuilder.addValue(icebergValue); break; - case STRING: // Iceberg and Beam both use String - // first handle timestamp with timezone case - if (icebergValue instanceof OffsetDateTime) { - rowBuilder.addValue(((OffsetDateTime) icebergValue).toString()); - } else { - rowBuilder.addValue(icebergValue); - } - break; case INT64: - long val; - if (icebergValue instanceof Long) { - val = (long) icebergValue; - } else if (icebergValue instanceof OffsetDateTime) { - val = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); - } else { - throw new UnsupportedOperationException( - "Unsupported Iceberg type for Beam type INT64: " + icebergValue.getClass()); + Object value = icebergValue; + if (icebergValue instanceof OffsetDateTime) { + value = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); + } else if (icebergValue instanceof LocalDateTime) { + value = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); + } else if (icebergValue instanceof LocalTime) { + value = DateTimeUtil.microsFromTime((LocalTime) icebergValue); } - rowBuilder.addValue(val); + rowBuilder.addValue(value); break; case DATETIME: - long millis; + long micros; if (icebergValue instanceof OffsetDateTime) { - millis = ((OffsetDateTime) icebergValue).toInstant().toEpochMilli(); + micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); } else if (icebergValue instanceof LocalDateTime) { - millis = ((LocalDateTime) icebergValue).toInstant(ZoneOffset.UTC).toEpochMilli(); + micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); } else if (icebergValue instanceof Long) { - // Iceberg uses a long for micros - // Beam DATETIME uses joda's DateTime, which only supports millis, - // so we do lose some precision here - millis = ((long) icebergValue) / 1000L; + micros = (long) icebergValue; } else { throw new UnsupportedOperationException( "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); } - rowBuilder.addValue(new DateTime(millis)); + // Iceberg uses a long for micros + // Beam DATETIME uses joda's DateTime, which only supports millis, + // so we do lose some precision here + rowBuilder.addValue(new DateTime(micros / 1000L)); break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] @@ -477,18 +453,7 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord)); break; case LOGICAL_TYPE: - Schema.LogicalType logicalType = field.getType().getLogicalType(); - if (logicalType == null) { - throw new RuntimeException("Unexpected null Beam logical type " + field.getType()); - } - @Nullable - SerializableFunction converter = - LOGICAL_TYPE_CONVERTERS.get(logicalType.getIdentifier()); - if (converter == null) { - throw new RuntimeException( - "Unsupported Beam logical type " + logicalType.getIdentifier()); - } - rowBuilder.addValue(icebergValue); + rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType())); break; default: throw new UnsupportedOperationException( @@ -497,4 +462,33 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { } return rowBuilder.build(); } + + private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) { + if (icebergValue instanceof String) { + String strValue = (String) icebergValue; + if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return LocalDate.parse(strValue); + } else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return LocalTime.parse(strValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return LocalDateTime.parse(strValue); + } + } else if (icebergValue instanceof Long) { + if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return DateTimeUtil.timeFromMicros((Long) icebergValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return DateTimeUtil.timestampFromMicros((Long) icebergValue); + } + } else if (icebergValue instanceof Integer + && type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return DateTimeUtil.dateFromDays((Integer) icebergValue); + } else if (icebergValue instanceof OffsetDateTime + && type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return ((OffsetDateTime) icebergValue) + .withOffsetSameInstant(ZoneOffset.UTC) + .toLocalDateTime(); + } + // LocalDateTime, LocalDate, LocalTime + return icebergValue; + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 7dd94554ea003..e4e2c0bf300ad 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.Serializable; +import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; @@ -74,6 +75,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -148,8 +150,9 @@ public Row apply(Long num) { .addValue(num) .addValue(DateTimeUtil.timestampFromMicros(num)) .addValue( - DateTimeUtil.timestamptzFromMicros(num) - .withOffsetSameInstant(ZoneOffset.ofHoursMinutes(3, 38)) + // increment minutes. in testWritePartitionedData, we will partition by hours + OffsetDateTime.of(2024, 9, 10, 12, 30, 30, 0, ZONE_OFFSET) + .plusMinutes(num) .toString()) .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) .addValue(DateTimeUtil.timeFromMicros(num)) @@ -157,6 +160,8 @@ public Row apply(Long num) { } }; + private static final ZoneOffset ZONE_OFFSET = ZoneOffset.ofHoursMinutes(3, 25); + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); private static final SimpleFunction RECORD_FUNC = @@ -200,6 +205,26 @@ public void setUp() { catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } + /** + * Currently, Iceberg's timestamptz is only supported with the Beam String type. However, our + * {@link IcebergUtils} naturally converts a Beam String to an Iceberg String. This method + * replaces a String field with it's intended timestamp type. + */ + private Schema getIcebergSchemaWithTimestampTz() { + List fields = new ArrayList<>(ICEBERG_SCHEMA.columns()); + Types.NestedField datetimeTz = ICEBERG_SCHEMA.findField("datetime_tz"); + int index = fields.indexOf(datetimeTz); + fields.set( + index, + Types.NestedField.of( + datetimeTz.fieldId(), + datetimeTz.isOptional(), + datetimeTz.name(), + Types.TimestampType.withZone())); + + return new Schema(fields); + } + /** Populates the Iceberg table and Returns a {@link List} of expected elements. */ private List populateTable(Table table) throws IOException { double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS; @@ -238,6 +263,10 @@ private List populateTable(Table table) throws IOException { } private List readRecords(Table table) { + return readRecords(table, false); + } + + private List readRecords(Table table, boolean adjustBackToZone) { Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); List writtenRecords = new ArrayList<>(); @@ -256,6 +285,13 @@ private List readRecords(Table table) { .build(); for (Record rec : iterable) { + // Iceberg returns timestamps in UTC, so we apply the zone offset again to check + // correctness with our initial Beam rows + if (adjustBackToZone) { + OffsetDateTime dt = (OffsetDateTime) rec.getField("datetime_tz"); + dt = dt.withOffsetSameInstant(ZONE_OFFSET); + rec.setField("datetime_tz", dt); + } writtenRecords.add(rec); } } @@ -283,7 +319,8 @@ private Map managedIcebergConfig(String tableId) { */ @Test public void testRead() throws Exception { - Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); + Schema schema = getIcebergSchemaWithTimestampTz(); + Table table = catalog.createTable(TableIdentifier.parse(tableId), schema); List expectedRows = populateTable(table); @@ -306,33 +343,38 @@ public void testRead() throws Exception { @Test public void testWrite() { // Write with Beam - // Expect the sink to create the table + // These lines are needed to inject a TimestampType.withZone() type in the table schema. + // Beam doesn't support creating tables with this type, so we need to create it beforehand + Schema newSchema = getIcebergSchemaWithTimestampTz(); + Table table = catalog.createTable(TableIdentifier.parse(tableId), newSchema); + Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); - Table table = catalog.loadTable(TableIdentifier.parse(tableId)); - assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); - // Read back and check records are correct - List returnedRecords = readRecords(table); + List returnedRecords = readRecords(table, true); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, + containsInAnyOrder( + INPUT_ROWS.stream() + .map(row -> IcebergUtils.beamRowToIcebergRecord(newSchema, row)) + .toArray())); } @Test public void testWritePartitionedData() { // For an example row where bool=true, modulo_5=3, str=value_303, // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ + Schema newSchema = getIcebergSchemaWithTimestampTz(); PartitionSpec partitionSpec = - PartitionSpec.builderFor(ICEBERG_SCHEMA) + PartitionSpec.builderFor(newSchema) .identity("bool") .identity("modulo_5") .truncate("str", "value_x".length()) .build(); - Table table = - catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); + Table table = catalog.createTable(TableIdentifier.parse(tableId), newSchema, partitionSpec); // Write with Beam Map config = managedIcebergConfig(tableId); @@ -341,9 +383,13 @@ public void testWritePartitionedData() { pipeline.run().waitUntilFinish(); // Read back and check records are correct - List returnedRecords = readRecords(table); + List returnedRecords = readRecords(table, true); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, + containsInAnyOrder( + INPUT_ROWS.stream() + .map(row -> IcebergUtils.beamRowToIcebergRecord(newSchema, row)) + .toArray())); } private PeriodicImpulse getStreamingSource() { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 6102eaa5aacea..46f2efcd68edd 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -28,6 +28,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; @@ -309,9 +310,14 @@ public void testTimestamp() { public void testTimestampWithZone() { String val = "2007-12-03T10:15:30+01:00"; OffsetDateTime offsetDateTime = OffsetDateTime.parse(val); + LocalDateTime expectedDateTime = + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); // Schema.FieldType.String checkRecordValueToRowValue( - Types.TimestampType.withZone(), offsetDateTime, Schema.FieldType.STRING, val); + Types.TimestampType.withZone(), + offsetDateTime, + Schema.FieldType.logicalType(SqlTypes.DATETIME), + expectedDateTime); } @Test @@ -646,7 +652,6 @@ public void testPrimitiveIcebergSchemaToBeamSchema() { @Test public void testDatetimeBeamToIceberg() { // Support converting both DATETIME types to Iceberg TIMESTAMP type - // The other way around only supports Iceberg TIMESTAMP to Beam SqlTypes.DATETIME Schema beamSchema = Schema.builder() .addDateTimeField("datetime") @@ -664,6 +669,24 @@ public void testDatetimeBeamToIceberg() { assertTrue(convertedIcebergSchema.sameSchema(icebergSchema)); } + @Test + public void testTimestampIcebergToBeam() { + // Both timestamp and timestamptz types will convert to Beam SqlTypes.DATETIME + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + required(1, "timestamp", Types.TimestampType.withoutZone()), + required(2, "timestamp_tz", Types.TimestampType.withZone())); + + Schema expectedBeamSchema = + Schema.builder() + .addLogicalTypeField("timestamp", SqlTypes.DATETIME) + .addLogicalTypeField("timestamp_tz", SqlTypes.DATETIME) + .build(); + Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema); + + assertEquals(expectedBeamSchema, convertedBeamSchema); + } + static final Schema BEAM_SCHEMA_LIST = Schema.builder() .addIterableField("arr_str", Schema.FieldType.STRING) From 64d0c51b908d29eb4d6091fd39522fe336bf00aa Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 17:57:14 +0300 Subject: [PATCH 08/14] doc update --- .../main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index f0d33a607beb5..521528bb5a5fa 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -196,9 +196,9 @@ * Beam to create a table with Iceberg timestamps (without tz), please use either {@code DATETIME} * or {@code SqlTypes.DATETIME}. * - *

    For Iceberg reads, the connector will convert Iceberg's {@code timestamp} and {@code - * timestamptz} types to Beam's {@code SqlTypes.DATETIME}. This is sufficient because Iceberg stores - * all timestamp values as UTC and does not retain the timezone anyway. + *

    For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for both of + * Iceberg's {@code timestamp} and {@code timestamptz}. This is sufficient because Iceberg does not + * retain timezone information and only produces UTC timestamps. * *

    Dynamic Destinations

    * From 9ed5cb99142900fb5ec8ee29c459d8d8dc16e869 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 18:06:44 +0300 Subject: [PATCH 09/14] doc update --- .../main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 521528bb5a5fa..4540d080dfae4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -181,8 +181,9 @@ * *

    Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz} (the latter cares * about timezones). Beam's native schema extends two types for timestamps: {@code DATETIME} and - * {@code SqlTypes.DATETIME}; unfortunately when values in these two fields are processed in Beam - * Rows, timezone information is dropped and a UTC timestamp is produced. + * {@code SqlTypes.DATETIME} -- both are supported for writing to the former {@code timestamp} type. + * They are not supported for {@code timestamptz} however because when a Beam Row stores timestamp + * values, it resolves them to UTC and drops the timezone information. * *

    If your use-case requires timestamps with timezone, you can provide {@code STRING} * timestamp representations, which will be parsed with {@link From a92f2d28cf56d6a2fdfbfa10104fcfbc70646590 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 23:07:45 +0300 Subject: [PATCH 10/14] support string, long, sql.datetime, and datetime; timestamp returns sql.datetime and timestamptTZ returns datetime --- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 36 +++--- .../beam/sdk/io/iceberg/IcebergUtils.java | 106 ++++++++++-------- .../beam/sdk/io/iceberg/IcebergIOIT.java | 83 +++----------- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 91 +++++++++++---- 4 files changed, 164 insertions(+), 152 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 4540d080dfae4..11e622c6f7575 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -153,7 +153,7 @@ * SqlTypes.DATETIME TIMESTAMP * * - * STRING TIMESTAMPTZ + * DATETIME TIMESTAMPTZ * * * SqlTypes.DATE DATE @@ -179,27 +179,27 @@ * *

    Note on timestamps

    * - *

    Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz} (the latter cares - * about timezones). Beam's native schema extends two types for timestamps: {@code DATETIME} and - * {@code SqlTypes.DATETIME} -- both are supported for writing to the former {@code timestamp} type. - * They are not supported for {@code timestamptz} however because when a Beam Row stores timestamp - * values, it resolves them to UTC and drops the timezone information. + *

    Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz}. For the latter, + * Iceberg will first resolve the timestamp to UTC before storing it. * - *

    If your use-case requires timestamps with timezone, you can provide {@code STRING} - * timestamp representations, which will be parsed with {@link - * java.time.OffsetDateTime#parse(CharSequence)} and written to Iceberg. + *

    For an existing table, the following Beam types are supported for both {@code timestamp} and + * {@code timestamptz}: * - *

    Otherwise, you may write timestamps using any of {@code DATETIME}, {@code SqlTypes.DATETIME}, - * or {@code INT64}. + *

      + *
    • {@code SqlTypes.DATETIME} --> Using a java.time.LocalDateTime object + *
    • {@code DATETIME} --> Using a org.joda.time.DateTime object + *
    • {@code INT64} --> Using a long representing micros since EPOCH + *
    • {@code STRING} --> Using a timestamp string representation (e.g. {@code + * "2024-10-08T13:18:20.053+03:27"}) + *
    * - *

    Note: Beam does not support creating a table with a timestamptz field. If the table - * does not exist, Beam will treat {@code STRING} and {@code INT64} at face-value. If you expect - * Beam to create a table with Iceberg timestamps (without tz), please use either {@code DATETIME} - * or {@code SqlTypes.DATETIME}. + *

    Note: If you expect Beam to create the Iceberg table at runtime, please provide {@code + * SqlTypes.DATETIME} and for a {@code timestamp} column and {@code DATETIME} for a {@code + * timestamptz} column. If the table does not exist, Beam will treat {@code STRING} and {@code + * INT64} at face-value and create equivalent column types. * - *

    For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for both of - * Iceberg's {@code timestamp} and {@code timestamptz}. This is sufficient because Iceberg does not - * retain timezone information and only produces UTC timestamps. + *

    For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for + * Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}. * *

    Dynamic Destinations

    * diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 76b6be404c3bf..27c587bd22ed4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -58,7 +58,7 @@ private IcebergUtils() {} .put(Schema.TypeName.DOUBLE, Types.DoubleType.get()) .put(Schema.TypeName.STRING, Types.StringType.get()) .put(Schema.TypeName.BYTES, Types.BinaryType.get()) - .put(Schema.TypeName.DATETIME, Types.TimestampType.withoutZone()) + .put(Schema.TypeName.DATETIME, Types.TimestampType.withZone()) .build(); private static final Map BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES = @@ -85,10 +85,10 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { case TIME: return Schema.FieldType.logicalType(SqlTypes.TIME); case TIMESTAMP: - // Beam has two 'DATETIME' types: SqlTypes.DATETIME (uses java.time) and - // Schema.FieldType.DATETIME (uses org.joda.time). - // We choose SqlTypes.DATETIME because Iceberg API leans towards `java.time`. - // Also, Iceberg stores timestampts as UTC and does not retain timezone + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + if (ts.shouldAdjustToUTC()) { + return Schema.FieldType.DATETIME; + } return Schema.FieldType.logicalType(SqlTypes.DATETIME); case STRING: return Schema.FieldType.STRING; @@ -319,37 +319,7 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row break; } Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType(); - // timestamp with timezone - if (ts.shouldAdjustToUTC()) { - // currently only string is supported because other types - // do not maintain timezone information - if (val instanceof String) { - // e.g. 2007-12-03T10:15:30+01:00 - rec.setField(name, OffsetDateTime.parse((String) val)); - break; - } else { - throw new UnsupportedOperationException( - "Unsupported Beam type for Iceberg timestamp with timezone: " + val.getClass()); - } - } - - // timestamp - // SqlTypes.DATETIME - if (val instanceof LocalDateTime) { - rec.setField(name, val); - break; - } - - long micros; - if (val instanceof Instant) { // Schema.FieldType.DATETIME - micros = ((Instant) val).getMillis() * 1000L; - } else if (val instanceof Long) { // Schema.FieldType.INT64 - micros = (long) val; - } else { - throw new UnsupportedOperationException( - "Unsupported Beam type for Iceberg timestamp: " + val.getClass()); - } - rec.setField(name, DateTimeUtil.timestampFromMicros(micros)); + rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC())); break; case STRING: Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); @@ -384,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row } } + /** + * Returns the appropriate value for an Iceberg timestamp field + * + *

    If `timestamp`, we resolve incoming values to a {@link LocalDateTime}. + * + *

    If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all + * incoming timestamps to UTC, so there is no harm in doing it from our side. + * + *

    Valid types are: + * + *

      + *
    • {@link SqlTypes.DATETIME} --> {@link LocalDateTime} + *
    • {@link Schema.FieldType.DATETIME} --> {@link Instant} + *
    • {@link Schema.FieldType.INT64} --> {@link Long} + *
    • {@link Schema.FieldType.STRING} --> {@link String} + *
    + */ + private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) { + // timestamptz + if (shouldAdjustToUtc) { + if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME + return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC); + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestamptzFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return OffsetDateTime.parse((String) beamValue).withOffsetSameInstant(ZoneOffset.UTC); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + + // timestamp + if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME + return beamValue; + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestampFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return LocalDateTime.parse((String) beamValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */ public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); @@ -402,6 +421,7 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case BYTE: case INT16: case INT32: + case INT64: case DECIMAL: // Iceberg and Beam both use BigDecimal case FLOAT: // Iceberg and Beam both use float case DOUBLE: // Iceberg and Beam both use double @@ -412,17 +432,6 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case MAP: rowBuilder.addValue(icebergValue); break; - case INT64: - Object value = icebergValue; - if (icebergValue instanceof OffsetDateTime) { - value = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); - } else if (icebergValue instanceof LocalDateTime) { - value = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); - } else if (icebergValue instanceof LocalTime) { - value = DateTimeUtil.microsFromTime((LocalTime) icebergValue); - } - rowBuilder.addValue(value); - break; case DATETIME: long micros; if (icebergValue instanceof OffsetDateTime) { @@ -431,6 +440,9 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); } else if (icebergValue instanceof Long) { micros = (long) icebergValue; + } else if (icebergValue instanceof String) { + rowBuilder.addValue(DateTime.parse((String) icebergValue)); + break; } else { throw new UnsupportedOperationException( "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index e4e2c0bf300ad..5df8604699a34 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.schemas.Schema.FieldType; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -25,8 +26,6 @@ import java.io.IOException; import java.io.Serializable; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -75,9 +74,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -112,11 +112,11 @@ public class IcebergIOIT implements Serializable { .addBooleanField("bool") .addInt32Field("int") .addRowField("row", NESTED_ROW_SCHEMA) - .addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64) + .addArrayField("arr_long", FieldType.INT64) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) .addNullableInt64Field("nullable_long") + .addDateTimeField("datetime_tz") .addLogicalTypeField("datetime", SqlTypes.DATETIME) - .addStringField("datetime_tz") .addLogicalTypeField("date", SqlTypes.DATE) .addLogicalTypeField("time", SqlTypes.TIME) .build(); @@ -148,20 +148,14 @@ public Row apply(Long num) { .addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList())) .addValue(num % 2 == 0 ? null : nestedRow) .addValue(num) + .addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25))) .addValue(DateTimeUtil.timestampFromMicros(num)) - .addValue( - // increment minutes. in testWritePartitionedData, we will partition by hours - OffsetDateTime.of(2024, 9, 10, 12, 30, 30, 0, ZONE_OFFSET) - .plusMinutes(num) - .toString()) .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) .addValue(DateTimeUtil.timeFromMicros(num)) .build(); } }; - private static final ZoneOffset ZONE_OFFSET = ZoneOffset.ofHoursMinutes(3, 25); - private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); private static final SimpleFunction RECORD_FUNC = @@ -205,26 +199,6 @@ public void setUp() { catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } - /** - * Currently, Iceberg's timestamptz is only supported with the Beam String type. However, our - * {@link IcebergUtils} naturally converts a Beam String to an Iceberg String. This method - * replaces a String field with it's intended timestamp type. - */ - private Schema getIcebergSchemaWithTimestampTz() { - List fields = new ArrayList<>(ICEBERG_SCHEMA.columns()); - Types.NestedField datetimeTz = ICEBERG_SCHEMA.findField("datetime_tz"); - int index = fields.indexOf(datetimeTz); - fields.set( - index, - Types.NestedField.of( - datetimeTz.fieldId(), - datetimeTz.isOptional(), - datetimeTz.name(), - Types.TimestampType.withZone())); - - return new Schema(fields); - } - /** Populates the Iceberg table and Returns a {@link List} of expected elements. */ private List populateTable(Table table) throws IOException { double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS; @@ -263,10 +237,6 @@ private List populateTable(Table table) throws IOException { } private List readRecords(Table table) { - return readRecords(table, false); - } - - private List readRecords(Table table, boolean adjustBackToZone) { Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); List writtenRecords = new ArrayList<>(); @@ -285,13 +255,6 @@ private List readRecords(Table table, boolean adjustBackToZone) { .build(); for (Record rec : iterable) { - // Iceberg returns timestamps in UTC, so we apply the zone offset again to check - // correctness with our initial Beam rows - if (adjustBackToZone) { - OffsetDateTime dt = (OffsetDateTime) rec.getField("datetime_tz"); - dt = dt.withOffsetSameInstant(ZONE_OFFSET); - rec.setField("datetime_tz", dt); - } writtenRecords.add(rec); } } @@ -319,8 +282,7 @@ private Map managedIcebergConfig(String tableId) { */ @Test public void testRead() throws Exception { - Schema schema = getIcebergSchemaWithTimestampTz(); - Table table = catalog.createTable(TableIdentifier.parse(tableId), schema); + Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); List expectedRows = populateTable(table); @@ -343,38 +305,33 @@ public void testRead() throws Exception { @Test public void testWrite() { // Write with Beam - // These lines are needed to inject a TimestampType.withZone() type in the table schema. - // Beam doesn't support creating tables with this type, so we need to create it beforehand - Schema newSchema = getIcebergSchemaWithTimestampTz(); - Table table = catalog.createTable(TableIdentifier.parse(tableId), newSchema); - + // Expect the sink to create the table Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); + Table table = catalog.loadTable(TableIdentifier.parse(tableId)); + assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); + // Read back and check records are correct - List returnedRecords = readRecords(table, true); + List returnedRecords = readRecords(table); assertThat( - returnedRecords, - containsInAnyOrder( - INPUT_ROWS.stream() - .map(row -> IcebergUtils.beamRowToIcebergRecord(newSchema, row)) - .toArray())); + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } @Test public void testWritePartitionedData() { // For an example row where bool=true, modulo_5=3, str=value_303, // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ - Schema newSchema = getIcebergSchemaWithTimestampTz(); PartitionSpec partitionSpec = - PartitionSpec.builderFor(newSchema) + PartitionSpec.builderFor(ICEBERG_SCHEMA) .identity("bool") .identity("modulo_5") .truncate("str", "value_x".length()) .build(); - Table table = catalog.createTable(TableIdentifier.parse(tableId), newSchema, partitionSpec); + Table table = + catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); // Write with Beam Map config = managedIcebergConfig(tableId); @@ -383,13 +340,9 @@ public void testWritePartitionedData() { pipeline.run().waitUntilFinish(); // Read back and check records are correct - List returnedRecords = readRecords(table, true); + List returnedRecords = readRecords(table); assertThat( - returnedRecords, - containsInAnyOrder( - INPUT_ROWS.stream() - .map(row -> IcebergUtils.beamRowToIcebergRecord(newSchema, row)) - .toArray())); + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } private PeriodicImpulse getStreamingSource() { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 46f2efcd68edd..5d0d8b7dba81e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -147,16 +147,48 @@ public void testTimestamp() { micros, Types.TimestampType.withoutZone(), DateTimeUtil.timestampFromMicros(micros)); + + // Schema.FieldType.STRING + String val = "2024-10-08T13:18:20.053"; + LocalDateTime localDateTime = LocalDateTime.of(2024, 10, 8, 13, 18, 20, 53_000_000); + checkRowValueToRecordValue( + Schema.FieldType.STRING, val, Types.TimestampType.withoutZone(), localDateTime); } @Test public void testTimestampWithZone() { String val = "2024-10-08T13:18:20.053+03:27"; - OffsetDateTime offsetDateTime = - OffsetDateTime.of(2024, 10, 8, 13, 18, 20, 53_000_000, ZoneOffset.ofHoursMinutes(3, 27)); - // Schema.FieldType.String + DateTime dateTime = DateTime.parse(val); + OffsetDateTime offsetDateTime = OffsetDateTime.parse(val); + LocalDateTime localDateTime = + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); + // SqlTypes.DATETIME + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.DATETIME checkRowValueToRecordValue( - Schema.FieldType.STRING, val, Types.TimestampType.withZone(), offsetDateTime); + Schema.FieldType.DATETIME, + dateTime, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.INT64 + checkRowValueToRecordValue( + Schema.FieldType.INT64, + DateTimeUtil.microsFromTimestamptz(offsetDateTime), + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.STRING + checkRowValueToRecordValue( + Schema.FieldType.STRING, + val, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); } @Test @@ -290,34 +322,49 @@ public void testTimestamp() { // Schema.FieldType.DATETIME DateTime dateTime = new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - checkRecordValueToRowValue( Types.TimestampType.withoutZone(), dateTime.getMillis() * 1000L, Schema.FieldType.DATETIME, dateTime); - - // Schema.FieldType.INT64 - long micros = 1234567890L; - checkRecordValueToRowValue( - Types.TimestampType.withoutZone(), - DateTimeUtil.timestamptzFromMicros(micros), - Schema.FieldType.INT64, - micros); } @Test public void testTimestampWithZone() { - String val = "2007-12-03T10:15:30+01:00"; - OffsetDateTime offsetDateTime = OffsetDateTime.parse(val); - LocalDateTime expectedDateTime = + String timestamp = "2024-10-08T13:18:20.053+03:27"; + OffsetDateTime offsetDateTime = OffsetDateTime.parse(timestamp); + LocalDateTime localDateTime = offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); - // Schema.FieldType.String + // SqlTypes.DATETIME checkRecordValueToRowValue( Types.TimestampType.withZone(), offsetDateTime, Schema.FieldType.logicalType(SqlTypes.DATETIME), - expectedDateTime); + localDateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + localDateTime, + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + DateTimeUtil.microsFromTimestamptz(offsetDateTime), + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime); + + // Schema.FieldType.DATETIME + DateTime dateTime = DateTime.parse(timestamp).withZone(DateTimeZone.UTC); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), offsetDateTime, Schema.FieldType.DATETIME, dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), localDateTime, Schema.FieldType.DATETIME, dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + DateTimeUtil.microsFromTimestamptz(offsetDateTime), + Schema.FieldType.DATETIME, + dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), timestamp, Schema.FieldType.DATETIME, dateTime); } @Test @@ -655,13 +702,13 @@ public void testDatetimeBeamToIceberg() { Schema beamSchema = Schema.builder() .addDateTimeField("datetime") - .addLogicalTypeField("sql_datetime", SqlTypes.DATETIME) + .addLogicalTypeField("datetime_tz", SqlTypes.DATETIME) .build(); org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( - required(1, "datetime", Types.TimestampType.withoutZone()), - required(2, "sql_datetime", Types.TimestampType.withoutZone())); + required(1, "datetime", Types.TimestampType.withZone()), + required(2, "datetime_tz", Types.TimestampType.withoutZone())); org.apache.iceberg.Schema convertedIcebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema); @@ -680,7 +727,7 @@ public void testTimestampIcebergToBeam() { Schema expectedBeamSchema = Schema.builder() .addLogicalTypeField("timestamp", SqlTypes.DATETIME) - .addLogicalTypeField("timestamp_tz", SqlTypes.DATETIME) + .addDateTimeField("timestamp_tz") .build(); Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema); From 9016e1388c364821f6adf9072ddb68586f0c236e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 9 Oct 2024 23:24:07 +0300 Subject: [PATCH 11/14] doc cleanup --- .../main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 11e622c6f7575..a6fba008f530f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -194,9 +194,9 @@ * * *

    Note: If you expect Beam to create the Iceberg table at runtime, please provide {@code - * SqlTypes.DATETIME} and for a {@code timestamp} column and {@code DATETIME} for a {@code - * timestamptz} column. If the table does not exist, Beam will treat {@code STRING} and {@code - * INT64} at face-value and create equivalent column types. + * SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz} + * column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at + * face-value and create equivalent column types. * *

    For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for * Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}. From f54d4817a892128380ec128a7d31fe9fe2c1554e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 10 Oct 2024 12:32:32 +0300 Subject: [PATCH 12/14] clean up test cases --- .../io/iceberg/hive/IcebergHiveCatalogIT.java | 4 ++ .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 51 +++---------------- 2 files changed, 10 insertions(+), 45 deletions(-) diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java index 5745bc549af9e..ca4d862c2c72a 100644 --- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java +++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java @@ -67,6 +67,8 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.util.DateTimeUtil; import org.apache.thrift.TException; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -102,6 +104,7 @@ public class IcebergHiveCatalogIT { .addArrayField("arr_long", Schema.FieldType.INT64) .addRowField("row", NESTED_ROW_SCHEMA) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .addDateTimeField("datetime_tz") .addLogicalTypeField("datetime", SqlTypes.DATETIME) .addLogicalTypeField("date", SqlTypes.DATE) .addLogicalTypeField("time", SqlTypes.TIME) @@ -132,6 +135,7 @@ public Row apply(Long num) { .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList())) .addValue(nestedRow) .addValue(num % 2 == 0 ? null : nestedRow) + .addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25))) .addValue(DateTimeUtil.timestampFromMicros(num)) .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) .addValue(DateTimeUtil.timeFromMicros(num)) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 5d0d8b7dba81e..134f05c34bfb3 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -551,7 +551,7 @@ public void testStructBeamFieldTypeToIcebergFieldType() { new BeamFieldTypeTestCase( 1, Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE), - 10, + 11, Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())), new BeamFieldTypeTestCase( 15, @@ -663,6 +663,7 @@ public void testMapBeamFieldTypeToIcebergFieldType() { .addNullableStringField("str") .addNullableBooleanField("bool") .addByteArrayField("bytes") + .addDateTimeField("datetime_tz") .addLogicalTypeField("datetime", SqlTypes.DATETIME) .addLogicalTypeField("time", SqlTypes.TIME) .addLogicalTypeField("date", SqlTypes.DATE) @@ -677,9 +678,10 @@ public void testMapBeamFieldTypeToIcebergFieldType() { optional(5, "str", Types.StringType.get()), optional(6, "bool", Types.BooleanType.get()), required(7, "bytes", Types.BinaryType.get()), - required(8, "datetime", Types.TimestampType.withoutZone()), - required(9, "time", Types.TimeType.get()), - required(10, "date", Types.DateType.get())); + required(8, "datetime_tz", Types.TimestampType.withZone()), + required(9, "datetime", Types.TimestampType.withoutZone()), + required(10, "time", Types.TimeType.get()), + required(11, "date", Types.DateType.get())); @Test public void testPrimitiveBeamSchemaToIcebergSchema() { @@ -696,44 +698,6 @@ public void testPrimitiveIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_PRIMITIVE, convertedBeamSchema); } - @Test - public void testDatetimeBeamToIceberg() { - // Support converting both DATETIME types to Iceberg TIMESTAMP type - Schema beamSchema = - Schema.builder() - .addDateTimeField("datetime") - .addLogicalTypeField("datetime_tz", SqlTypes.DATETIME) - .build(); - - org.apache.iceberg.Schema icebergSchema = - new org.apache.iceberg.Schema( - required(1, "datetime", Types.TimestampType.withZone()), - required(2, "datetime_tz", Types.TimestampType.withoutZone())); - - org.apache.iceberg.Schema convertedIcebergSchema = - IcebergUtils.beamSchemaToIcebergSchema(beamSchema); - - assertTrue(convertedIcebergSchema.sameSchema(icebergSchema)); - } - - @Test - public void testTimestampIcebergToBeam() { - // Both timestamp and timestamptz types will convert to Beam SqlTypes.DATETIME - org.apache.iceberg.Schema icebergSchema = - new org.apache.iceberg.Schema( - required(1, "timestamp", Types.TimestampType.withoutZone()), - required(2, "timestamp_tz", Types.TimestampType.withZone())); - - Schema expectedBeamSchema = - Schema.builder() - .addLogicalTypeField("timestamp", SqlTypes.DATETIME) - .addDateTimeField("timestamp_tz") - .build(); - Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema); - - assertEquals(expectedBeamSchema, convertedBeamSchema); - } - static final Schema BEAM_SCHEMA_LIST = Schema.builder() .addIterableField("arr_str", Schema.FieldType.STRING) @@ -758,9 +722,6 @@ public void testArrayBeamSchemaToIcebergSchema() { public void testArrayIcebergSchemaToBeamSchema() { Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_LIST); - System.out.println(convertedBeamSchema); - System.out.println(BEAM_SCHEMA_LIST); - assertEquals(BEAM_SCHEMA_LIST, convertedBeamSchema); } From 969263e245c02fb0450d1ea44d97d52449fa861d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 10 Oct 2024 12:38:28 +0300 Subject: [PATCH 13/14] clean up doc --- .../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index a6fba008f530f..fa4ff9714c7f8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -179,17 +179,14 @@ * *

    Note on timestamps

    * - *

    Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz}. For the latter, - * Iceberg will first resolve the timestamp to UTC before storing it. - * *

    For an existing table, the following Beam types are supported for both {@code timestamp} and * {@code timestamptz}: * *

      - *
    • {@code SqlTypes.DATETIME} --> Using a java.time.LocalDateTime object - *
    • {@code DATETIME} --> Using a org.joda.time.DateTime object - *
    • {@code INT64} --> Using a long representing micros since EPOCH - *
    • {@code STRING} --> Using a timestamp string representation (e.g. {@code + *
    • {@code SqlTypes.DATETIME} --> Using a {@link java.time.LocalDateTime} object + *
    • {@code DATETIME} --> Using a {@link org.joda.time.DateTime} object + *
    • {@code INT64} --> Using a {@link Long} representing micros since EPOCH + *
    • {@code STRING} --> Using a timestamp {@link String} representation (e.g. {@code * "2024-10-08T13:18:20.053+03:27"}) *
    * From 86e8271c4238c00c0d05eaf378ebb4e421e98253 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 10 Oct 2024 16:47:00 +0300 Subject: [PATCH 14/14] address nit --- .../beam/sdk/io/iceberg/IcebergUtils.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 27c587bd22ed4..ef19a58813661 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -433,24 +433,10 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { rowBuilder.addValue(icebergValue); break; case DATETIME: - long micros; - if (icebergValue instanceof OffsetDateTime) { - micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); - } else if (icebergValue instanceof LocalDateTime) { - micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); - } else if (icebergValue instanceof Long) { - micros = (long) icebergValue; - } else if (icebergValue instanceof String) { - rowBuilder.addValue(DateTime.parse((String) icebergValue)); - break; - } else { - throw new UnsupportedOperationException( - "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); - } - // Iceberg uses a long for micros + // Iceberg uses a long for micros. // Beam DATETIME uses joda's DateTime, which only supports millis, // so we do lose some precision here - rowBuilder.addValue(new DateTime(micros / 1000L)); + rowBuilder.addValue(getBeamDateTimeValue(icebergValue)); break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] @@ -475,6 +461,23 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { return rowBuilder.build(); } + private static DateTime getBeamDateTimeValue(Object icebergValue) { + long micros; + if (icebergValue instanceof OffsetDateTime) { + micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); + } else if (icebergValue instanceof LocalDateTime) { + micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); + } else if (icebergValue instanceof Long) { + micros = (long) icebergValue; + } else if (icebergValue instanceof String) { + return DateTime.parse((String) icebergValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); + } + return new DateTime(micros / 1000L); + } + private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) { if (icebergValue instanceof String) { String strValue = (String) icebergValue;