Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
int index = decoder.readIndex();
if (index == nullIndex) {
// if it is a null data, directly return null as the whole union result
return null;
// we know for sure it is a null so the casting will always work.
return (InternalRow) readers[nullIndex].read(decoder, reuse);
}

// otherwise, we need to return an InternalRow as a struct data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.Assert;
Expand Down Expand Up @@ -275,4 +276,143 @@ public void testDeeplyNestedUnionSchema2() throws IOException {
Assert.assertEquals(1, rows.get(0).getArray(0).getStruct(0, 3).getStruct(1, 1).getInt(0));
}
}

@Test
public void testDeeplyNestedUnionSchema3() throws IOException {
/*
* the printed write schema:
* {
"type": "record",
"name": "root",
"fields": [
{
"name": "value",
"type": [
{
"type": "record",
"name": "r1",
"fields": [
{
"name": "ff1",
"type": "long"
},
{
"name": "ff2",
"type": {
"type": "record",
"name": "r2",
"fields": [
{
"name": "fff1",
"type": [
"null",
"string",
"int"
],
"default": null
}
]
}
},
{
"name": "ff3",
"type": {
"type": "array",
"items": "string"
},
"default": []
}
]
},
"null"
]
}
]
}
* */
org.apache.avro.Schema writeSchema = SchemaBuilder

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it makes sense to add a comment for the string format schema? It is hard to read the actual schema this test is testing against.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add a pretty printed json string version of the schema as a comment.

.record("root")
.fields()
.name("value")
.type()
.unionOf()
.record("r1")
.fields()
.name("ff1")
.type()
.longType()
.noDefault()
.name("ff2")
.type()
.record("r2")
.fields()
.name("fff1")
.type()
.unionOf()
.nullType()
.and()
.stringType()
.and()
.intType()
.endUnion()
.nullDefault()
.endRecord()
.noDefault()
.name("ff3")
.type()
.array()
.items()
.stringType()
.arrayDefault(ImmutableList.of())
.endRecord()
.and()
.nullType()
.endUnion()
.noDefault()
.endRecord();

GenericData.Record record1 = new GenericData.Record(writeSchema);
GenericData.Record record11 = new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
GenericData.Record record111 =
new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
// record111.put("fff1", 1);
record11.put("ff1", 99);
record11.put("ff2", record111);
record11.put("ff3", ImmutableList.of());
record1.put("value", record11);

GenericData.Record record2 = new GenericData.Record(writeSchema);
GenericData.Record record22 = new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
GenericData.Record record222 =
new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
record222.put("fff1", 1);
record22.put("ff1", 99);
record22.put("ff2", record222);
record22.put("ff3", ImmutableList.of("foo"));
record2.put("value", record22);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(writeSchema, testFile);
writer.append(record1);
writer.append(record2);
}

List<GenericData.Record> expected = ImmutableList.of(record1, record2);

org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(writeSchema);
// read written rows with evolved schema
List<InternalRow> rows;
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
.createReaderFunc(SparkAvroReader::new)
.project(readIcebergSchema)
.build()) {
rows = Lists.newArrayList(reader);
}

// making sure the rows can be read successfully
Assert.assertEquals(2, rows.size());
}
}