@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.DoFn;
@@ -70,7 +73,9 @@ public static <OutputT> PTransform<PCollection<Row>, PCollection<OutputT>> fromR
*
* <p>This function allows converting between two types as long as the two types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> if they recursively
* have fields with the same names, but possibly different orders.
* have fields with the same names, but possibly different orders. If the source schema can be
* unboxed to match the target schema (i.e. the source schema contains a single field that is
* compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<OutputT>> to(
Class<OutputT> clazz) {
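
As a rough illustration of the unboxing rule described in the Javadoc above (UserEvent and Location mirror the Select example further down and are assumed to be schema-registered classes):

// events has a composite "location" field; selecting it yields rows whose
// schema contains that single composite field.
PCollection<UserEvent> events = readUserEvents();
PCollection<Row> boxed = events.apply(Select.fieldNames("location"));
// boxed's schema is not directly assignable to Location's schema, but its
// lone composite field is, so the conversion succeeds by unboxing.
PCollection<Location> locations = boxed.apply(Convert.to(Location.class));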
@@ -82,7 +87,9 @@ public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<Outp
*
* <p>This function allows converting between two types as long as the two types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> if they recursively
* have fields with the same names, but possibly different orders.
* have fields with the same names, but possibly different orders. If the source schema can be
* unboxed to match the target schema (i.e. the source schema contains a single field that is
* compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<OutputT>> to(
TypeDescriptor<OutputT> typeDescriptor) {
@@ -92,11 +99,24 @@ public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<Outp
private static class ConvertTransform<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
TypeDescriptor<OutputT> outputTypeDescriptor;
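// Set in expand() when the input's single composite field must be unwrapped
// to match the output schema; stays null when no unboxing is needed.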
Schema unboxedSchema = null;

ConvertTransform(TypeDescriptor<OutputT> outputTypeDescriptor) {
this.outputTypeDescriptor = outputTypeDescriptor;
}

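/**
 * Returns the nested schema if {@code schema} consists of exactly one field
 * and that field is a composite (Row) type; otherwise returns null.
 */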
@Nullable
private static Schema getBoxedNestedSchema(Schema schema) {
if (schema.getFieldCount() != 1) {
return null;
}
FieldType fieldType = schema.getField(0).getType();
[Review comment, Member] How would it work if the field is nullable?

[Reply, Contributor Author] Right now the entire Convert transform ignores nullable (this was already a bit of a bug with it). We appear to be missing a JIRA on this issue, so I'll file one.

if (!fieldType.getTypeName().isCompositeType()) {
return null;
}
return fieldType.getRowSchema();
}
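
For illustration, a hedged sketch of what this helper accepts and rejects (the field names here are hypothetical):

// Exactly one field, and it is composite: the nested schema is returned.
Schema boxed =
    Schema.builder()
        .addRowField("location", Schema.builder().addDoubleField("lat").build())
        .build();
// getBoxedNestedSchema(boxed) returns the nested "location" schema.

// A lone primitive field or multiple fields: null is returned.
Schema primitive = Schema.builder().addInt64Field("id").build();
Schema multiField =
    Schema.builder().addStringField("id").addDoubleField("lat").build();
// getBoxedNestedSchema(primitive) returns null (not a composite type).
// getBoxedNestedSchema(multiField) returns null (field count != 1).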

@Override
@SuppressWarnings("unchecked")
public PCollection<OutputT> expand(PCollection<InputT> input) {
@@ -124,15 +144,21 @@ public PCollection<OutputT> expand(PCollection<InputT> input) {
registry.getSchema(outputTypeDescriptor),
registry.getToRowFunction(outputTypeDescriptor),
registry.getFromRowFunction(outputTypeDescriptor));
// assert matches input schema.
// TODO: Properly handle nullable.
if (!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) {
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ input.getSchema()
+ " output schema: "
+ outputSchemaCoder.getSchema());

Schema outputSchema = outputSchemaCoder.getSchema();
if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) {
// We also support unboxing nested Row schemas, so attempt that.
// TODO: Support unboxing to primitive types as well.
unboxedSchema = getBoxedNestedSchema(input.getSchema());
if (unboxedSchema == null || !outputSchema.assignableToIgnoreNullable(unboxedSchema)) {
Schema checked = (unboxedSchema == null) ? input.getSchema() : unboxedSchema;
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ checked
+ " output schema: "
+ outputSchemaCoder.getSchema());
}
}
} catch (NoSuchSchemaException e) {
throw new RuntimeException("No schema registered for " + outputTypeDescriptor);
@@ -145,7 +171,9 @@ public PCollection<OutputT> expand(PCollection<InputT> input) {
new DoFn<InputT, OutputT>() {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<OutputT> o) {
o.output(outputSchemaCoder.getFromRowFunction().apply(row));
// Read the row, potentially unboxing if necessary.
Row input = (unboxedSchema == null) ? row : row.getValue(0);
o.output(outputSchemaCoder.getFromRowFunction().apply(input));
}
}))
.setSchema(
@@ -64,7 +64,8 @@
*
* <pre>{@code
* PCollection<UserEvent> events = readUserEvents();
* PCollection<Row> rows = events.apply(Select.fieldNames("location.*"));
* PCollection<Location> locations = events.apply(Select.fieldNames("location"))
*     .apply(Convert.to(Location.class));
* }</pre>
*/
@Experimental(Kind.SCHEMAS)