From b6422946df18be80b3c21b71458f93bb44ddc5ab Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 6 Mar 2019 12:38:02 -0800 Subject: [PATCH 01/14] Initial commit. --- .../beam/sdk/schemas/transforms/CoGroup.java | 5 +- .../beam/sdk/schemas/transforms/Group.java | 5 +- .../schemas/transforms/SchemaAggregateFn.java | 9 +- .../beam/sdk/schemas/transforms/Select.java | 5 +- .../beam/sdk/schemas/utils/SelectHelpers.java | 180 +++++++++++++----- 5 files changed, 143 insertions(+), 61 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java index 7dd961a93fde..1db079878a35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java @@ -361,7 +361,7 @@ private static JoinInformation from( // The key schema contains the field names from the first PCollection specified. FieldAccessDescriptor resolved = fieldAccessDescriptor.withOrderByFieldInsertionOrder().resolve(schema); - Schema currentKeySchema = SelectHelpers.getOutputSchema(schema, resolved); + Schema currentKeySchema = SelectHelpers.getOutputSchema(schema, resolved, true); if (keySchema == null) { keySchema = currentKeySchema; } else { @@ -396,7 +396,8 @@ private static PCollection> extractKey( @ProcessElement public void process(@Element Row row, OutputReceiver> o) { o.output( - KV.of(SelectHelpers.selectRow(row, keyFields, schema, keySchema), row)); + KV.of(SelectHelpers.selectRow(row, keyFields, schema, keySchema, true), + row)); } })) .setCoder(KvCoder.of(SchemaCoder.of(keySchema), SchemaCoder.of(schema))); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java index cf356f4852e7..c03c2138110a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java @@ -631,7 +631,7 @@ public CombineFieldsByFields agg public PCollection>> expand(PCollection input) { Schema schema = input.getSchema(); FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema); - keySchema = SelectHelpers.getOutputSchema(schema, resolved); + keySchema = SelectHelpers.getOutputSchema(schema, resolved, true); return input .apply( "Group by fields", @@ -644,7 +644,8 @@ public void process( OutputReceiver> o) { o.output( KV.of( - SelectHelpers.selectRow(row, resolved, schema, keySchema), element)); + SelectHelpers.selectRow(row, resolved, schema, keySchema, true), + element)); } })) .setCoder(KvCoder.of(SchemaCoder.of(keySchema), input.getCoder())) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java index 77db21846133..985ecd7888f1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java @@ -97,7 +97,8 @@ static class FieldAggregation implements Serializable { @Nullable Schema inputSchema) { if (inputSchema != null) { this.fieldsToAggregate = fieldsToAggregate.resolve(inputSchema); - this.inputSubSchema = SelectHelpers.getOutputSchema(inputSchema, this.fieldsToAggregate); + this.inputSubSchema = SelectHelpers.getOutputSchema(inputSchema, this + .fieldsToAggregate, true); this.unnestedInputSubSchema = Unnest.getUnnestedSchema(inputSubSchema); this.needsUnnesting = !inputSchema.equals(unnestedInputSubSchema); } else { @@ -245,7 +246,8 @@ public OutputT apply(InputT input) { row, fieldAggregation.fieldsToAggregate, row.getSchema(), - fieldAggregation.inputSubSchema); + fieldAggregation.inputSubSchema, + true); if (fieldAggregation.needsUnnesting) { selected = Unnest.unnestRow(selected, fieldAggregation.unnestedInputSubSchema); } @@ -271,7 +273,8 @@ public Row apply(T input) { row, fieldAggregation.fieldsToAggregate, row.getSchema(), - fieldAggregation.inputSubSchema); + fieldAggregation.inputSubSchema, + true); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index 8686a7a279bc..b8d1e0c728fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -98,7 +98,7 @@ public static Select fieldAccess(FieldAccessDescriptor fieldAccessDescrip public PCollection expand(PCollection input) { Schema inputSchema = input.getSchema(); FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(inputSchema); - Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, resolved); + Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, resolved, true); return input .apply( @@ -114,7 +114,8 @@ public PCollection expand(PCollection input) { @ProcessElement public void process( @FieldAccess("selectFields") @Element Row row, OutputReceiver r) { - r.output(SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema)); + r.output(SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema, + true)); } })) .setRowSchema(outputSchema); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index e3baf68df954..063b32863450 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; import java.util.List; import java.util.Map; @@ -30,31 +31,72 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; -/** Helper methods to select fields from a Schema. */ +/** Helper methods to select subrows out of rows. */ public class SelectHelpers { - // Currently we don't flatten selected nested fields. - public static Schema getOutputSchema( + private static boolean singleFieldSelected( Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) { + if (fieldAccessDescriptor.getAllFields()) { + return false; + } + if (fieldAccessDescriptor.fieldIdsAccessed().size() == 1 + && fieldAccessDescriptor.getNestedFieldsAccessed().isEmpty()) { + int fieldId = fieldAccessDescriptor.fieldIdsAccessed().iterator().next(); + Field field = inputSchema.getField(fieldId); + return TypeName.ROW.equals(field.getType().getTypeName()); + } + + if (fieldAccessDescriptor.fieldIdsAccessed().isEmpty() + && fieldAccessDescriptor.getNestedFieldsAccessed().size() == 1) { + FieldDescriptor key = + fieldAccessDescriptor.getNestedFieldsAccessed().keySet().iterator().next(); + Field field = inputSchema.getField(checkNotNull(key.getFieldId())); + if (!field.getType().getTypeName().isCollectionType() + && !field.getType().getTypeName().isMapType() + && key.getQualifiers().isEmpty()) { + return true; + } + } + return false; + } + + /** + * Get the output schema resulting from selecting the given {@link FieldAccessDescriptor} from the + * given schema. + */ + public static Schema getOutputSchema( + Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor, boolean unnest) { if (fieldAccessDescriptor.getAllFields()) { return inputSchema; } + boolean selectsNestedSingle = unnest && singleFieldSelected(inputSchema, fieldAccessDescriptor); + Schema.Builder builder = new Schema.Builder(); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { + if (selectsNestedSingle) { + return inputSchema.getField(fieldId).getType().getRowSchema(); + } builder.addField(inputSchema.getField(fieldId)); } for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor fieldDescriptor = nested.getKey(); + FieldAccessDescriptor nestedAccess = nested.getValue(); Field field = inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId())); - FieldType outputType = - getOutputSchemaHelper( - field.getType(), nested.getValue(), fieldDescriptor.getQualifiers(), 0); - builder.addField(field.getName(), outputType); + if (selectsNestedSingle) { + checkState(field.getType().getTypeName().isCompositeType()); + return getOutputSchema(field.getType().getRowSchema(), nestedAccess, unnest); + } else { + FieldType outputType = + getOutputSchemaHelper( + field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), unnest, 0); + builder.addField(field.getName(), outputType); + } } return builder.build(); } @@ -63,12 +105,14 @@ private static FieldType getOutputSchemaHelper( FieldType inputFieldType, FieldAccessDescriptor fieldAccessDescriptor, List qualifiers, + boolean unnest, int qualifierPosition) { if (qualifierPosition >= qualifiers.size()) { // We have walked through any containers, and are at a row type. Extract the subschema // for the row, preserving nullable attributes. checkArgument(inputFieldType.getTypeName().isCompositeType()); - return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor)) + return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor, + unnest)) .withNullable(inputFieldType.getNullable()); } @@ -79,7 +123,7 @@ private static FieldType getOutputSchemaHelper( FieldType componentType = checkNotNull(inputFieldType.getCollectionElementType()); FieldType outputComponent = getOutputSchemaHelper( - componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) + componentType, fieldAccessDescriptor, qualifiers, unnest,qualifierPosition + 1) .withNullable(componentType.getNullable()); return FieldType.array(outputComponent).withNullable(inputFieldType.getNullable()); case MAP: @@ -88,7 +132,7 @@ private static FieldType getOutputSchemaHelper( FieldType valueType = checkNotNull(inputFieldType.getMapValueType()); FieldType outputValueType = getOutputSchemaHelper( - valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) + valueType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) .withNullable(valueType.getNullable()); return FieldType.map(keyType, outputValueType).withNullable(inputFieldType.getNullable()); default: @@ -96,15 +140,42 @@ private static FieldType getOutputSchemaHelper( } } - public static Row selectRow( + private static Row selectNestedRow( Row input, FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, Schema outputSchema) { + for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { + return input.getValue(fieldId); + } + + for (Map.Entry nested : + fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { + FieldDescriptor field = nested.getKey(); + FieldAccessDescriptor nestedAccess = nested.getValue(); + String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId())); + Schema nestedSchema = inputSchema.getField(field.getFieldId()).getType().getRowSchema(); + return selectRow(input.getValue(fieldName), nestedAccess, nestedSchema, outputSchema, true); + } + + throw new IllegalStateException("Unreachable."); + } + + /** Select out of a given {@link Row} object. */ + public static Row selectRow( + Row input, + FieldAccessDescriptor fieldAccessDescriptor, + Schema inputSchema, + Schema outputSchema, + boolean unnest) { if (fieldAccessDescriptor.getAllFields()) { return input; } + if (unnest && singleFieldSelected(inputSchema, fieldAccessDescriptor)) { + return selectNestedRow(input, fieldAccessDescriptor, inputSchema, outputSchema); + } + Row.Builder output = Row.withSchema(outputSchema); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { // TODO: Once we support specific qualifiers (like array slices), extract them here. @@ -114,18 +185,20 @@ public static Row selectRow( for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor field = nested.getKey(); + FieldAccessDescriptor nestedAccess = nested.getValue(); String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId())); FieldType nestedInputType = inputSchema.getField(field.getFieldId()).getType(); FieldType nestedOutputType = outputSchema.getField(fieldName).getType(); - Object value = + Object selectedValue = selectRowHelper( field.getQualifiers(), 0, input.getValue(fieldName), - nested.getValue(), + nestedAccess, nestedInputType, - nestedOutputType); - output.addValue(value); + nestedOutputType, + unnest); + output.addValue(selectedValue); } return output.build(); } @@ -137,11 +210,12 @@ private static Object selectRowHelper( Object value, FieldAccessDescriptor fieldAccessDescriptor, FieldType inputType, - FieldType outputType) { + FieldType outputType, + boolean unnest) { if (qualifierPosition >= qualifiers.size()) { Row row = (Row) value; return selectRow( - row, fieldAccessDescriptor, inputType.getRowSchema(), outputType.getRowSchema()); + row, fieldAccessDescriptor, inputType.getRowSchema(), outputType.getRowSchema(), unnest); } if (fieldAccessDescriptor.getAllFields()) { @@ -152,45 +226,47 @@ private static Object selectRowHelper( Qualifier qualifier = qualifiers.get(qualifierPosition); switch (qualifier.getKind()) { case LIST: - { - FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); - FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); - List list = (List) value; - List selectedList = Lists.newArrayListWithCapacity(list.size()); - for (Object o : list) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - o, - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedList.add(selected); - } - return selectedList; + { + FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); + FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); + List list = (List) value; + List selectedList = Lists.newArrayListWithCapacity(list.size()); + for (Object o : list) { + Object selected = + selectRowHelper( + qualifiers, + qualifierPosition + 1, + o, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType, + unnest); + selectedList.add(selected); } + return selectedList; + } case MAP: - { - FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); - FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); - Map map = (Map) value; - Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - entry.getValue(), - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedMap.put(entry.getKey(), selected); - } - return selectedMap; + { + FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); + FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); + Map map = (Map) value; + Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + Object selected = + selectRowHelper( + qualifiers, + qualifierPosition + 1, + entry.getValue(), + fieldAccessDescriptor, + nestedInputType, + nestedOutputType, + unnest); + selectedMap.put(entry.getKey(), selected); } + return selectedMap; + } default: throw new RuntimeException("Unexpected type " + qualifier.getKind()); } } -} +} \ No newline at end of file From ab6f1e19f2990c021b427202a6513c53e62b5fe2 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 6 Mar 2019 12:57:47 -0800 Subject: [PATCH 02/14] Spotless. --- .../beam/sdk/schemas/transforms/CoGroup.java | 3 +- .../schemas/transforms/SchemaAggregateFn.java | 4 +- .../beam/sdk/schemas/transforms/Select.java | 8 +- .../beam/sdk/schemas/utils/SelectHelpers.java | 82 +++++++++---------- .../sdk/schemas/transforms/SelectTest.java | 71 +++++----------- 5 files changed, 69 insertions(+), 99 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java index 1db079878a35..9530c7d8f532 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java @@ -396,7 +396,8 @@ private static PCollection> extractKey( @ProcessElement public void process(@Element Row row, OutputReceiver> o) { o.output( - KV.of(SelectHelpers.selectRow(row, keyFields, schema, keySchema, true), + KV.of( + SelectHelpers.selectRow(row, keyFields, schema, keySchema, true), row)); } })) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java index 985ecd7888f1..578e744a9b07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java @@ -97,8 +97,8 @@ static class FieldAggregation implements Serializable { @Nullable Schema inputSchema) { if (inputSchema != null) { this.fieldsToAggregate = fieldsToAggregate.resolve(inputSchema); - this.inputSubSchema = SelectHelpers.getOutputSchema(inputSchema, this - .fieldsToAggregate, true); + this.inputSubSchema = + SelectHelpers.getOutputSchema(inputSchema, this.fieldsToAggregate, true); this.unnestedInputSubSchema = Unnest.getUnnestedSchema(inputSubSchema); this.needsUnnesting = !inputSchema.equals(unnestedInputSubSchema); } else { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index b8d1e0c728fe..cdab38f05629 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -64,8 +64,10 @@ * *
{@code
  * PCollection events = readUserEvents();
- * PCollection rows = event.apply(Select.fieldNames("location.*"));
+ * PCollection rows = event.apply(Select.fieldNames("location.*")
+ *                              .apply(Convert.to(Location.class));
  * }
+ * */ @Experimental(Kind.SCHEMAS) public class Select extends PTransform, PCollection> { @@ -114,8 +116,8 @@ public PCollection expand(PCollection input) { @ProcessElement public void process( @FieldAccess("selectFields") @Element Row row, OutputReceiver r) { - r.output(SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema, - true)); + r.output( + SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema, true)); } })) .setRowSchema(outputSchema); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 063b32863450..85a4a9e09fec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -111,8 +111,8 @@ private static FieldType getOutputSchemaHelper( // We have walked through any containers, and are at a row type. Extract the subschema // for the row, preserving nullable attributes. checkArgument(inputFieldType.getTypeName().isCompositeType()); - return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor, - unnest)) + return FieldType.row( + getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor, unnest)) .withNullable(inputFieldType.getNullable()); } @@ -123,7 +123,7 @@ private static FieldType getOutputSchemaHelper( FieldType componentType = checkNotNull(inputFieldType.getCollectionElementType()); FieldType outputComponent = getOutputSchemaHelper( - componentType, fieldAccessDescriptor, qualifiers, unnest,qualifierPosition + 1) + componentType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) .withNullable(componentType.getNullable()); return FieldType.array(outputComponent).withNullable(inputFieldType.getNullable()); case MAP: @@ -132,7 +132,7 @@ private static FieldType getOutputSchemaHelper( FieldType valueType = checkNotNull(inputFieldType.getMapValueType()); FieldType outputValueType = getOutputSchemaHelper( - valueType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) + valueType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) .withNullable(valueType.getNullable()); return FieldType.map(keyType, outputValueType).withNullable(inputFieldType.getNullable()); default: @@ -226,47 +226,47 @@ private static Object selectRowHelper( Qualifier qualifier = qualifiers.get(qualifierPosition); switch (qualifier.getKind()) { case LIST: - { - FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); - FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); - List list = (List) value; - List selectedList = Lists.newArrayListWithCapacity(list.size()); - for (Object o : list) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - o, - fieldAccessDescriptor, - nestedInputType, - nestedOutputType, - unnest); - selectedList.add(selected); + { + FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); + FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); + List list = (List) value; + List selectedList = Lists.newArrayListWithCapacity(list.size()); + for (Object o : list) { + Object selected = + selectRowHelper( + qualifiers, + qualifierPosition + 1, + o, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType, + unnest); + selectedList.add(selected); + } + return selectedList; } - return selectedList; - } case MAP: - { - FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); - FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); - Map map = (Map) value; - Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - entry.getValue(), - fieldAccessDescriptor, - nestedInputType, - nestedOutputType, - unnest); - selectedMap.put(entry.getKey(), selected); + { + FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); + FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); + Map map = (Map) value; + Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + Object selected = + selectRowHelper( + qualifiers, + qualifierPosition + 1, + entry.getValue(), + fieldAccessDescriptor, + nestedInputType, + nestedOutputType, + unnest); + selectedMap.put(entry.getKey(), selected); + } + return selectedMap; } - return selectedMap; - } default: throw new RuntimeException("Unexpected type " + qualifier.getKind()); } } -} \ No newline at end of file +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java index 3238ebd9d72b..b4131719c30a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java @@ -127,52 +127,6 @@ public int hashCode() { } } - /** A pojo matching the schema results from selection field2.*. */ - @DefaultSchema(JavaFieldSchema.class) - static class POJO2NestedAll { - POJO1 field2 = new POJO1(); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - POJO2NestedAll that = (POJO2NestedAll) o; - return Objects.equals(field2, that.field2); - } - - @Override - public int hashCode() { - return Objects.hash(field2); - } - } - - /** A pojo matching the schema results from selection field2.field1, field2.field3. */ - @DefaultSchema(JavaFieldSchema.class) - static class POJO2NestedPartial { - POJO1Selected field2 = new POJO1Selected(); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - POJO2NestedPartial that = (POJO2NestedPartial) o; - return Objects.equals(field2, that.field2); - } - - @Override - public int hashCode() { - return Objects.hash(field2); - } - } - @Test @Category(NeedsRunner.class) public void testSelectMissingFieldName() { @@ -216,24 +170,36 @@ public void testSimpleSelect() { @Test @Category(NeedsRunner.class) public void testSelectNestedAll() { - PCollection pojos = + PCollection pojos = + pipeline + .apply(Create.of(new POJO2())) + .apply(Select.fieldNames("field2")) + .apply(Convert.to(POJO1.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1()); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testSelectNestedAllWildcard() { + PCollection pojos = pipeline .apply(Create.of(new POJO2())) .apply(Select.fieldNames("field2.*")) - .apply(Convert.to(POJO2NestedAll.class)); - PAssert.that(pojos).containsInAnyOrder(new POJO2NestedAll()); + .apply(Convert.to(POJO1.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1()); pipeline.run(); } @Test @Category(NeedsRunner.class) public void testSelectNestedPartial() { - PCollection pojos = + PCollection pojos = pipeline .apply(Create.of(new POJO2())) .apply(Select.fieldNames("field2.field1", "field2.field3")) - .apply(Convert.to(POJO2NestedPartial.class)); - PAssert.that(pojos).containsInAnyOrder(new POJO2NestedPartial()); + .apply(Convert.to(POJO1Selected.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1Selected()); pipeline.run(); } @@ -637,6 +603,7 @@ public void testSelectRowNestedListsAndMaps() { .apply("convert2", Convert.to(PartialRowNestedArraysAndMaps.class)); PAssert.that(selected).containsInAnyOrder(new PartialRowNestedArraysAndMaps()); + PAssert.that(selected2).containsInAnyOrder(new PartialRowNestedArraysAndMaps()); pipeline.run(); } } From be4baf28584d26d27b5a039be4e4a3b002aab106 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 6 Mar 2019 14:56:08 -0800 Subject: [PATCH 03/14] Apply spotless. --- .../beam/sdk/schemas/transforms/Select.java | 3 +- .../beam/sdk/schemas/utils/SelectHelpers.java | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index cdab38f05629..a9796f7f2708 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -64,10 +64,9 @@ * *
{@code
  * PCollection events = readUserEvents();
- * PCollection rows = event.apply(Select.fieldNames("location.*")
+ * PCollection rows = event.apply(Select.fieldNames("location")
  *                              .apply(Convert.to(Location.class));
  * }
- * */ @Experimental(Kind.SCHEMAS) public class Select extends PTransform, PCollection> { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 85a4a9e09fec..61dd3ce2351e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -38,6 +38,10 @@ /** Helper methods to select subrows out of rows. */ public class SelectHelpers { + /** + * Checks whether a FieldAccessDescriptor selects only a single field. The descriptor is expected + * to already be resolved. + */ private static boolean singleFieldSelected( Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) { if (fieldAccessDescriptor.getAllFields()) { @@ -67,6 +71,32 @@ private static boolean singleFieldSelected( /** * Get the output schema resulting from selecting the given {@link FieldAccessDescriptor} from the * given schema. + * + *

The unnest field controls the behavior when selecting a single field. For example, consider + * the following Java POJOs: + * + *

{@code
+   *   class UserEvent {
+   *     String userId;
+   *     String eventId;
+   *     int eventType;
+   *     Location location;
+   *  }
+ * + * + *
{@code
+   *  class Location {
+   *    double latitude;
+   *     double longtitude;
+   *  }}
+ * + *

If selecting just the location field and unnest is true, then the returned schema will + * match just that of the singular field being selected; in this case the returned schema will + * be that of the Location class. If unnest is false, then the returned schema will match the + * levels of nesting that the original schema had. In this case, it would be an outer schema + * containing a single ROW field named "location" that matched the Location schema. + * + *

In most cases, the user's expectations matches that when unnest is true. */ public static Schema getOutputSchema( Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor, boolean unnest) { @@ -78,6 +108,7 @@ public static Schema getOutputSchema( Schema.Builder builder = new Schema.Builder(); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { if (selectsNestedSingle) { + // The entire nested row is selected, so we can short circuit and return that type. return inputSchema.getField(fieldId).getType().getRowSchema(); } builder.addField(inputSchema.getField(fieldId)); From 99cf59c6c5f6011c004b4bd0750a75f09f3585b1 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 6 Mar 2019 18:43:59 -0800 Subject: [PATCH 04/14] Fix GroupTest. --- .../sdk/schemas/transforms/GroupTest.java | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java index cd8ec7a8b888..0229c55e9557 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java @@ -216,22 +216,16 @@ public void testGroupByNestedKey() { new OuterPOJO(new POJO("key2", 2L, "value4")))) .apply(Group.byFieldNames("inner.field1", "inner.field2")); - Schema selectedSchema = - Schema.builder().addStringField("field1").addInt64Field("field2").build(); - Schema keySchema = Schema.builder().addRowField("inner", selectedSchema).build(); + Schema keySchema = Schema.builder().addStringField("field1").addInt64Field("field2").build(); List>> expected = ImmutableList.of( KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(selectedSchema).addValues("key1", 1L).build()) - .build(), + Row.withSchema(keySchema).addValues("key1", 1L).build(), ImmutableList.of( new OuterPOJO(new POJO("key1", 1L, "value1")), new OuterPOJO(new POJO("key1", 1L, "value2")))), KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(selectedSchema).addValues("key2", 2L).build()) - .build(), + Row.withSchema(keySchema).addValues("key2", 2L).build(), ImmutableList.of( new OuterPOJO(new POJO("key2", 2L, "value3")), new OuterPOJO(new POJO("key2", 2L, "value4"))))); @@ -535,8 +529,7 @@ public void testByKeyWithSchemaAggregateFnNestedFields() { .aggregateField("inner.field3", Sum.ofIntegers(), "field3_sum") .aggregateField("inner.field1", Top.largestLongsFn(1), "field1_top")); - Schema innerKeySchema = Schema.builder().addInt64Field("field2").build(); - Schema keySchema = Schema.builder().addRowField("inner", innerKeySchema).build(); + Schema keySchema = Schema.builder().addInt64Field("field2").build(); Schema valueSchema = Schema.builder() .addInt64Field("field1_sum") @@ -547,14 +540,10 @@ public void testByKeyWithSchemaAggregateFnNestedFields() { List> expected = ImmutableList.of( KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(innerKeySchema).addValue(1L).build()) - .build(), + Row.withSchema(keySchema).addValue(1L).build(), Row.withSchema(valueSchema).addValue(3L).addValue(5).addArray(2L).build()), KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(innerKeySchema).addValue(2L).build()) - .build(), + Row.withSchema(keySchema).addValue(2L).build(), Row.withSchema(valueSchema).addValue(7L).addValue(9).addArray(4L).build())); PAssert.that(aggregations).satisfies(actual -> containsKvs(expected, actual)); From 2d14af01092612ab53bc40272bdb3ddcd5aeeabc Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 8 Mar 2019 14:00:43 -0800 Subject: [PATCH 05/14] Fix Javadoc. --- .../java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 61dd3ce2351e..08d86ebac84f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -81,7 +81,7 @@ private static boolean singleFieldSelected( * String eventId; * int eventType; * Location location; - * } + * }} * * *

{@code

From e588e476d8a4b87a0466b4333d34af3f2c09a886 Mon Sep 17 00:00:00 2001
From: Reuven Lax 
Date: Sat, 9 Mar 2019 08:53:00 -0800
Subject: [PATCH 06/14] Apply spotless.

---
 .../beam/sdk/schemas/utils/SelectHelpers.java | 35 ++++++++++---------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
index 08d86ebac84f..1aabee94c757 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
@@ -76,27 +76,28 @@ private static boolean singleFieldSelected(
    * the following Java POJOs:
    *
    * 
{@code
-   *   class UserEvent {
-   *     String userId;
-   *     String eventId;
-   *     int eventType;
-   *     Location location;
-   *  }}
- * + * class UserEvent { + * String userId; + * String eventId; + * int eventType; + * Location location; + * } + * }
* *
{@code
-   *  class Location {
-   *    double latitude;
-   *     double longtitude;
-   *  }}
+ * class Location { + * double latitude; + * double longtitude; + * } + * } * - *

If selecting just the location field and unnest is true, then the returned schema will - * match just that of the singular field being selected; in this case the returned schema will - * be that of the Location class. If unnest is false, then the returned schema will match the - * levels of nesting that the original schema had. In this case, it would be an outer schema - * containing a single ROW field named "location" that matched the Location schema. + *

If selecting just the location field and unnest is true, then the returned schema will match + * just that of the singular field being selected; in this case the returned schema will be that + * of the Location class. If unnest is false, then the returned schema will match the levels of + * nesting that the original schema had. In this case, it would be an outer schema containing a + * single ROW field named "location" that matched the Location schema. * - *

In most cases, the user's expectations matches that when unnest is true. + *

In most cases, the user's expectations matches that when unnest is true. */ public static Schema getOutputSchema( Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor, boolean unnest) { From 55300b09ee4a97875bf9913ea73517c806a56bf8 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 19 Mar 2019 14:42:17 -0700 Subject: [PATCH 07/14] Change select to always return a row. --- .../beam/sdk/schemas/transforms/CoGroup.java | 6 +- .../beam/sdk/schemas/transforms/Convert.java | 52 ++++- .../beam/sdk/schemas/transforms/Group.java | 5 +- .../schemas/transforms/SchemaAggregateFn.java | 9 +- .../beam/sdk/schemas/transforms/Select.java | 5 +- .../beam/sdk/schemas/utils/SelectHelpers.java | 211 +++++++++--------- .../java/org/apache/beam/sdk/values/Row.java | 11 + 7 files changed, 161 insertions(+), 138 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java index 9530c7d8f532..7dd961a93fde 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java @@ -361,7 +361,7 @@ private static JoinInformation from( // The key schema contains the field names from the first PCollection specified. FieldAccessDescriptor resolved = fieldAccessDescriptor.withOrderByFieldInsertionOrder().resolve(schema); - Schema currentKeySchema = SelectHelpers.getOutputSchema(schema, resolved, true); + Schema currentKeySchema = SelectHelpers.getOutputSchema(schema, resolved); if (keySchema == null) { keySchema = currentKeySchema; } else { @@ -396,9 +396,7 @@ private static PCollection> extractKey( @ProcessElement public void process(@Element Row row, OutputReceiver> o) { o.output( - KV.of( - SelectHelpers.selectRow(row, keyFields, schema, keySchema, true), - row)); + KV.of(SelectHelpers.selectRow(row, keyFields, schema, keySchema), row)); } })) .setCoder(KvCoder.of(SchemaCoder.of(keySchema), SchemaCoder.of(schema))); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java index 1d8116cb7bb2..1ff0a01b255e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java @@ -17,9 +17,12 @@ */ package org.apache.beam.sdk.schemas.transforms; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.DoFn; @@ -70,7 +73,9 @@ public static PTransform, PCollection> fromR * *

This function allows converting between two types as long as the two types have * compatible schemas. Two schemas are said to be compatible if they recursively - * have fields with the same names, but possibly different orders. + * have fields with the same names, but possibly different orders. If the source schema can be + * unboxed to match the target schema (i.e. the source schema contains a single field that is + * compatible with the target schema), then conversion also succeeds. */ public static PTransform, PCollection> to( Class clazz) { @@ -82,7 +87,9 @@ public static PTransform, PCollectionThis function allows converting between two types as long as the two types have * compatible schemas. Two schemas are said to be compatible if they recursively - * have fields with the same names, but possibly different orders. + * have fields with the same names, but possibly different orders. If the source schema can be + * unboxed to match the target schema (i.e. the source schema contains a single field that is + * compatible with the target schema), then conversion also succeeds. */ public static PTransform, PCollection> to( TypeDescriptor typeDescriptor) { @@ -92,11 +99,24 @@ public static PTransform, PCollection extends PTransform, PCollection> { TypeDescriptor outputTypeDescriptor; + Schema unboxedSchema = null; ConvertTransform(TypeDescriptor outputTypeDescriptor) { this.outputTypeDescriptor = outputTypeDescriptor; } + @Nullable + private static Schema getBoxedNestedSchema(Schema schema) { + if (schema.getFieldCount() > 1) { + return null; + } + FieldType fieldType = schema.getField(0).getType(); + if (!fieldType.getTypeName().isCompositeType()) { + return null; + } + return fieldType.getRowSchema(); + } + @Override @SuppressWarnings("unchecked") public PCollection expand(PCollection input) { @@ -124,15 +144,21 @@ public PCollection expand(PCollection input) { registry.getSchema(outputTypeDescriptor), registry.getToRowFunction(outputTypeDescriptor), registry.getFromRowFunction(outputTypeDescriptor)); - // assert matches input schema. - // TODO: Properly handle nullable. - if (!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) { - throw new RuntimeException( - "Cannot convert between types that don't have equivalent schemas." - + " input schema: " - + input.getSchema() - + " output schema: " - + outputSchemaCoder.getSchema()); + + Schema outputSchema = outputSchemaCoder.getSchema(); + if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) { + // We also support unboxing nested Row schemas, so attempt that. + // TODO: Support unboxing to primitive types as well. + unboxedSchema = getBoxedNestedSchema(input.getSchema()); + if (unboxedSchema == null || !outputSchema.assignableToIgnoreNullable(unboxedSchema)) { + Schema checked = (unboxedSchema == null) ? input.getSchema() : unboxedSchema; + throw new RuntimeException( + "Cannot convert between types that don't have equivalent schemas." + + " input schema: " + + checked + + " output schema: " + + outputSchemaCoder.getSchema()); + } } } catch (NoSuchSchemaException e) { throw new RuntimeException("No schema registered for " + outputTypeDescriptor); @@ -145,7 +171,9 @@ public PCollection expand(PCollection input) { new DoFn() { @ProcessElement public void processElement(@Element Row row, OutputReceiver o) { - o.output(outputSchemaCoder.getFromRowFunction().apply(row)); + // Read the row, potentially unboxing if necessary. + Row input = (unboxedSchema == null) ? row : row.getValue(0); + o.output(outputSchemaCoder.getFromRowFunction().apply(input)); } })) .setSchema( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java index c03c2138110a..cf356f4852e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java @@ -631,7 +631,7 @@ public CombineFieldsByFields agg public PCollection>> expand(PCollection input) { Schema schema = input.getSchema(); FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema); - keySchema = SelectHelpers.getOutputSchema(schema, resolved, true); + keySchema = SelectHelpers.getOutputSchema(schema, resolved); return input .apply( "Group by fields", @@ -644,8 +644,7 @@ public void process( OutputReceiver> o) { o.output( KV.of( - SelectHelpers.selectRow(row, resolved, schema, keySchema, true), - element)); + SelectHelpers.selectRow(row, resolved, schema, keySchema), element)); } })) .setCoder(KvCoder.of(SchemaCoder.of(keySchema), input.getCoder())) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java index 578e744a9b07..77db21846133 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java @@ -97,8 +97,7 @@ static class FieldAggregation implements Serializable { @Nullable Schema inputSchema) { if (inputSchema != null) { this.fieldsToAggregate = fieldsToAggregate.resolve(inputSchema); - this.inputSubSchema = - SelectHelpers.getOutputSchema(inputSchema, this.fieldsToAggregate, true); + this.inputSubSchema = SelectHelpers.getOutputSchema(inputSchema, this.fieldsToAggregate); this.unnestedInputSubSchema = Unnest.getUnnestedSchema(inputSubSchema); this.needsUnnesting = !inputSchema.equals(unnestedInputSubSchema); } else { @@ -246,8 +245,7 @@ public OutputT apply(InputT input) { row, fieldAggregation.fieldsToAggregate, row.getSchema(), - fieldAggregation.inputSubSchema, - true); + fieldAggregation.inputSubSchema); if (fieldAggregation.needsUnnesting) { selected = Unnest.unnestRow(selected, fieldAggregation.unnestedInputSubSchema); } @@ -273,8 +271,7 @@ public Row apply(T input) { row, fieldAggregation.fieldsToAggregate, row.getSchema(), - fieldAggregation.inputSubSchema, - true); + fieldAggregation.inputSubSchema); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index a9796f7f2708..077cc33f34de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -99,7 +99,7 @@ public static Select fieldAccess(FieldAccessDescriptor fieldAccessDescrip public PCollection expand(PCollection input) { Schema inputSchema = input.getSchema(); FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(inputSchema); - Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, resolved, true); + Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, resolved); return input .apply( @@ -115,8 +115,7 @@ public PCollection expand(PCollection input) { @ProcessElement public void process( @FieldAccess("selectFields") @Element Row row, OutputReceiver r) { - r.output( - SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema, true)); + r.output(SelectHelpers.selectRow(row, resolved, inputSchema, outputSchema)); } })) .setRowSchema(outputSchema); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 1aabee94c757..2d06480b4a90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; import java.util.List; import java.util.Map; @@ -31,49 +30,27 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; /** Helper methods to select subrows out of rows. */ public class SelectHelpers { - /** - * Checks whether a FieldAccessDescriptor selects only a single field. The descriptor is expected - * to already be resolved. - */ - private static boolean singleFieldSelected( - Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) { - if (fieldAccessDescriptor.getAllFields()) { - return false; - } - if (fieldAccessDescriptor.fieldIdsAccessed().size() == 1 - && fieldAccessDescriptor.getNestedFieldsAccessed().isEmpty()) { - int fieldId = fieldAccessDescriptor.fieldIdsAccessed().iterator().next(); - Field field = inputSchema.getField(fieldId); - return TypeName.ROW.equals(field.getType().getTypeName()); - } - if (fieldAccessDescriptor.fieldIdsAccessed().isEmpty() - && fieldAccessDescriptor.getNestedFieldsAccessed().size() == 1) { - FieldDescriptor key = - fieldAccessDescriptor.getNestedFieldsAccessed().keySet().iterator().next(); - Field field = inputSchema.getField(checkNotNull(key.getFieldId())); - if (!field.getType().getTypeName().isCollectionType() - && !field.getType().getTypeName().isMapType() - && key.getQualifiers().isEmpty()) { - return true; - } + private static Schema union(Iterable schemas) { + Schema.Builder unioned = Schema.builder(); + for (Schema schema : schemas) { + unioned.addFields(schema.getFields()); } - return false; + return unioned.build(); } /** * Get the output schema resulting from selecting the given {@link FieldAccessDescriptor} from the * given schema. * - *

The unnest field controls the behavior when selecting a single field. For example, consider - * the following Java POJOs: + *

Fields are always extracted and then stored in a new Row. For example, consider the + * following Java POJOs: * *

{@code
    *  class UserEvent {
@@ -87,64 +64,68 @@ private static boolean singleFieldSelected(
    * 
{@code
    * class Location {
    *   double latitude;
-   *    double longtitude;
+   *   double longtitude;
    * }
    * }
* - *

If selecting just the location field and unnest is true, then the returned schema will match - * just that of the singular field being selected; in this case the returned schema will be that - * of the Location class. If unnest is false, then the returned schema will match the levels of - * nesting that the original schema had. In this case, it would be an outer schema containing a - * single ROW field named "location" that matched the Location schema. + *

If selecting just the location field, then the returned schema will wrap that of the + * singular field being selected; in this case the returned schema will be a Row containing a + * single Location field. If location.latitude is selected, then the returned Schema will be a Row + * containing a double latitude field. * - *

In most cases, the user's expectations matches that when unnest is true. + *

The same holds true when selecting from lists or maps. For example: + * + *

{@code
+   * class EventList {
+   *   List events;
+   * }
+   * }
+ * + *

If selecting events.location.latitude, the returned schema will contain a single array of + * Row, where that Row contains a single double latitude field; it will not contain an array of + * double. */ public static Schema getOutputSchema( - Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor, boolean unnest) { + Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) { if (fieldAccessDescriptor.getAllFields()) { return inputSchema; } - boolean selectsNestedSingle = unnest && singleFieldSelected(inputSchema, fieldAccessDescriptor); - Schema.Builder builder = new Schema.Builder(); + List schemas = Lists.newArrayList(); + Schema.Builder builder = Schema.builder(); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { - if (selectsNestedSingle) { - // The entire nested row is selected, so we can short circuit and return that type. - return inputSchema.getField(fieldId).getType().getRowSchema(); - } builder.addField(inputSchema.getField(fieldId)); } + schemas.add(builder.build()); for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor fieldDescriptor = nested.getKey(); FieldAccessDescriptor nestedAccess = nested.getValue(); Field field = inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId())); - if (selectsNestedSingle) { - checkState(field.getType().getTypeName().isCompositeType()); - return getOutputSchema(field.getType().getRowSchema(), nestedAccess, unnest); + + FieldType outputType = + getOutputSchemaHelper(field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), 0); + if (outputType.getTypeName().isCompositeType()) { + schemas.add(outputType.getRowSchema()); } else { - FieldType outputType = - getOutputSchemaHelper( - field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), unnest, 0); - builder.addField(field.getName(), outputType); + schemas.add(Schema.builder().addField(field.getName(), outputType).build()); } } - return builder.build(); + + return union(schemas); } private static FieldType getOutputSchemaHelper( FieldType inputFieldType, FieldAccessDescriptor fieldAccessDescriptor, List qualifiers, - boolean unnest, int qualifierPosition) { if (qualifierPosition >= qualifiers.size()) { // We have walked through any containers, and are at a row type. Extract the subschema // for the row, preserving nullable attributes. checkArgument(inputFieldType.getTypeName().isCompositeType()); - return FieldType.row( - getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor, unnest)) + return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor)) .withNullable(inputFieldType.getNullable()); } @@ -155,7 +136,7 @@ private static FieldType getOutputSchemaHelper( FieldType componentType = checkNotNull(inputFieldType.getCollectionElementType()); FieldType outputComponent = getOutputSchemaHelper( - componentType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) + componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) .withNullable(componentType.getNullable()); return FieldType.array(outputComponent).withNullable(inputFieldType.getNullable()); case MAP: @@ -164,7 +145,7 @@ private static FieldType getOutputSchemaHelper( FieldType valueType = checkNotNull(inputFieldType.getMapValueType()); FieldType outputValueType = getOutputSchemaHelper( - valueType, fieldAccessDescriptor, qualifiers, unnest, qualifierPosition + 1) + valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) .withNullable(valueType.getNullable()); return FieldType.map(keyType, outputValueType).withNullable(inputFieldType.getNullable()); default: @@ -172,87 +153,99 @@ private static FieldType getOutputSchemaHelper( } } - private static Row selectNestedRow( + public static Row selectRow( Row input, FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, Schema outputSchema) { - for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { - return input.getValue(fieldId); - } - - for (Map.Entry nested : - fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { - FieldDescriptor field = nested.getKey(); - FieldAccessDescriptor nestedAccess = nested.getValue(); - String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId())); - Schema nestedSchema = inputSchema.getField(field.getFieldId()).getType().getRowSchema(); - return selectRow(input.getValue(fieldName), nestedAccess, nestedSchema, outputSchema, true); + if (fieldAccessDescriptor.getAllFields()) { + return input; } - throw new IllegalStateException("Unreachable."); + Row.Builder output = Row.withSchema(outputSchema); + selectIntoRow(input, output, fieldAccessDescriptor, inputSchema); + return output.build(); } /** Select out of a given {@link Row} object. */ - public static Row selectRow( + public static void selectIntoRow( Row input, + Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor, - Schema inputSchema, - Schema outputSchema, - boolean unnest) { + Schema inputSchema) { if (fieldAccessDescriptor.getAllFields()) { - return input; + output.addValues(input.getValues()); + return; } - if (unnest && singleFieldSelected(inputSchema, fieldAccessDescriptor)) { - return selectNestedRow(input, fieldAccessDescriptor, inputSchema, outputSchema); - } - - Row.Builder output = Row.withSchema(outputSchema); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { // TODO: Once we support specific qualifiers (like array slices), extract them here. output.addValue(input.getValue(fieldId)); } + Schema outputSchema = output.getSchema(); for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor field = nested.getKey(); FieldAccessDescriptor nestedAccess = nested.getValue(); - String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId())); FieldType nestedInputType = inputSchema.getField(field.getFieldId()).getType(); - FieldType nestedOutputType = outputSchema.getField(fieldName).getType(); - Object selectedValue = - selectRowHelper( - field.getQualifiers(), - 0, - input.getValue(fieldName), - nestedAccess, - nestedInputType, - nestedOutputType, - unnest); - output.addValue(selectedValue); + FieldType nestedOutputType = outputSchema.getField(output.nextFieldId()).getType(); + + if (nestedOutputType.getTypeName().isCompositeType()) { + Row.Builder nestedBuilder = Row.withSchema(nestedOutputType.getRowSchema()); + selectIntoRowHelper( + field.getQualifiers(), + input.getValue(field.getFieldId()), + nestedBuilder, + nestedAccess, + nestedInputType, + nestedOutputType); + output.addValue(nestedBuilder.build()); + } else { + selectIntoRowHelper( + field.getQualifiers(), + input.getValue(field.getFieldId()), + output, + nestedAccess, + nestedInputType, + nestedOutputType); + } } - return output.build(); } @SuppressWarnings("unchecked") - private static Object selectRowHelper( + private static void selectIntoRowHelper( List qualifiers, - int qualifierPosition, Object value, + Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor, FieldType inputType, - FieldType outputType, - boolean unnest) { - if (qualifierPosition >= qualifiers.size()) { + FieldType outputType) { + if (qualifiers.isEmpty()) { Row row = (Row) value; - return selectRow( - row, fieldAccessDescriptor, inputType.getRowSchema(), outputType.getRowSchema(), unnest); + selectIntoRow(row, output, fieldAccessDescriptor, inputType.getRowSchema()); + return; } - if (fieldAccessDescriptor.getAllFields()) { - // Since we are selecting all fields (and we do not yet support array slicing), short circuit. - return value; + // There are qualifiers. That means that the result will be either a list or a map, so + // construct the result and add that to our Row. + output.addValue( + selectValueHelper(qualifiers, 0, value, fieldAccessDescriptor, inputType, outputType)); + } + + private static Object selectValueHelper( + List qualifiers, + int qualifierPosition, + Object value, + FieldAccessDescriptor fieldAccessDescriptor, + FieldType inputType, + FieldType outputType) { + if (qualifierPosition >= qualifiers.size()) { + // We have already constructed all arrays and maps. What remains must be a Row. + Row row = (Row) value; + Row.Builder output = Row.withSchema(outputType.getRowSchema()); + selectIntoRow(row, output, fieldAccessDescriptor, inputType.getRowSchema()); + return output.build(); } Qualifier qualifier = qualifiers.get(qualifierPosition); @@ -262,17 +255,16 @@ private static Object selectRowHelper( FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); List list = (List) value; - List selectedList = Lists.newArrayListWithCapacity(list.size()); + List selectedList = Lists.newArrayListWithCapacity(list.size()); for (Object o : list) { Object selected = - selectRowHelper( + selectValueHelper( qualifiers, qualifierPosition + 1, o, fieldAccessDescriptor, nestedInputType, - nestedOutputType, - unnest); + nestedOutputType); selectedList.add(selected); } return selectedList; @@ -285,14 +277,13 @@ private static Object selectRowHelper( Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); for (Map.Entry entry : map.entrySet()) { Object selected = - selectRowHelper( + selectValueHelper( qualifiers, qualifierPosition + 1, entry.getValue(), fieldAccessDescriptor, nestedInputType, - nestedOutputType, - unnest); + nestedOutputType); selectedMap.put(entry.getKey(), selected); } return selectedMap; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index 47958e2d6119..1f6e38617006 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -500,6 +500,17 @@ public static class Builder { this.schema = schema; } + public int nextFieldId() { + if (fieldValueGetterFactory != null) { + throw new RuntimeException("Not supported"); + } + return values.size(); + } + + public Schema getSchema() { + return schema; + } + public Builder addValue(@Nullable Object values) { this.values.add(values); return this; From cf2dd9f58a6999d653d5a31a3606fa5af89b2a9f Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 19 Mar 2019 20:50:13 -0700 Subject: [PATCH 08/14] Add unit tests for SelectHelpers. --- .../beam/sdk/schemas/utils/SelectHelpers.java | 1 + .../sdk/schemas/utils/SelectHelpersTest.java | 324 ++++++++++++++++++ 2 files changed, 325 insertions(+) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 2d06480b4a90..2cd54739c294 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -153,6 +153,7 @@ private static FieldType getOutputSchemaHelper( } } + /** Select a sub Row from an input Row. */ public static Row selectRow( Row input, FieldAccessDescriptor fieldAccessDescriptor, diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java new file mode 100644 index 000000000000..fe40f17c5928 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.junit.Test; + +/** Tests for {@link SelectHelpers}. */ +public class SelectHelpersTest { + static final Schema FLAT_SCHEMA = + Schema.builder() + .addStringField("field1") + .addInt32Field("field2") + .addDoubleField("field3") + .build(); + static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA).addValues("first", 42, 3.14).build(); + + static final Schema NESTED_SCHEMA = + Schema.builder().addRowField("nested", FLAT_SCHEMA).addStringField("foo").build(); + static final Row NESTED_ROW = Row.withSchema(NESTED_SCHEMA).addValues(FLAT_ROW, "").build(); + + static final Schema DOUBLE_NESTED_SCHEMA = + Schema.builder().addRowField("nested2", NESTED_SCHEMA).build(); + static final Row DOUBLE_NESTED_ROW = + Row.withSchema(DOUBLE_NESTED_SCHEMA).addValue(NESTED_ROW).build(); + + static final Schema ARRAY_SCHEMA = + Schema.builder() + .addArrayField("primitiveArray", FieldType.INT32) + .addArrayField("rowArray", FieldType.row(FLAT_SCHEMA)) + .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(FLAT_SCHEMA))) + .addArrayField("nestedRowArray", FieldType.row(NESTED_SCHEMA)) + .build(); + static final Row ARRAY_ROW = + Row.withSchema(ARRAY_SCHEMA) + .addArray(1, 2) + .addArray(FLAT_ROW, FLAT_ROW) + .addArray(ImmutableList.of(FLAT_ROW), ImmutableList.of(FLAT_ROW)) + .addArray(NESTED_ROW, NESTED_ROW) + .build(); + + static final Schema MAP_SCHEMA = + Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build(); + static final Row MAP_ROW = + Row.withSchema(MAP_SCHEMA).addValue(ImmutableMap.of(1, FLAT_ROW)).build(); + + static final Schema MAP_ARRAY_SCHEMA = + Schema.builder() + .addMapField("map", FieldType.INT32, FieldType.array(FieldType.row(FLAT_SCHEMA))) + .build(); + static final Row MAP_ARRAY_ROW = + Row.withSchema(MAP_ARRAY_SCHEMA) + .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW))) + .build(); + + @Test + public void testSelectAll() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("*").resolve(FLAT_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + assertEquals(FLAT_SCHEMA, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + assertEquals(FLAT_ROW, row); + } + + @Test + public void testsSimpleSelectSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("field1").resolve(FLAT_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testsSimpleSelectMultiple() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("field1", "field3").resolve(FLAT_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addStringField("field1").addDoubleField("field3").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValues("first", 3.14).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNested() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addRowField("nested", FLAT_SCHEMA).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue(FLAT_ROW).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNestedSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested.field1").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNestedWildcard() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested.*").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + assertEquals(FLAT_SCHEMA, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + assertEquals(FLAT_ROW, row); + } + + @Test + public void testSelectDoubleNested() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested2.nested.field1").resolve(DOUBLE_NESTED_SCHEMA); + Schema outputSchema = + SelectHelpers.getOutputSchema(DOUBLE_NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow( + DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfPrimitive() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("primitiveArray").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("primitiveArray", FieldType.INT32).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRow() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("rowArray").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("rowArray", FieldType.row(FLAT_SCHEMA)).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW, FLAT_ROW).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRowPartial() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder().addArrayField("rowArray", FieldType.row(expectedElementSchema)).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + + Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); + Row expectedRow = + Row.withSchema(expectedSchema).addArray(expectedElement, expectedElement).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRowArray() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder() + .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(expectedElementSchema))) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + + Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); + Row expectedRow = + Row.withSchema(expectedSchema) + .addArray(ImmutableList.of(expectedElement), ImmutableList.of(expectedElement)) + .build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfNestedRow() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nestedRowArray[].nested.field1") + .resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder() + .addArrayField("nestedRowArray", FieldType.row(expectedElementSchema)) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + + Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); + Row expectedRow = + Row.withSchema(expectedSchema).addArray(expectedElement, expectedElement).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfRowSelectSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map{}.field1").resolve(MAP_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor); + + Schema expectedValueSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder() + .addMapField("map", FieldType.INT32, FieldType.row(expectedValueSchema)) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); + + Row expectedValueRow = Row.withSchema(expectedValueSchema).addValue("first").build(); + Row expectedRow = + Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, expectedValueRow)).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfRowSelectAll() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, FLAT_ROW)).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfArray() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedValueSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder() + .addMapField( + "map", FieldType.INT32, FieldType.array(FieldType.row(expectedValueSchema))) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow( + MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, outputSchema); + Row expectedElement = Row.withSchema(expectedValueSchema).addValue("first").build(); + + Row expectedRow = + Row.withSchema(expectedSchema) + .addValue(ImmutableMap.of(1, ImmutableList.of(expectedElement))) + .build(); + assertEquals(expectedRow, row); + } +} From 6a9a7da90571bdc0ab18d8f9e90b6925d7606700 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 28 Mar 2019 09:56:11 -0700 Subject: [PATCH 09/14] Address comments. --- .../beam/sdk/schemas/transforms/Convert.java | 2 +- .../beam/sdk/schemas/utils/SelectHelpers.java | 40 ++++++------------- .../sdk/schemas/utils/SelectHelpersTest.java | 21 ++++++++++ 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java index 1ff0a01b255e..9b01b352c333 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java @@ -107,7 +107,7 @@ private static class ConvertTransform @Nullable private static Schema getBoxedNestedSchema(Schema schema) { - if (schema.getFieldCount() > 1) { + if (schema.getFieldCount() != 1) { return null; } FieldType fieldType = schema.getField(0).getType(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index 2cd54739c294..ab4db36da213 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -164,16 +164,13 @@ public static Row selectRow( } Row.Builder output = Row.withSchema(outputSchema); - selectIntoRow(input, output, fieldAccessDescriptor, inputSchema); + selectIntoRow(input, output, fieldAccessDescriptor); return output.build(); } /** Select out of a given {@link Row} object. */ public static void selectIntoRow( - Row input, - Row.Builder output, - FieldAccessDescriptor fieldAccessDescriptor, - Schema inputSchema) { + Row input, Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor) { if (fieldAccessDescriptor.getAllFields()) { output.addValues(input.getValues()); return; @@ -189,28 +186,15 @@ public static void selectIntoRow( fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor field = nested.getKey(); FieldAccessDescriptor nestedAccess = nested.getValue(); - FieldType nestedInputType = inputSchema.getField(field.getFieldId()).getType(); + FieldType nestedInputType = input.getSchema().getField(field.getFieldId()).getType(); FieldType nestedOutputType = outputSchema.getField(output.nextFieldId()).getType(); - - if (nestedOutputType.getTypeName().isCompositeType()) { - Row.Builder nestedBuilder = Row.withSchema(nestedOutputType.getRowSchema()); - selectIntoRowHelper( - field.getQualifiers(), - input.getValue(field.getFieldId()), - nestedBuilder, - nestedAccess, - nestedInputType, - nestedOutputType); - output.addValue(nestedBuilder.build()); - } else { - selectIntoRowHelper( - field.getQualifiers(), - input.getValue(field.getFieldId()), - output, - nestedAccess, - nestedInputType, - nestedOutputType); - } + selectIntoRowHelper( + field.getQualifiers(), + input.getValue(field.getFieldId()), + output, + nestedAccess, + nestedInputType, + nestedOutputType); } } @@ -224,7 +208,7 @@ private static void selectIntoRowHelper( FieldType outputType) { if (qualifiers.isEmpty()) { Row row = (Row) value; - selectIntoRow(row, output, fieldAccessDescriptor, inputType.getRowSchema()); + selectIntoRow(row, output, fieldAccessDescriptor); return; } @@ -245,7 +229,7 @@ private static Object selectValueHelper( // We have already constructed all arrays and maps. What remains must be a Row. Row row = (Row) value; Row.Builder output = Row.withSchema(outputType.getRowSchema()); - selectIntoRow(row, output, fieldAccessDescriptor, inputType.getRowSchema()); + selectIntoRow(row, output, fieldAccessDescriptor); return output.build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java index fe40f17c5928..260bf35d0bce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -321,4 +321,25 @@ public void testSelectMapOfArray() { .build(); assertEquals(expectedRow, row); } + + @Test + public void testSelectFieldOfRecord() { + Schema f1 = Schema.builder().addInt64Field("f0").build(); + Schema f2 = Schema.builder().addRowField("f1", f1).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + + Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42} + Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}} + Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}} + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f2.f1").resolve(f3); + + Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor); + + Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema); + + assertEquals(outputSchema, f2); + assertEquals(out, r2); + } } From 0d667c08dd4527df99ae56b7305a3c7d6c22b71a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 4 Apr 2019 14:45:30 -0700 Subject: [PATCH 10/14] Address code-review comments. --- .../sdk/schemas/utils/SelectHelpersTest.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java index 260bf35d0bce..f19bc324b81a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -340,6 +340,29 @@ public void testSelectFieldOfRecord() { Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema); assertEquals(outputSchema, f2); - assertEquals(out, r2); + assertEquals(r2, out); + } + + @Test + public void testSelectFieldOfRecordOrRecord() { + Schema f1 = Schema.builder().addInt64Field("f0").build(); + Schema f2 = Schema.builder().addRowField("f1", f1).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + Schema f4 = Schema.builder().addRowField("f3", f3).build(); + + Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42} + Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}} + Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}} + Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": {"f0": 42}}}} + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4); + + Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor); + + Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema); + + assertEquals(f3, outputSchema); + assertEquals(r3, out); } } From 73d5d4b4605e63d3f7f4ccaec087c3b90eb7ac6e Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 4 Apr 2019 22:33:42 -0700 Subject: [PATCH 11/14] Fix semantics for list and map selects. --- .../beam/sdk/schemas/utils/SelectHelpers.java | 144 ++++++++++++------ .../sdk/schemas/utils/SelectHelpersTest.java | 85 ++++++----- 2 files changed, 148 insertions(+), 81 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index ab4db36da213..37742a5c06f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -103,20 +103,15 @@ public static Schema getOutputSchema( FieldDescriptor fieldDescriptor = nested.getKey(); FieldAccessDescriptor nestedAccess = nested.getValue(); Field field = inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId())); - - FieldType outputType = + Schema outputSchema = getOutputSchemaHelper(field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), 0); - if (outputType.getTypeName().isCompositeType()) { - schemas.add(outputType.getRowSchema()); - } else { - schemas.add(Schema.builder().addField(field.getName(), outputType).build()); - } + schemas.add(outputSchema); } return union(schemas); } - private static FieldType getOutputSchemaHelper( + private static Schema getOutputSchemaHelper( FieldType inputFieldType, FieldAccessDescriptor fieldAccessDescriptor, List qualifiers, @@ -125,29 +120,39 @@ private static FieldType getOutputSchemaHelper( // We have walked through any containers, and are at a row type. Extract the subschema // for the row, preserving nullable attributes. checkArgument(inputFieldType.getTypeName().isCompositeType()); - return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor)) - .withNullable(inputFieldType.getNullable()); + return getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor); } Qualifier qualifier = qualifiers.get(qualifierPosition); + Schema.Builder builder = Schema.builder(); switch (qualifier.getKind()) { case LIST: checkArgument(qualifier.getList().equals(ListQualifier.ALL)); FieldType componentType = checkNotNull(inputFieldType.getCollectionElementType()); - FieldType outputComponent = + Schema outputComponent = getOutputSchemaHelper( - componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) - .withNullable(componentType.getNullable()); - return FieldType.array(outputComponent).withNullable(inputFieldType.getNullable()); + componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1); + for (Field field : outputComponent.getFields()) { + Field newField = + Field.of(field.getName(), FieldType.array(field.getType())) + .withNullable(inputFieldType.getNullable()); + builder.addField(newField); + } + return builder.build(); case MAP: checkArgument(qualifier.getMap().equals(MapQualifier.ALL)); FieldType keyType = checkNotNull(inputFieldType.getMapKeyType()); FieldType valueType = checkNotNull(inputFieldType.getMapValueType()); - FieldType outputValueType = + Schema outputValueSchema = getOutputSchemaHelper( - valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) - .withNullable(valueType.getNullable()); - return FieldType.map(keyType, outputValueType).withNullable(inputFieldType.getNullable()); + valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1); + for (Field field : outputValueSchema.getFields()) { + Field newField = + Field.of(field.getName(), FieldType.map(keyType, field.getType())) + .withNullable(inputFieldType.getNullable()); + builder.addField(newField); + } + return builder.build(); default: throw new RuntimeException("unexpected"); } @@ -214,23 +219,23 @@ private static void selectIntoRowHelper( // There are qualifiers. That means that the result will be either a list or a map, so // construct the result and add that to our Row. - output.addValue( - selectValueHelper(qualifiers, 0, value, fieldAccessDescriptor, inputType, outputType)); + selectIntoRowWithQualifiers( + qualifiers, 0, value, output, fieldAccessDescriptor, inputType, outputType); } - private static Object selectValueHelper( + private static void selectIntoRowWithQualifiers( List qualifiers, int qualifierPosition, Object value, + Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor, FieldType inputType, FieldType outputType) { if (qualifierPosition >= qualifiers.size()) { // We have already constructed all arrays and maps. What remains must be a Row. Row row = (Row) value; - Row.Builder output = Row.withSchema(outputType.getRowSchema()); selectIntoRow(row, output, fieldAccessDescriptor); - return output.build(); + return; } Qualifier qualifier = qualifiers.get(qualifierPosition); @@ -240,38 +245,87 @@ private static Object selectValueHelper( FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); List list = (List) value; - List selectedList = Lists.newArrayListWithCapacity(list.size()); + + // When selecting multiple subelements under a list, we distribute the select + // resulting in multiple lists. For example, if there is a field "list" with type + // {a: string, b: int}[], selecting list.a, list.b results in a schema of type + // {a: string[], b: int[]}. This preserves the invariant that the name selected always + // appears in the top-level schema. + Schema tempSchema = Schema.builder().addField("a", nestedInputType).build(); + FieldAccessDescriptor tempAccessDescriptor = + FieldAccessDescriptor.create() + .withNestedField("a", fieldAccessDescriptor) + .resolve(tempSchema); + // TODO: doing this on each element might be inefficient. Consider caching this, or + // using codegen based on the schema. + Schema nestedSchema = getOutputSchema(tempSchema, tempAccessDescriptor); + + List> selectedLists = + Lists.newArrayListWithCapacity(nestedSchema.getFieldCount()); + for (int i = 0; i < nestedSchema.getFieldCount(); i++) { + selectedLists.add(Lists.newArrayListWithCapacity(list.size())); + } for (Object o : list) { - Object selected = - selectValueHelper( - qualifiers, - qualifierPosition + 1, - o, - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedList.add(selected); + Row.Builder selectElementBuilder = Row.withSchema(nestedSchema); + selectIntoRowWithQualifiers( + qualifiers, + qualifierPosition + 1, + o, + selectElementBuilder, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType); + + Row elementBeforeDistribution = selectElementBuilder.build(); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedLists.get(i).add(elementBeforeDistribution.getValue(i)); + } } - return selectedList; + for (List aList : selectedLists) { + output.addValue(aList); + } + break; } case MAP: { FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); + + // When selecting multiple subelements under a map, we distribute the select + // resulting in multiple maps. The semantics are the same as for lists above (except we + // only support subelement select for map values, not for map keys). + Schema tempSchema = Schema.builder().addField("a", nestedInputType).build(); + FieldAccessDescriptor tempAccessDescriptor = + FieldAccessDescriptor.create() + .withNestedField("a", fieldAccessDescriptor) + .resolve(tempSchema); + Schema nestedSchema = getOutputSchema(tempSchema, tempAccessDescriptor); + List selectedMaps = Lists.newArrayListWithExpectedSize(nestedSchema.getFieldCount()); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedMaps.add(Maps.newHashMap()); + } + Map map = (Map) value; - Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); for (Map.Entry entry : map.entrySet()) { - Object selected = - selectValueHelper( - qualifiers, - qualifierPosition + 1, - entry.getValue(), - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedMap.put(entry.getKey(), selected); + Row.Builder selectValueBuilder = Row.withSchema(nestedSchema); + selectIntoRowWithQualifiers( + qualifiers, + qualifierPosition + 1, + entry.getValue(), + selectValueBuilder, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType); + + Row valueBeforeDistribution = selectValueBuilder.build(); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedMaps.get(i).put(entry.getKey(), valueBeforeDistribution.getValue(i)); + } + } + for (Map aMap : selectedMaps) { + output.addValue(aMap); } - return selectedMap; + break; } default: throw new RuntimeException("Unexpected type " + qualifier.getKind()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java index f19bc324b81a..3e72fd809ac7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; +import com.google.common.collect.Lists; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -203,16 +204,11 @@ public void testSelectArrayOfRowPartial() { FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA); Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); - Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); - Schema expectedSchema = - Schema.builder().addArrayField("rowArray", FieldType.row(expectedElementSchema)).build(); + Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); - - Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); - Row expectedRow = - Row.withSchema(expectedSchema).addArray(expectedElement, expectedElement).build(); + Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build(); assertEquals(expectedRow, row); } @@ -222,19 +218,15 @@ public void testSelectArrayOfRowArray() { FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA); Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); - Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); Schema expectedSchema = - Schema.builder() - .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(expectedElementSchema))) - .build(); + Schema.builder().addArrayField("field1", FieldType.array(FieldType.STRING)).build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); - Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); Row expectedRow = Row.withSchema(expectedSchema) - .addArray(ImmutableList.of(expectedElement), ImmutableList.of(expectedElement)) + .addArray(ImmutableList.of("first"), ImmutableList.of("first")) .build(); assertEquals(expectedRow, row); } @@ -247,17 +239,11 @@ public void testSelectArrayOfNestedRow() { Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); - Schema expectedSchema = - Schema.builder() - .addArrayField("nestedRowArray", FieldType.row(expectedElementSchema)) - .build(); + Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); - - Row expectedElement = Row.withSchema(expectedElementSchema).addValue("first").build(); - Row expectedRow = - Row.withSchema(expectedSchema).addArray(expectedElement, expectedElement).build(); + Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build(); assertEquals(expectedRow, row); } @@ -269,16 +255,11 @@ public void testSelectMapOfRowSelectSingle() { Schema expectedValueSchema = Schema.builder().addStringField("field1").build(); Schema expectedSchema = - Schema.builder() - .addMapField("map", FieldType.INT32, FieldType.row(expectedValueSchema)) - .build(); + Schema.builder().addMapField("field1", FieldType.INT32, FieldType.STRING).build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); - - Row expectedValueRow = Row.withSchema(expectedValueSchema).addValue("first").build(); - Row expectedRow = - Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, expectedValueRow)).build(); + Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build(); assertEquals(expectedRow, row); } @@ -288,11 +269,20 @@ public void testSelectMapOfRowSelectAll() { FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA); Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor); Schema expectedSchema = - Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build(); + Schema.builder() + .addMapField("field1", FieldType.INT32, FieldType.STRING) + .addMapField("field2", FieldType.INT32, FieldType.INT32) + .addMapField("field3", FieldType.INT32, FieldType.DOUBLE) + .build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); - Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, FLAT_ROW)).build(); + Row expectedRow = + Row.withSchema(expectedSchema) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0))) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1))) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2))) + .build(); assertEquals(expectedRow, row); } @@ -302,22 +292,19 @@ public void testSelectMapOfArray() { FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA); Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA, fieldAccessDescriptor); - Schema expectedValueSchema = Schema.builder().addStringField("field1").build(); Schema expectedSchema = Schema.builder() - .addMapField( - "map", FieldType.INT32, FieldType.array(FieldType.row(expectedValueSchema))) + .addMapField("field1", FieldType.INT32, FieldType.array(FieldType.STRING)) .build(); assertEquals(expectedSchema, outputSchema); Row row = SelectHelpers.selectRow( MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, outputSchema); - Row expectedElement = Row.withSchema(expectedValueSchema).addValue("first").build(); Row expectedRow = Row.withSchema(expectedSchema) - .addValue(ImmutableMap.of(1, ImmutableList.of(expectedElement))) + .addValue(ImmutableMap.of(1, ImmutableList.of("first"))) .build(); assertEquals(expectedRow, row); } @@ -339,7 +326,7 @@ public void testSelectFieldOfRecord() { Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema); - assertEquals(outputSchema, f2); + assertEquals(f2, outputSchema); assertEquals(r2, out); } @@ -365,4 +352,30 @@ public void testSelectFieldOfRecordOrRecord() { assertEquals(f3, outputSchema); assertEquals(r3, out); } + + @Test + public void testArrayRowArray() { + Schema f1 = Schema.builder().addStringField("f0").build(); + Schema f2 = Schema.builder().addArrayField("f1", FieldType.row(f1)).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + Schema f4 = Schema.builder().addArrayField("f3", FieldType.row(f3)).build(); + + Row r1 = Row.withSchema(f1).addValue("first").build(); + Row r2 = Row.withSchema(f2).addArray(r1, r1).build(); + Row r3 = Row.withSchema(f3).addValue(r2).build(); + Row r4 = Row.withSchema(f4).addArray(r3, r3).build(); + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(f4); + Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("f0", FieldType.array(FieldType.STRING)).build(); + assertEquals(expectedSchema, outputSchema); + Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema); + Row expected = + Row.withSchema(outputSchema) + .addArray(Lists.newArrayList("first", "first"), Lists.newArrayList("first", "first")) + .build(); + assertEquals(expected, out); + } } From 7a4482a169f24feca2f2f8f9eeed3b2b03e73146 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 5 Apr 2019 10:46:25 -0700 Subject: [PATCH 12/14] Fix SelectTest. --- .../sdk/schemas/transforms/SelectTest.java | 128 +++++++++++------- 1 file changed, 80 insertions(+), 48 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java index b4131719c30a..2bd5e28e453d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java @@ -261,24 +261,22 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowSingleArray { - List field1 = - ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected()); + List field1 = ImmutableList.of("field1", "field1", "field1"); + List field3 = ImmutableList.of(3.14, 3.14, 3.14); @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowSingleArray)) { - return false; - } + if (!(o instanceof PartialRowSingleArray)) return false; PartialRowSingleArray that = (PartialRowSingleArray) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -322,27 +320,30 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowSingleMap { - Map field1 = + Map field1 = ImmutableMap.of( - "key1", new POJO1Selected(), - "key2", new POJO1Selected(), - "key3", new POJO1Selected()); + "key1", "field1", + "key2", "field1", + "key3", "field1"); + Map field3 = + ImmutableMap.of( + "key1", 3.14, + "key2", 3.14, + "key3", 3.14); @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowSingleMap)) { - return false; - } + if (!(o instanceof PartialRowSingleMap)) return false; PartialRowSingleMap that = (PartialRowSingleMap) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -393,8 +394,17 @@ static class PartialRowMultipleArray { private static final List> POJO_LIST_LIST = ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST); - List>> field1 = - ImmutableList.of(POJO_LIST_LIST, POJO_LIST_LIST, POJO_LIST_LIST); + private static final List STRING_LIST = ImmutableList.of("field1", "field1", "field1"); + private static final List> STRING_LISTLIST = + ImmutableList.of(STRING_LIST, STRING_LIST, STRING_LIST); + List>> field1 = + ImmutableList.of(STRING_LISTLIST, STRING_LISTLIST, STRING_LISTLIST); + + private static final List DOUBLE_LIST = ImmutableList.of(3.14, 3.14, 3.14); + private static final List> DOUBLE_LISTLIST = + ImmutableList.of(DOUBLE_LIST, DOUBLE_LIST, DOUBLE_LIST); + List>> field3 = + ImmutableList.of(DOUBLE_LISTLIST, DOUBLE_LISTLIST, DOUBLE_LISTLIST); @Override public boolean equals(Object o) { @@ -473,37 +483,51 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowMultipleMaps { - static final Map POJO_MAP = + static final Map STRING_MAP = ImmutableMap.of( - "key1", new POJO1Selected(), - "key2", new POJO1Selected(), - "key3", new POJO1Selected()); - static final Map> POJO_MAP_MAP = + "key1", "field1", + "key2", "field1", + "key3", "field1"); + static final Map> STRING_MAPMAP = ImmutableMap.of( - "key1", POJO_MAP, - "key2", POJO_MAP, - "key3", POJO_MAP); - Map>> field1 = + "key1", STRING_MAP, + "key2", STRING_MAP, + "key3", STRING_MAP); + Map>> field1 = ImmutableMap.of( - "key1", POJO_MAP_MAP, - "key2", POJO_MAP_MAP, - "key3", POJO_MAP_MAP); + "key1", STRING_MAPMAP, + "key2", STRING_MAPMAP, + "key3", STRING_MAPMAP); + static final Map DOUBLE_MAP = + ImmutableMap.of( + "key1", 3.14, + "key2", 3.14, + "key3", 3.14); + static final Map> DOUBLE_MAPMAP = + ImmutableMap.of( + "key1", DOUBLE_MAP, + "key2", DOUBLE_MAP, + "key3", DOUBLE_MAP); + + Map>> field3 = + ImmutableMap.of( + "key1", DOUBLE_MAPMAP, + "key2", DOUBLE_MAPMAP, + "key3", DOUBLE_MAPMAP); @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowMultipleMaps)) { - return false; - } + if (!(o instanceof PartialRowMultipleMaps)) return false; PartialRowMultipleMaps that = (PartialRowMultipleMaps) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -558,31 +582,39 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowNestedArraysAndMaps { - static final List POJO_LIST = - ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected()); - static final Map> POJO_MAP_LIST = + static final Map> STRING_MAP = ImmutableMap.of( - "key1", POJO_LIST, - "key2", POJO_LIST, - "key3", POJO_LIST); - List>> field1 = - ImmutableList.of(POJO_MAP_LIST, POJO_MAP_LIST, POJO_MAP_LIST); + "key1", ImmutableList.of("field1", "field1", "field1"), + "key2", ImmutableList.of("field1", "field1", "field1"), + "key3", ImmutableList.of("field1", "field1", "field1")); + List>> field1 = ImmutableList.of(STRING_MAP, STRING_MAP, STRING_MAP); + + static final Map> DOUBLE_MAP = + ImmutableMap.of( + "key1", ImmutableList.of(3.14, 3.14, 3.14), + "key2", ImmutableList.of(3.14, 3.14, 3.14), + "key3", ImmutableList.of(3.14, 3.14, 3.14)); + + List>> field3 = ImmutableList.of(DOUBLE_MAP, DOUBLE_MAP, DOUBLE_MAP); @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowNestedArraysAndMaps)) { - return false; - } + if (!(o instanceof PartialRowNestedArraysAndMaps)) return false; PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); + } + + @Override + public String toString() { + return "PartialRowNestedArraysAndMaps{" + "field1=" + field1 + ", field3=" + field3 + '}'; } } From c03d6491bbb846aa250dbac859f6b1299bbb0b2f Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 8 Apr 2019 17:03:56 -0700 Subject: [PATCH 13/14] Fix CheckStyle failures --- .../beam/sdk/schemas/transforms/SelectTest.java | 12 +++++++++--- .../beam/sdk/schemas/utils/SelectHelpersTest.java | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java index 2bd5e28e453d..aaba41b94468 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java @@ -336,7 +336,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowSingleMap)) return false; + if (!(o instanceof PartialRowSingleMap)) { + return false; + } PartialRowSingleMap that = (PartialRowSingleMap) o; return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @@ -520,7 +522,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowMultipleMaps)) return false; + if (!(o instanceof PartialRowMultipleMaps)) { + return false; + } PartialRowMultipleMaps that = (PartialRowMultipleMaps) o; return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @@ -602,7 +606,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowNestedArraysAndMaps)) return false; + if (!(o instanceof PartialRowNestedArraysAndMaps)) { + return false; + } PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o; return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java index 3e72fd809ac7..77183bf08e18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -19,13 +19,13 @@ import static org.junit.Assert.assertEquals; -import com.google.common.collect.Lists; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.junit.Test; /** Tests for {@link SelectHelpers}. */ From 4d9e63d368dc16c246ce92bf100df0decbe1c4db Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 9 Apr 2019 09:22:19 -0700 Subject: [PATCH 14/14] Fix CheckStyle --- .../org/apache/beam/sdk/schemas/transforms/SelectTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java index aaba41b94468..f2728e0e7aee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java @@ -269,7 +269,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof PartialRowSingleArray)) return false; + if (!(o instanceof PartialRowSingleArray)) { + return false; + } PartialRowSingleArray that = (PartialRowSingleArray) o; return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); }