Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types #32688

Merged
merged 15 commits into from
Oct 10, 2024
Merged
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -100,6 +102,9 @@ public class IcebergHiveCatalogIT {
.addArrayField("arr_long", Schema.FieldType.INT64)
.addRowField("row", NESTED_ROW_SCHEMA)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addLogicalTypeField("datetime", SqlTypes.DATETIME)
.addLogicalTypeField("date", SqlTypes.DATE)
.addLogicalTypeField("time", SqlTypes.TIME)
.build();

private static final SimpleFunction<Long, Row> ROW_FUNC =
Expand Down Expand Up @@ -127,6 +132,9 @@ public Row apply(Long num) {
.addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
.addValue(nestedRow)
.addValue(num % 2 == 0 ? null : nestedRow)
.addValue(DateTimeUtil.timestampFromMicros(num))
.addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
.addValue(DateTimeUtil.timeFromMicros(num))
.build();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* <td> SqlTypes.DATETIME </td> <td> TIMESTAMP </td>
* </tr>
* <tr>
* <td> SqlTypes.DATE </td> <td> DATE </td>
* </tr>
* <tr>
* <td> SqlTypes.TIME </td> <td> TIME </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
Expand All @@ -157,6 +163,12 @@
* </tr>
* </table>
*
* <p><b>Note:</b> {@code SqlTypes} are Beam logical types.
*
 * <p><b>Note:</b> Beam's {@code DATETIME} and {@code SqlTypes.DATETIME} types both convert to
 * Iceberg's {@code TIMESTAMP} type. In the opposite direction, Iceberg's {@code TIMESTAMP}
 * converts only to Beam's {@code SqlTypes.DATETIME}.
*
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,15 +38,15 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;

/** Utilities for converting between Beam and Iceberg types. */
/** Utilities for converting between Beam and Iceberg types, made public for users' convenience. */
public class IcebergUtils {
// This is made public for users' convenience, as many may have more experience working with
// Iceberg types.

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
Expand All @@ -54,6 +58,14 @@ private IcebergUtils() {}
.put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
.put(Schema.TypeName.STRING, Types.StringType.get())
.put(Schema.TypeName.BYTES, Types.BinaryType.get())
.put(Schema.TypeName.DATETIME, Types.TimestampType.withoutZone())
.build();

private static final Map<String, Type> BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES =
ImmutableMap.<String, Type>builder()
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
.build();

private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
Expand All @@ -69,9 +81,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
case DOUBLE:
return Schema.FieldType.DOUBLE;
case DATE:
return Schema.FieldType.logicalType(SqlTypes.DATE);
case TIME:
case TIMESTAMP: // TODO: Logical types?
return Schema.FieldType.DATETIME;
return Schema.FieldType.logicalType(SqlTypes.TIME);
case TIMESTAMP:
// Beam has two 'DATETIME' types: SqlTypes.DATETIME and Schema.FieldType.DATETIME, but
// we have to choose one type to output.
// The former uses the `java.time` library and the latter uses the `org.joda.time` library
// Iceberg API leans towards `java.time`, so we output the SqlTypes.DATETIME logical type
return Schema.FieldType.logicalType(SqlTypes.DATETIME);
case STRING:
return Schema.FieldType.STRING;
case UUID:
Expand Down Expand Up @@ -151,6 +169,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
// other types.
return new TypeAndMaxId(
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isLogicalType()) {
String logicalTypeIdentifier =
checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
if (type == null) {
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
}
return new TypeAndMaxId(--nestedFieldId, type);
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
Expand Down Expand Up @@ -227,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
*
* <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
* <li>{@link Schema.TypeName.DECIMAL}
* <li>{@link Schema.TypeName.DATETIME}
* <li>{@link Schema.TypeName.LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
Expand Down Expand Up @@ -282,12 +306,24 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
break;
case DATE:
throw new UnsupportedOperationException("Date fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIME:
throw new UnsupportedOperationException("Time fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIMESTAMP:
Optional.ofNullable(value.getDateTime(name))
.ifPresent(v -> rec.setField(name, v.getMillis()));
Object val = value.getValue(name);
if (val instanceof Instant) { // case Schema.FieldType.DATETIME
rec.setField(name, DateTimeUtil.timestampFromMicros(((Instant) val).getMillis()));
} else if (val instanceof LocalDateTime) { // case SqlTypes.DATETIME
rec.setField(name, val);
} else {
String invalidType = val == null ? "unknown" : val.getClass().toString();
throw new IllegalStateException(
"Unexpected Beam type for Iceberg TIMESTAMP: " + invalidType);
}
break;
case STRING:
Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
Expand Down Expand Up @@ -322,6 +358,15 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
}
}

static final Map<String, SerializableFunction<Object, Object>> LOGICAL_TYPE_CONVERTERS =
ImmutableMap.<String, SerializableFunction<Object, Object>>builder()
.put(SqlTypes.DATE.getIdentifier(), val -> SqlTypes.DATE.toBaseType((LocalDate) val))
.put(SqlTypes.TIME.getIdentifier(), val -> SqlTypes.TIME.toBaseType((LocalTime) val))
.put(
SqlTypes.DATETIME.getIdentifier(),
val -> SqlTypes.DATETIME.toBaseType((LocalDateTime) val))
.build();

/** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
Expand Down Expand Up @@ -369,8 +414,19 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
break;
case LOGICAL_TYPE:
throw new UnsupportedOperationException(
"Cannot convert iceberg field to Beam logical type");
Schema.LogicalType<?, ?> logicalType = field.getType().getLogicalType();
if (logicalType == null) {
throw new RuntimeException("Unexpected null Beam logical type " + field.getType());
}
@Nullable
SerializableFunction<Object, Object> converter =
LOGICAL_TYPE_CONVERTERS.get(logicalType.getIdentifier());
if (converter == null) {
throw new RuntimeException(
"Unsupported Beam logical type " + logicalType.getIdentifier());
}
rowBuilder.addValue(icebergValue);
break;
default:
throw new UnsupportedOperationException(
"Unsupported Beam type: " + field.getType().getTypeName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -109,6 +111,9 @@ public class IcebergIOIT implements Serializable {
.addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addNullableInt64Field("nullable_long")
.addLogicalTypeField("datetime", SqlTypes.DATETIME)
.addLogicalTypeField("date", SqlTypes.DATE)
.addLogicalTypeField("time", SqlTypes.TIME)
.build();

private static final SimpleFunction<Long, Row> ROW_FUNC =
Expand Down Expand Up @@ -138,6 +143,9 @@ public Row apply(Long num) {
.addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList()))
.addValue(num % 2 == 0 ? null : nestedRow)
.addValue(num)
.addValue(DateTimeUtil.timestampFromMicros(num))
.addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
.addValue(DateTimeUtil.timeFromMicros(num))
.build();
}
};
Expand Down
Loading
Loading