Skip to content

Commit fee4aa3

Browse files
authored
Handle non-nullable union of single type for ORC spark non-vectorized reader (#104)
* Handle single type union for non-vectorized reader
1 parent ab49e9b commit fee4aa3

File tree

5 files changed

+344
-43
lines changed

5 files changed

+344
-43
lines changed

orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java

Lines changed: 96 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -263,36 +263,36 @@ public static TypeDescription buildOrcProjection(Schema schema,
263263
private static TypeDescription buildOrcProjection(Integer fieldId, Type type, boolean isRequired,
264264
Map<Integer, OrcField> mapping) {
265265
final TypeDescription orcType;
266+
final OrcField orcField = mapping.getOrDefault(fieldId, null);
266267

267268
switch (type.typeId()) {
268269
case STRUCT:
269270
orcType = buildOrcProjectForStructType(fieldId, type, isRequired, mapping);
270271
break;
271272
case LIST:
272-
Types.ListType list = (Types.ListType) type;
273-
TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(),
274-
isRequired && list.isElementRequired(), mapping);
275-
orcType = TypeDescription.createList(elementType);
273+
orcType = buildOrcProjectionForListType((Types.ListType) type, isRequired, mapping, orcField);
276274
break;
277275
case MAP:
278-
Types.MapType map = (Types.MapType) type;
279-
TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), isRequired, mapping);
280-
TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(),
281-
isRequired && map.isValueRequired(), mapping);
282-
orcType = TypeDescription.createMap(keyType, valueType);
276+
orcType = buildOrcProjectionForMapType((Types.MapType) type, isRequired, mapping, orcField);
283277
break;
284278
default:
285279
if (mapping.containsKey(fieldId)) {
286280
TypeDescription originalType = mapping.get(fieldId).type();
287-
Optional<TypeDescription> promotedType = getPromotedType(type, originalType);
288-
289-
if (promotedType.isPresent()) {
290-
orcType = promotedType.get();
291-
} else {
292-
Preconditions.checkArgument(isSameType(originalType, type),
293-
"Can not promote %s type to %s",
294-
originalType.getCategory(), type.typeId().name());
281+
if (originalType != null && originalType.getCategory().equals(TypeDescription.Category.UNION)) {
282+
Preconditions.checkState(originalType.getChildren().size() == 1,
283+
"Expect single type union for orc schema.");
295284
orcType = originalType.clone();
285+
} else {
286+
Optional<TypeDescription> promotedType = getPromotedType(type, originalType);
287+
288+
if (promotedType.isPresent()) {
289+
orcType = promotedType.get();
290+
} else {
291+
Preconditions.checkArgument(isSameType(originalType, type),
292+
"Can not promote %s type to %s",
293+
originalType.getCategory(), type.typeId().name());
294+
orcType = originalType.clone();
295+
}
296296
}
297297
} else {
298298
if (isRequired) {
@@ -307,19 +307,58 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
307307
return orcType;
308308
}
309309

310+
private static TypeDescription buildOrcProjectionForMapType(Types.MapType type, boolean isRequired,
311+
Map<Integer, OrcField> mapping, OrcField orcField) {
312+
final TypeDescription orcType;
313+
if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
314+
Preconditions.checkState(orcField.type.getChildren().size() == 1,
315+
"Expect single type union for orc schema.");
316+
317+
orcType = TypeDescription.createUnion();
318+
Types.MapType map = type;
319+
TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), isRequired, mapping);
320+
TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(),
321+
isRequired && map.isValueRequired(), mapping);
322+
orcType.addUnionChild(TypeDescription.createMap(keyType, valueType));
323+
} else {
324+
Types.MapType map = type;
325+
TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), isRequired, mapping);
326+
TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(),
327+
isRequired && map.isValueRequired(), mapping);
328+
orcType = TypeDescription.createMap(keyType, valueType);
329+
}
330+
return orcType;
331+
}
332+
333+
private static TypeDescription buildOrcProjectionForListType(Types.ListType type, boolean isRequired,
334+
Map<Integer, OrcField> mapping, OrcField orcField) {
335+
final TypeDescription orcType;
336+
if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
337+
Preconditions.checkState(orcField.type.getChildren().size() == 1,
338+
"Expect single type union for orc schema.");
339+
340+
orcType = TypeDescription.createUnion();
341+
Types.ListType list = type;
342+
TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(),
343+
isRequired && list.isElementRequired(), mapping);
344+
orcType.addUnionChild(TypeDescription.createList(elementType));
345+
} else {
346+
Types.ListType list = type;
347+
TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(),
348+
isRequired && list.isElementRequired(), mapping);
349+
orcType = TypeDescription.createList(elementType);
350+
}
351+
return orcType;
352+
}
353+
310354
private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired,
311355
Map<Integer, OrcField> mapping) {
312356
TypeDescription orcType;
313357
OrcField orcField = mapping.getOrDefault(fieldId, null);
314-
// this branch means the iceberg struct schema actually correspond to an underlying union
358+
315359
if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
316-
orcType = TypeDescription.createUnion();
317-
List<Types.NestedField> nestedFields = type.asStructType().fields();
318-
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
319-
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
320-
isRequired && nestedField.isRequired(), mapping);
321-
orcType.addUnionChild(childType);
322-
}
360+
// this branch means the iceberg struct schema actually correspond to an underlying union
361+
orcType = getOrcSchemaForUnionType(type, isRequired, mapping, orcField);
323362
} else {
324363
orcType = TypeDescription.createStruct();
325364
for (Types.NestedField nestedField : type.asStructType().fields()) {
@@ -340,6 +379,38 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ
340379
return orcType;
341380
}
342381

382+
private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isRequired, Map<Integer, OrcField> mapping,
383+
OrcField orcField) {
384+
TypeDescription orcType;
385+
if (orcField.type.getChildren().size() == 1) { // single type union
386+
orcType = TypeDescription.createUnion();
387+
388+
TypeDescription childOrcStructType = TypeDescription.createStruct();
389+
for (Types.NestedField nestedField : type.asStructType().fields()) {
390+
if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
391+
continue;
392+
}
393+
String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
394+
.map(OrcField::name)
395+
.orElseGet(() -> nestedField.name());
396+
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
397+
isRequired && nestedField.isRequired(), mapping);
398+
childOrcStructType.addField(name, childType);
399+
}
400+
401+
orcType.addUnionChild(childOrcStructType);
402+
} else { // complex union
403+
orcType = TypeDescription.createUnion();
404+
List<Types.NestedField> nestedFields = type.asStructType().fields();
405+
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
406+
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
407+
isRequired && nestedField.isRequired(), mapping);
408+
orcType.addUnionChild(childType);
409+
}
410+
}
411+
return orcType;
412+
}
413+
343414
private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
344415
Map<Integer, OrcField> icebergToOrc = Maps.newHashMap();
345416
switch (orcType.getCategory()) {

orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,20 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
4949
case UNION:
5050
List<TypeDescription> types = schema.getChildren();
5151
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
52-
for (int i = 0; i < types.size(); i++) {
53-
visitor.beforeUnionOption(types.get(i), i);
54-
try {
55-
options.add(visit(types.get(i), visitor));
56-
} finally {
57-
visitor.afterUnionOption(types.get(i), i);
52+
53+
if (types.size() == 1) {
54+
options.add(visit(types.get(0), visitor));
55+
} else {
56+
for (int i = 0; i < types.size(); i++) {
57+
visitor.beforeUnionOption(types.get(i), i);
58+
try {
59+
options.add(visit(types.get(i), visitor));
60+
} finally {
61+
visitor.afterUnionOption(types.get(i), i);
62+
}
5863
}
5964
}
65+
6066
return visitor.union(schema, options);
6167

6268
case LIST:

orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,12 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisito
7575
List<TypeDescription> types = union.getChildren();
7676
List<T> options = Lists.newArrayListWithCapacity(types.size());
7777

78-
for (int i = 0; i < types.size(); i += 1) {
79-
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
78+
if (types.size() == 1) { // single type union
79+
options.add(visit(type, types.get(0), visitor));
80+
} else { // complex union
81+
for (int i = 0; i < types.size(); i += 1) {
82+
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
83+
}
8084
}
8185

8286
return visitor.union(type, union, options);

spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ protected void set(InternalRow struct, int pos, Object value) {
164164
}
165165
}
166166

167-
static class UnionReader implements OrcValueReader<InternalRow> {
167+
static class UnionReader implements OrcValueReader<Object> {
168168
private final OrcValueReader[] readers;
169169

170170
private UnionReader(List<OrcValueReader<?>> readers) {
@@ -175,20 +175,23 @@ private UnionReader(List<OrcValueReader<?>> readers) {
175175
}
176176

177177
@Override
178-
public InternalRow nonNullRead(ColumnVector vector, int row) {
179-
InternalRow struct = new GenericInternalRow(readers.length + 1);
178+
public Object nonNullRead(ColumnVector vector, int row) {
180179
UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
181-
182180
int fieldIndex = unionColumnVector.tags[row];
183181
Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);
184182

185-
for (int i = 0; i < readers.length; i += 1) {
186-
struct.setNullAt(i + 1);
183+
if (readers.length == 1) {
184+
return value;
185+
} else {
186+
InternalRow struct = new GenericInternalRow(readers.length + 1);
187+
for (int i = 0; i < readers.length; i += 1) {
188+
struct.setNullAt(i + 1);
189+
}
190+
struct.update(0, fieldIndex);
191+
struct.update(fieldIndex + 1, value);
192+
193+
return struct;
187194
}
188-
struct.update(0, fieldIndex);
189-
struct.update(fieldIndex + 1, value);
190-
191-
return struct;
192195
}
193196
}
194197

0 commit comments

Comments
 (0)