Subject: [PATCH] array struct parsing
---
Index: sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java (revision 2330c800eb915d696a209911a99287931630a11d)
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java (date 1749161198153)
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
@@ -408,53 +409,88 @@
     for (Schema.Field field : schema.getFields()) {
       boolean isNullable = field.getType().getNullable();
       @Nullable Object icebergValue = record.getField(field.getName());
+
       if (icebergValue == null) {
         if (isNullable) {
           rowBuilder.addValue(null);
           continue;
         }
-        throw new RuntimeException(
+        // Use a standard exception type for a required field that is null.
+        throw new IllegalArgumentException(
            String.format("Received null value for required field '%s'.", field.getName()));
       }
-      switch (field.getType().getTypeName()) {
+
+      // Cache the FieldType for the current field.
+      Schema.FieldType fieldType = field.getType();
+
+      switch (fieldType.getTypeName()) {
         case BYTE:
         case INT16:
         case INT32:
         case INT64:
-        case DECIMAL: // Iceberg and Beam both use BigDecimal
-        case FLOAT: // Iceberg and Beam both use float
-        case DOUBLE: // Iceberg and Beam both use double
-        case STRING: // Iceberg and Beam both use String
-        case BOOLEAN: // Iceberg and Beam both use boolean
+        case DECIMAL:
+        case FLOAT:
+        case DOUBLE:
+        case STRING:
+        case BOOLEAN:
+        case MAP:
+          rowBuilder.addValue(icebergValue);
+          break;
         case ARRAY:
         case ITERABLE:
-        case MAP:
-          rowBuilder.addValue(icebergValue);
+          Schema.FieldType elementFieldType = fieldType.getCollectionElementType();
+          // Standard if/throw instead of Preconditions.checkState.
+          if (elementFieldType == null) {
+            throw new IllegalStateException(
+                String.format(
+                    "Corrupted schema: Collection type has a null element type for field '%s'.",
+                    field.getName()));
+          }
+
+          if (elementFieldType.getTypeName() == Schema.TypeName.ROW) {
+            @SuppressWarnings("unchecked")
+            List<Record> icebergRecords = (List<Record>) icebergValue;
+            Schema elementSchema = elementFieldType.getRowSchema();
+            if (elementSchema == null) {
+              throw new IllegalStateException(
+                  String.format(
+                      "Corrupted schema: Array of Rows did not have an element schema for field '%s'.",
+                      field.getName()));
+            }
+
+            // Convert each nested Iceberg Record to a Beam Row recursively.
+            List<Row> beamRows =
+                icebergRecords.stream()
+                    .map(rec -> icebergRecordToBeamRow(elementSchema, rec))
+                    .collect(Collectors.toList());
+            rowBuilder.addValue(beamRows);
+          } else {
+            rowBuilder.addValue(icebergValue);
+          }
           break;
         case DATETIME:
-          // Iceberg uses a long for micros.
-          // Beam DATETIME uses joda's DateTime, which only supports millis,
-          // so we do lose some precision here
           rowBuilder.addValue(getBeamDateTimeValue(icebergValue));
           break;
         case BYTES:
-          // Iceberg uses ByteBuffer; Beam uses byte[]
           rowBuilder.addValue(((ByteBuffer) icebergValue).array());
           break;
         case ROW:
           Record nestedRecord = (Record) icebergValue;
-          Schema nestedSchema =
-              checkArgumentNotNull(
-                  field.getType().getRowSchema(),
-                  "Corrupted schema: Row type did not have associated nested schema.");
+          Schema nestedSchema = fieldType.getRowSchema();
+          if (nestedSchema == null) {
+            throw new IllegalStateException(
+                String.format(
+                    "Corrupted schema: Row type did not have associated nested schema for field '%s'.",
+                    field.getName()));
+          }
           rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
           break;
         case LOGICAL_TYPE:
-          rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType()));
+          rowBuilder.addValue(getLogicalTypeValue(icebergValue, fieldType));
           break;
         default:
           throw new UnsupportedOperationException(
-              "Unsupported Beam type: " + field.getType().getTypeName());
+              "Unsupported Beam type: " + fieldType.getTypeName());
       }
     }
     return rowBuilder.build();
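
The gist of the IcebergUtils change, as a standalone sketch: arrays whose elements are ROWs can no longer be passed through untouched, because each element is an Iceberg Record rather than a Beam Row. The helper below is illustrative only, not part of the patch, and assumes same-package access to IcebergUtils.icebergRecordToBeamRow:

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;
    import org.apache.iceberg.data.Record;

    final class ArrayOfStructsSketch {
      // Mirrors the new ARRAY/ITERABLE branch: each struct element is
      // recursively converted instead of being copied through as-is.
      static List<Row> toBeamRows(Schema elementSchema, List<Record> icebergRecords) {
        return icebergRecords.stream()
            .map(rec -> IcebergUtils.icebergRecordToBeamRow(elementSchema, rec))
            .collect(Collectors.toList());
      }
    }
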
Index: sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java (revision 2330c800eb915d696a209911a99287931630a11d)
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java (date 1749177271000)
@@ -139,20 +139,26 @@
     abstract String getFilter();

     @AutoValue.Builder
+    @SuppressWarnings("UnusedVariable")
     abstract static class Builder {
+      // These fields are ignored by the static analysis tool via the suppression above.
+      private @Nullable List<String> keep = null;
+      private @Nullable List<String> drop = null;
+      private @Nullable String filter = null;
+
       abstract Builder setTable(String table);

-      abstract Builder setCatalogName(String catalogName);
+      abstract Builder setCatalogName(@Nullable String catalogName);

-      abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
+      abstract Builder setCatalogProperties(@Nullable Map<String, String> catalogProperties);

-      abstract Builder setConfigProperties(Map<String, String> confProperties);
+      abstract Builder setConfigProperties(@Nullable Map<String, String> confProperties);

-      abstract Builder setKeep(List<String> keep);
+      abstract Builder setKeep(@Nullable List<String> keep);

-      abstract Builder setDrop(List<String> drop);
+      abstract Builder setDrop(@Nullable List<String> drop);

-      abstract Builder setFilter(String filter);
+      abstract Builder setFilter(@Nullable String filter);

       abstract Configuration build();
     }
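
A minimal usage sketch for the relaxed Builder above. The static Configuration.builder() factory is assumed here, following the usual AutoValue pattern in these providers; the point is that optional read options may now be passed as explicit nulls:

    // Sketch only: exercises the now-@Nullable setters from the same package.
    static Configuration exampleConfig() {
      return Configuration.builder()
          .setTable("db.my_table")  // required
          .setCatalogName(null)     // optional; parameter is now @Nullable
          .setFilter(null)          // optional pushdown filter left unset
          .build();
    }
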
Index: sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java (revision 2330c800eb915d696a209911a99287931630a11d)
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java (date 1749179302000)
@@ -397,6 +397,63 @@
     testPipeline.run();
   }

+  @Test
+  public void testIcebergRecordToBeamRowForArrayOfStructs() {
+    // 1. Define the Iceberg schema for the nested structure as described.
+    //    |-- element: struct (containsNull = true)
+    //    |    |-- key: string (nullable = true)
+    //    |    |-- value: binary (nullable = true)
+    org.apache.iceberg.Schema structSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.optional(1, "key", Types.StringType.get()),
+            Types.NestedField.optional(2, "value", Types.BinaryType.get()));
+
+    //    |-- pointer: array (nullable = true)
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.optional(
+                3, "pointer", Types.ListType.ofOptional(4, structSchema.asStruct())));
+
+    // 2. Create an Iceberg Record with sample data.
+    Record structRecord1 = org.apache.iceberg.data.GenericRecord.create(structSchema);
+    structRecord1.setField("key", "first");
+    // Iceberg's BinaryType uses ByteBuffer.
+    structRecord1.setField("value", java.nio.ByteBuffer.wrap(new byte[] {1, 2}));
+
+    Record structRecord2 = org.apache.iceberg.data.GenericRecord.create(structSchema);
+    structRecord2.setField("key", "second");
+    structRecord2.setField("value", java.nio.ByteBuffer.wrap(new byte[] {3, 4, 5}));
+
+    Record parentRecord = org.apache.iceberg.data.GenericRecord.create(icebergSchema);
+    parentRecord.setField("pointer", ImmutableList.of(structRecord1, structRecord2));
+
+    // 3. Create the corresponding Beam Schema and the expected Beam Row.
+    Schema beamSchema = icebergSchemaToBeamSchema(icebergSchema);
+    Schema beamStructSchema =
+        beamSchema.getField("pointer").getType().getCollectionElementType().getRowSchema();
+
+    Row beamStructRow1 =
+        Row.withSchema(beamStructSchema)
+            .withFieldValue("key", "first")
+            // The conversion should result in a byte array for Beam.
+            .withFieldValue("value", new byte[] {1, 2})
+            .build();
+    Row beamStructRow2 =
+        Row.withSchema(beamStructSchema)
+            .withFieldValue("key", "second")
+            .withFieldValue("value", new byte[] {3, 4, 5})
+            .build();
+
+    Row expectedRow =
+        Row.withSchema(beamSchema)
+            .withFieldValue("pointer", ImmutableList.of(beamStructRow1, beamStructRow2))
+            .build();
+
+    // 4. Perform the conversion and assert that the result is correct.
+    Row actualRow = IcebergUtils.icebergRecordToBeamRow(beamSchema, parentRecord);
+    assertEquals(expectedRow, actualRow);
+  }
+
   @Test
   public void testReadSchemaWithRandomlyOrderedIds() throws IOException {
     TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName());
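
For readers verifying the test by hand, this is the Beam schema the new test relies on icebergSchemaToBeamSchema producing for that Iceberg schema. It is a hand-built sketch under the assumed mapping (optional becomes nullable, binary becomes BYTES, struct becomes ROW, list becomes ARRAY with nullable elements), not something the patch itself asserts:

    import org.apache.beam.sdk.schemas.Schema;

    final class ExpectedSchemaSketch {
      // Hand-built equivalent of icebergSchemaToBeamSchema(icebergSchema)
      // from the test above, under the assumed type mapping.
      static Schema expectedBeamSchema() {
        Schema elementSchema =
            Schema.builder()
                .addNullableField("key", Schema.FieldType.STRING)
                .addNullableField("value", Schema.FieldType.BYTES)
                .build();
        return Schema.builder()
            .addNullableField(
                "pointer",
                Schema.FieldType.array(Schema.FieldType.row(elementSchema).withNullable(true)))
            .build();
      }
    }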