diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index 044b4fb5eb..11e4fbd67c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -320,7 +320,8 @@ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
       int index = decoder.readIndex();
       if (index == nullIndex) {
         // if it is a null data, directly return null as the whole union result
-        return null;
+        // we know for sure it is null, so the cast will always succeed
+        return (InternalRow) readers[nullIndex].read(decoder, reuse);
       }
 
       // otherwise, we need to return an InternalRow as a struct data
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
index 636a4e6baf..fa3ff37e42 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.Assert;
@@ -275,4 +276,143 @@ public void testDeeplyNestedUnionSchema2() throws IOException {
       Assert.assertEquals(1, rows.get(0).getArray(0).getStruct(0, 3).getStruct(1, 1).getInt(0));
     }
   }
+
+  @Test
+  public void testDeeplyNestedUnionSchema3() throws IOException {
+    /*
+     * the printed write schema:
+     * {
+     *   "type": "record",
+     *   "name": "root",
+     *   "fields": [
+     *     {
+     *       "name": "value",
+     *       "type": [
+     *         {
+     *           "type": "record",
+     *           "name": "r1",
+     *           "fields": [
+     *             {
+     *               "name": "ff1",
+     *               "type": "long"
+     *             },
+     *             {
+     *               "name": "ff2",
+     *               "type": {
+     *                 "type": "record",
+     *                 "name": "r2",
+     *                 "fields": [
+     *                   {
+     *                     "name": "fff1",
+     *                     "type": [
+     *                       "null",
+     *                       "string",
+     *                       "int"
+     *                     ],
+     *                     "default": null
+     *                   }
+     *                 ]
+     *               }
+     *             },
+     *             {
+     *               "name": "ff3",
+     *               "type": {
+     *                 "type": "array",
+     *                 "items": "string"
+     *               },
+     *               "default": []
+     *             }
+     *           ]
+     *         },
+     *         "null"
+     *       ]
+     *     }
+     *   ]
+     * }
+     */
+    org.apache.avro.Schema writeSchema = SchemaBuilder
+        .record("root")
+        .fields()
+        .name("value")
+        .type()
+        .unionOf()
+        .record("r1")
+        .fields()
+        .name("ff1")
+        .type()
+        .longType()
+        .noDefault()
+        .name("ff2")
+        .type()
+        .record("r2")
+        .fields()
+        .name("fff1")
+        .type()
+        .unionOf()
+        .nullType()
+        .and()
+        .stringType()
+        .and()
+        .intType()
+        .endUnion()
+        .nullDefault()
+        .endRecord()
+        .noDefault()
+        .name("ff3")
+        .type()
+        .array()
+        .items()
+        .stringType()
+        .arrayDefault(ImmutableList.of())
+        .endRecord()
+        .and()
+        .nullType()
+        .endUnion()
+        .noDefault()
+        .endRecord();
+
+    GenericData.Record record1 = new GenericData.Record(writeSchema);
+    GenericData.Record record11 = new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
+    GenericData.Record record111 =
+        new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
+    // leave fff1 unset in record111 so the inner union is written as null
+    record11.put("ff1", 99);
+    record11.put("ff2", record111);
+    record11.put("ff3", ImmutableList.of());
+    record1.put("value", record11);
+
+    GenericData.Record record2 = new GenericData.Record(writeSchema);
+    GenericData.Record record22 =
+        new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
+    GenericData.Record record222 =
+        new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
+    record222.put("fff1", 1);
+    record22.put("ff1", 99);
+    record22.put("ff2", record222);
+    record22.put("ff3", ImmutableList.of("foo"));
+    record2.put("value", record22);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.create(writeSchema, testFile);
+      writer.append(record1);
+      writer.append(record2);
+    }
+
+    List<GenericData.Record> expected = ImmutableList.of(record1, record2);
+
+    org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(writeSchema);
+    // read the written records back with the converted Iceberg schema
+    List<InternalRow> rows;
+    try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
+        .createReaderFunc(SparkAvroReader::new)
+        .project(readIcebergSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    // make sure both rows, including the one whose nested union is null, can be read successfully
+    Assert.assertEquals(2, rows.size());
+  }
 }
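Note (not part of the patch): the practical difference between the old `return null;` and the new `readers[nullIndex].read(decoder, reuse)` is that the null branch's reader still consumes the branch value before yielding null, the way Iceberg's ValueReaders null reader calls `decoder.readNull()`. Skipping that call is harmless for the plain binary decoder, where `readNull()` is a no-op, but schema-aware decoders (for example Avro's JSON or validating decoders) track their position in the schema and can fall out of sync. The sketch below illustrates that contract only; `SimpleUnionReader` is a hypothetical stand-in, not Iceberg's actual `UnionReader`, and only `Decoder.readIndex()` and `Decoder.readNull()` are real Avro calls.

import java.io.IOException;
import org.apache.avro.io.Decoder;

// Minimal sketch of the union-read contract the patch preserves.
class SimpleUnionReader {
  private final int nullIndex;

  SimpleUnionReader(int nullIndex) {
    this.nullIndex = nullIndex;
  }

  Object read(Decoder decoder) throws IOException {
    int index = decoder.readIndex(); // selects the union branch to read
    if (index == nullIndex) {
      // the branch value must still be consumed: readNull() is a no-op for the
      // binary decoder but keeps schema-tracking decoders aligned with the stream
      decoder.readNull();
      return null;
    }
    // delegation to the matching non-null branch reader is omitted in this sketch
    throw new UnsupportedOperationException("non-null branches omitted");
  }
}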