
Commit e7149e0

[LI][Spark][Avro] read avro union using decoder instead of directly returning value (linkedin#94)
* [LI][Spark] read avro union using decoder instead of directly returning value
* Add a comment for the schema
1 parent a8bf3fb commit e7149e0

2 files changed: +142 -1 lines changed

spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java

Lines changed: 2 additions & 1 deletion
@@ -320,7 +320,8 @@ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
       int index = decoder.readIndex();
       if (index == nullIndex) {
         // if it is a null data, directly return null as the whole union result
-        return null;
+        // we know for sure it is a null so the casting will always work.
+        return (InternalRow) readers[nullIndex].read(decoder, reuse);
       }
 
       // otherwise, we need to return an InternalRow as a struct data
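
Why route the null branch through its reader instead of returning null directly? The Avro Decoder is positional: every union branch, including the null branch, should be consumed through the decoder so subsequent reads stay aligned. Decoder.readNull() is a no-op for the binary encoding, but validating and JSON decoders do advance state on it. A minimal sketch of what readers[nullIndex] is assumed to do (the class name here is hypothetical, not the project's actual reader):

    import java.io.IOException;
    import org.apache.avro.io.Decoder;

    // Hypothetical stand-in for readers[nullIndex]: it consumes the null
    // from the decoder and only then returns null, keeping the stream in sync.
    class NullBranchReaderSketch {
      public Object read(Decoder decoder, Object reuse) throws IOException {
        decoder.readNull(); // no-op for BinaryDecoder; advances validating/JSON decoders
        return null;
      }
    }

Because a reader like this always yields null, the (InternalRow) cast on the patched line can never fail, which is what the new comment in the diff asserts.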

spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java

Lines changed: 140 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.Assert;
@@ -275,4 +276,143 @@ public void testDeeplyNestedUnionSchema2() throws IOException {
     Assert.assertEquals(1, rows.get(0).getArray(0).getStruct(0, 3).getStruct(1, 1).getInt(0));
   }
+
+  @Test
+  public void testDeeplyNestedUnionSchema3() throws IOException {
+    /*
+     * the printed write schema:
+     * {
+         "type": "record",
+         "name": "root",
+         "fields": [
+           {
+             "name": "value",
+             "type": [
+               {
+                 "type": "record",
+                 "name": "r1",
+                 "fields": [
+                   {
+                     "name": "ff1",
+                     "type": "long"
+                   },
+                   {
+                     "name": "ff2",
+                     "type": {
+                       "type": "record",
+                       "name": "r2",
+                       "fields": [
+                         {
+                           "name": "fff1",
+                           "type": [
+                             "null",
+                             "string",
+                             "int"
+                           ],
+                           "default": null
+                         }
+                       ]
+                     }
+                   },
+                   {
+                     "name": "ff3",
+                     "type": {
+                       "type": "array",
+                       "items": "string"
+                     },
+                     "default": []
+                   }
+                 ]
+               },
+               "null"
+             ]
+           }
+         ]
+       }
+     * */
+    org.apache.avro.Schema writeSchema = SchemaBuilder
+        .record("root")
+        .fields()
+        .name("value")
+        .type()
+        .unionOf()
+        .record("r1")
+        .fields()
+        .name("ff1")
+        .type()
+        .longType()
+        .noDefault()
+        .name("ff2")
+        .type()
+        .record("r2")
+        .fields()
+        .name("fff1")
+        .type()
+        .unionOf()
+        .nullType()
+        .and()
+        .stringType()
+        .and()
+        .intType()
+        .endUnion()
+        .nullDefault()
+        .endRecord()
+        .noDefault()
+        .name("ff3")
+        .type()
+        .array()
+        .items()
+        .stringType()
+        .arrayDefault(ImmutableList.of())
+        .endRecord()
+        .and()
+        .nullType()
+        .endUnion()
+        .noDefault()
+        .endRecord();
+
+    GenericData.Record record1 = new GenericData.Record(writeSchema);
+    GenericData.Record record11 = new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
+    GenericData.Record record111 =
+        new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
+    // record111.put("fff1", 1);
+    record11.put("ff1", 99);
+    record11.put("ff2", record111);
+    record11.put("ff3", ImmutableList.of());
+    record1.put("value", record11);
+
+    GenericData.Record record2 = new GenericData.Record(writeSchema);
+    GenericData.Record record22 = new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0));
+    GenericData.Record record222 =
+        new GenericData.Record(writeSchema.getField("value").schema().getTypes().get(0).getField("ff2").schema());
+    record222.put("fff1", 1);
+    record22.put("ff1", 99);
+    record22.put("ff2", record222);
+    record22.put("ff3", ImmutableList.of("foo"));
+    record2.put("value", record22);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.create(writeSchema, testFile);
+      writer.append(record1);
+      writer.append(record2);
+    }
+
+    List<GenericData.Record> expected = ImmutableList.of(record1, record2);
+
+    org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(writeSchema);
+    // read written rows with evolved schema
+    List<InternalRow> rows;
+    try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
+        .createReaderFunc(SparkAvroReader::new)
+        .project(readIcebergSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    // making sure the rows can be read successfully
+    Assert.assertEquals(2, rows.size());
+  }
 }
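
As a side note, the Avro container file the test writes can be sanity-checked with Avro's plain generic reader before Spark is involved at all. A small companion sketch, not part of the commit; it assumes the same testFile, record1, and record2 as in the test above:

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;

    // Iterate the container file with the generic API; record1 should come back
    // with fff1 unset (the null branch) and record2 with the int branch set.
    try (DataFileReader<GenericData.Record> avroReader =
        new DataFileReader<>(testFile, new GenericDatumReader<>())) {
      for (GenericData.Record rec : avroReader) {
        System.out.println(rec);
      }
    }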
