Commit fc51e32
Fix ORC schema visitors to support reading ORC files with deeply nested union type schemas (linkedin#81)

* Fix ORC schema visitors to support reading ORC files with deeply nested union type schemas
* Added a test for the vectorized read path
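For context on what "deeply nested" means here: the new test (full diff below) exercises an ORC union whose struct option itself contains another union. Iceberg has no union type, so the ORC readers surface each union as a struct whose options become tag_<ordinal> fields. A minimal sketch of that mapping, using only the public ORC TypeDescription API (the class name is illustrative):

```java
import org.apache.orc.TypeDescription;

public class NestedUnionSchemaSketch {
  public static void main(String[] args) {
    // The ORC schema used by the new test: the top-level union's second
    // option is a struct that itself contains another union.
    TypeDescription orcSchema = TypeDescription.fromString(
        "struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>");
    System.out.println(orcSchema);

    // Iceberg has no union type, so the readers project each union as a
    // struct of its options named tag_<ordinal>, roughly:
    //   c1: struct<tag_0: int,
    //              tag_1: struct<c2: string,
    //                            c3: struct<tag_0: int, tag_1: string>>>
  }
}
```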
1 parent 52ec9b9

File tree

3 files changed: +114 −12 lines

orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
6 additions, 1 deletion

@@ -312,7 +312,12 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ
     TypeDescription orcType;
     OrcField orcField = mapping.getOrDefault(fieldId, null);
     if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
-      orcType = orcField.type;
+      orcType = TypeDescription.createUnion();
+      for (Types.NestedField nestedField : type.asStructType().fields()) {
+        TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
+            isRequired && nestedField.isRequired(), mapping);
+        orcType.addUnionChild(childType);
+      }
     } else {
       orcType = TypeDescription.createStruct();
       for (Types.NestedField nestedField : type.asStructType().fields()) {
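The root cause, as this hunk shows: the old code returned the file's union type verbatim (orcType = orcField.type), so options that themselves contained unions were never re-projected. The fix rebuilds the union option by option through buildOrcProjection. A rough standalone analogue of the rebuilt projection for the inner uniontype<int,string>, using only public ORC APIs (the names are illustrative, not from the codebase):

```java
import org.apache.orc.TypeDescription;

public class UnionProjectionSketch {
  public static void main(String[] args) {
    // Rebuild a union projection child by child, the way the fixed
    // buildOrcProjectForStructType now does, rather than reusing the
    // file's union TypeDescription as-is.
    TypeDescription projected = TypeDescription.createUnion();
    projected.addUnionChild(TypeDescription.createInt());    // option tag_0
    projected.addUnionChild(TypeDescription.createString()); // option tag_1
    System.out.println(projected); // uniontype<int,string>
  }
}
```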

orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
10 additions, 11 deletions

@@ -49,15 +49,14 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
       case UNION:
         List<TypeDescription> types = schema.getChildren();
         List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-        for (TypeDescription type : types) {
-          visitor.beforeUnionOption(type);
+        for (int i = 0; i < types.size(); i++) {
+          visitor.beforeUnionOption(types.get(i), i);
           try {
-            options.add(visit(type, visitor));
+            options.add(visit(types.get(i), visitor));
           } finally {
-            visitor.afterUnionOption(type);
+            visitor.afterUnionOption(types.get(i), i);
           }
         }
-
         return visitor.union(schema, options);

       case LIST:

@@ -123,8 +122,8 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> vis
     return visitor.record(record, names, visitFields(fields, names, visitor));
   }

-  public String optionName() {
-    return "_option";
+  public String optionName(int ordinal) {
+    return "tag_" + ordinal;
   }

   public String elementName() {

@@ -151,12 +150,12 @@ public void afterField(String name, TypeDescription type) {
     fieldNames.pop();
   }

-  public void beforeUnionOption(TypeDescription option) {
-    beforeField(optionName(), option);
+  public void beforeUnionOption(TypeDescription option, int ordinal) {
+    beforeField(optionName(ordinal), option);
   }

-  public void afterUnionOption(TypeDescription option) {
-    afterField(optionName(), option);
+  public void afterUnionOption(TypeDescription option, int ordinal) {
+    afterField(optionName(ordinal), option);
   }

   public void beforeElementField(TypeDescription element) {
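The visible effect of the new ordinal parameter: each union option now gets a distinct pseudo-field name instead of every option sharing "_option", which is what keeps field paths unique when unions nest. A minimal sketch of just the naming (the method mirrors the new optionName; the surrounding class is hypothetical):

```java
public class OptionNameSketch {
  // Mirrors the new OrcSchemaVisitor.optionName(int): one distinct name
  // per option ordinal, replacing the old constant "_option".
  static String optionName(int ordinal) {
    return "tag_" + ordinal;
  }

  public static void main(String[] args) {
    // For uniontype<int,string> the options are visited with ordinals 0 and 1:
    System.out.println(optionName(0)); // tag_0
    System.out.println(optionName(1)); // tag_1
  }
}
```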

spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
98 additions, 0 deletions

@@ -31,6 +31,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.types.Types;
@@ -39,6 +40,7 @@
 import org.apache.orc.Writer;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -215,4 +217,100 @@ public void testSingleComponentUnion() throws IOException {
       assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
     }
   }
+
+  @Test
+  public void testDeeplyNestedUnion() throws IOException {
+    TypeDescription orcSchema =
+        TypeDescription.fromString("struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>");
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(0, "c1", Types.StructType.of(
+            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "tag_1",
+                Types.StructType.of(Types.NestedField.optional(3, "c2", Types.StringType.get()),
+                    Types.NestedField.optional(4, "c3", Types.StructType.of(
+                        Types.NestedField.optional(5, "tag_0", Types.IntegerType.get()),
+                        Types.NestedField.optional(6, "tag_1", Types.StringType.get()))))))));
+
+    final InternalRow expectedFirstRow = new GenericInternalRow(1);
+    final InternalRow inner1 = new GenericInternalRow(2);
+    inner1.update(0, null);
+    final InternalRow inner2 = new GenericInternalRow(2);
+    inner2.update(0, UTF8String.fromString("foo0"));
+    final InternalRow inner3 = new GenericInternalRow(2);
+    inner3.update(0, 0);
+    inner3.update(1, null);
+    inner2.update(1, inner3);
+    inner1.update(1, inner2);
+    expectedFirstRow.update(0, inner1);
+
+    Configuration conf = new Configuration();
+
+    File orcFile = temp.newFile();
+    Path orcFilePath = new Path(orcFile.getPath());
+
+    Writer writer = OrcFile.createWriter(orcFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(orcSchema).overwrite(true));
+
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    UnionColumnVector innerUnion1 = (UnionColumnVector) batch.cols[0];
+    LongColumnVector innerInt1 = (LongColumnVector) innerUnion1.fields[0];
+    innerInt1.fillWithNulls();
+    StructColumnVector innerStruct2 = (StructColumnVector) innerUnion1.fields[1];
+    BytesColumnVector innerString2 = (BytesColumnVector) innerStruct2.fields[0];
+    UnionColumnVector innerUnion3 = (UnionColumnVector) innerStruct2.fields[1];
+    LongColumnVector innerInt3 = (LongColumnVector) innerUnion3.fields[0];
+    BytesColumnVector innerString3 = (BytesColumnVector) innerUnion3.fields[1];
+    innerString3.fillWithNulls();
+
+    for (int r = 0; r < NUM_OF_ROWS; ++r) {
+      int row = batch.size++;
+      innerUnion1.tags[row] = 1;
+      innerString2.setVal(row, ("foo" + row).getBytes(StandardCharsets.UTF_8));
+      innerUnion3.tags[row] = 0;
+      innerInt3.vector[row] = r;
+      // If the batch is full, write it out and start over.
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+        innerInt1.fillWithNulls();
+        innerString3.fillWithNulls();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    writer.close();
+
+    // test non-vectorized reader
+    List<InternalRow> results = Lists.newArrayList();
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
+        .build()) {
+      reader.forEach(results::add);
+      final InternalRow actualFirstRow = results.get(0);
+
+      Assert.assertEquals(results.size(), NUM_OF_ROWS);
+      assertEquals(expectedSchema, expectedFirstRow, actualFirstRow);
+    }
+
+    // test vectorized reader
+    try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createBatchedReaderFunc(readOrcSchema ->
+            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
+        .build()) {
+      final Iterator<InternalRow> actualRows = batchesToRows(reader.iterator());
+      final InternalRow actualFirstRow = actualRows.next();
+
+      assertEquals(expectedSchema, expectedFirstRow, actualFirstRow);
+    }
+  }
+
+  private Iterator<InternalRow> batchesToRows(Iterator<ColumnarBatch> batches) {
+    return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator));
+  }
 }
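One detail worth calling out in the test's writer setup: ORC's UnionColumnVector stores all option vectors side by side and uses tags[row] to pick the active option per row, which is why the test both sets tags and null-fills the unused children. A minimal standalone sketch, assuming the same relocated org.apache.orc.storage classes the test imports:

```java
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

public class UnionTagSketch {
  public static void main(String[] args) {
    TypeDescription schema = TypeDescription.fromString("struct<c1:uniontype<int,string>>");
    VectorizedRowBatch batch = schema.createRowBatch();
    UnionColumnVector union = (UnionColumnVector) batch.cols[0];

    // Row 0 carries the int option: tag 0 selects union.fields[0];
    // the string child at fields[1] is simply left untouched.
    union.tags[0] = 0;
    ((LongColumnVector) union.fields[0]).vector[0] = 42;
    batch.size = 1;
  }
}
```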
