Skip to content

Commit

Permalink
Attempt to deserialize all non-standard portable logical types from proto (
Browse files Browse the repository at this point in the history
#24910)

* Attempt to deserialize all non-standard logical types from proto
* Fixes portable and not-yet-standard logical types being deserialized
  to UnknownLogicalType
  • Loading branch information
Abacn authored Jan 6, 2023
1 parent de3ebdb commit 5096daa
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.commons.lang3.ClassUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility methods for translating schemas. */
@Experimental(Kind.SCHEMAS)
Expand All @@ -71,6 +74,7 @@
"rawtypes"
})
public class SchemaTranslation {
private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class);

private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER;
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";
Expand Down Expand Up @@ -124,8 +128,8 @@ private static SchemaApi.Field fieldToProto(
.build();
}

private static SchemaApi.FieldType fieldTypeToProto(
FieldType fieldType, boolean serializeLogicalType) {
@VisibleForTesting
static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) {
SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder();
switch (fieldType.getTypeName()) {
case ROW:
Expand Down Expand Up @@ -297,7 +301,8 @@ private static Field fieldFromProto(SchemaApi.Field protoField) {
.withDescription(protoField.getDescription());
}

private static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) {
@VisibleForTesting
static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) {
FieldType fieldType = fieldTypeFromProtoWithoutNullable(protoFieldType);

if (protoFieldType.getNullable()) {
Expand Down Expand Up @@ -426,26 +431,32 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
return FieldType.DATETIME;
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
return FieldType.DECIMAL;
} else if (urn.equals(URN_BEAM_LOGICAL_JAVASDK)) {
return FieldType.logicalType(
(LogicalType)
SerializableUtils.deserializeFromByteArray(
logicalType.getPayload().toByteArray(), "logicalType"));
} else {
@Nullable FieldType argumentType = null;
@Nullable Object argumentValue = null;
if (logicalType.hasArgumentType()) {
argumentType = fieldTypeFromProto(logicalType.getArgumentType());
argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument());
} else if (urn.startsWith("beam:logical_type:")) {
try {
return FieldType.logicalType(
(LogicalType)
SerializableUtils.deserializeFromByteArray(
logicalType.getPayload().toByteArray(), "logicalType"));
} catch (IllegalArgumentException e) {
LOG.warn(
"Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.",
urn);
}
return FieldType.logicalType(
new UnknownLogicalType(
urn,
logicalType.getPayload().toByteArray(),
argumentType,
argumentValue,
fieldTypeFromProto(logicalType.getRepresentation())));
}
// assemble an UnknownLogicalType
@Nullable FieldType argumentType = null;
@Nullable Object argumentValue = null;
if (logicalType.hasArgumentType()) {
argumentType = fieldTypeFromProto(logicalType.getArgumentType());
argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument());
}
return FieldType.logicalType(
new UnknownLogicalType(
urn,
logicalType.getPayload().toByteArray(),
argumentType,
argumentValue,
fieldTypeFromProto(logicalType.getRepresentation())));
default:
throw new IllegalArgumentException(
"Unexpected type_info: " + protoFieldType.getTypeInfoCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
Expand Down Expand Up @@ -395,6 +402,45 @@ public void typeInfoNotSet() {
}
}

/**
 * Round-trip tests for logical types: every parameterized field type is translated to its proto
 * form and back again through {@link SchemaTranslation}.
 */
@RunWith(Parameterized.class)
public static class LogicalTypesTest {
  /** The logical field type under test, injected by the parameterized runner. */
  @Parameter(0)
  public Schema.FieldType fieldType;

  /** Supplies the logical field types that the test in this class is run against. */
  @Parameters(name = "{index}: {0}")
  public static Iterable<Schema.FieldType> data() {
    return ImmutableList.<Schema.FieldType>of(
        FieldType.logicalType(SqlTypes.DATE),
        FieldType.logicalType(SqlTypes.TIME),
        FieldType.logicalType(SqlTypes.DATETIME),
        FieldType.logicalType(SqlTypes.TIMESTAMP),
        FieldType.logicalType(new NanosInstant()),
        FieldType.logicalType(new NanosDuration()),
        FieldType.logicalType(FixedBytes.of(10)),
        FieldType.logicalType(VariableBytes.of(10)),
        FieldType.logicalType(FixedString.of(10)),
        FieldType.logicalType(VariableString.of(10)),
        FieldType.logicalType(FixedPrecisionNumeric.of(10)));
  }

  /**
   * Converts the field type to proto (with logical type serialization enabled) and back, then
   * verifies that the logical type's class, argument type and argument all survive the trip.
   */
  @Test
  public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
    Schema.FieldType roundTripped =
        SchemaTranslation.fieldTypeFromProto(SchemaTranslation.fieldTypeToProto(fieldType, true));

    Schema.LogicalType<?, ?> expected = fieldType.getLogicalType();
    Schema.LogicalType<?, ?> actual = roundTripped.getLogicalType();

    // Same concrete logical type class, i.e. the type was not replaced by a different wrapper.
    assertThat(actual.getClass(), equalTo(expected.getClass()));
    assertThat(actual.getArgumentType(), equalTo(expected.getArgumentType()));
    assertThat(actual.getArgument(), equalTo(expected.getArgument()));
  }
}

/** A simple logical type that has no argument. */
private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> {
public static final String IDENTIFIER = "beam:logical_type:null_argument:v1";
Expand Down

0 comments on commit 5096daa

Please sign in to comment.