Skip to content

Commit 34f0d67

Browse files
authored
Add variant type support to ParquetTypeVisitor (#14588) (#14624)
* Parquet: Add variant type support to ParquetTypeVisitor. Implement the variant(GroupType) method in ParquetTypeVisitor and all subclasses to enable proper handling of Parquet variant logical types during schema conversion and manipulation operations.
* Address review comments: replace the variant spec version with a constant; add variant tests.
* Column id reordering.
* Address review comments.
1 parent c6ec670 commit 34f0d67

File tree

6 files changed

+126
-0
lines changed

6 files changed

+126
-0
lines changed

parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ public Type primitive(PrimitiveType primitive) {
103103
return field == null ? primitive : primitive.withId(field.id());
104104
}
105105

106+
/**
 * Applies the name mapping to a variant group.
 *
 * <p>Looks up the mapped field for the current path. When no mapping is found the group is
 * returned unchanged; otherwise the result of {@code variant.withId(...)} with the mapped
 * field's ID is returned.
 *
 * @param variant the Parquet group that was dispatched as a variant
 * @return the original group, or the group with the mapped field ID applied
 */
@Override
107+
public Type variant(GroupType variant) {
108+
MappedField field = nameMapping.find(currentPath());
109+
return field == null ? variant : variant.withId(field.id());
110+
}
111+
106112
@Override
107113
public void beforeField(Type type) {
108114
fieldNames.push(type.getName());

parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ public Type primitive(PrimitiveType primitive) {
179179
throw new UnsupportedOperationException("Cannot convert unknown primitive type: " + primitive);
180180
}
181181

182+
/**
 * Converts a Parquet variant group to the Iceberg variant type.
 *
 * <p>The group argument is not inspected; the result is always {@code Types.VariantType.get()}.
 *
 * @param variant the Parquet group that was dispatched as a variant (unused)
 * @return the Iceberg variant type
 */
@Override
183+
public Type variant(GroupType variant) {
184+
return Types.VariantType.get();
185+
}
186+
182187
private static class ParquetLogicalTypeVisitor
183188
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Type> {
184189
private static final ParquetLogicalTypeVisitor INSTANCE = new ParquetLogicalTypeVisitor();

parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) {
217217
public Boolean primitive(PrimitiveType primitive) {
218218
return primitive.getId() != null;
219219
}
220+
221+
/**
 * Reports whether the variant group carries a field ID.
 *
 * @param variant the Parquet group that was dispatched as a variant
 * @return {@code true} when {@code variant.getId()} is non-null, {@code false} otherwise
 */
@Override
222+
public Boolean variant(GroupType variant) {
223+
return variant.getId() != null;
224+
}
220225
}
221226

222227
public static Type determineListElementType(GroupType array) {

parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2424
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
25+
import org.apache.iceberg.variants.Variant;
2526
import org.apache.parquet.schema.GroupType;
2627
import org.apache.parquet.schema.LogicalTypeAnnotation;
2728
import org.apache.parquet.schema.MessageType;
@@ -46,6 +47,9 @@ public static <T> T visit(Type type, ParquetTypeVisitor<T> visitor) {
4647
return visitList(group, visitor);
4748
} else if (LogicalTypeAnnotation.mapType().equals(annotation)) {
4849
return visitMap(group, visitor);
50+
} else if (LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)
51+
.equals(annotation)) {
52+
return visitVariant(group, visitor);
4953
}
5054

5155
return visitor.struct(group, visitFields(group, visitor));
@@ -168,6 +172,10 @@ private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
168172
}
169173
}
170174

175+
/**
 * Hands a variant group to the visitor's {@code variant} callback.
 *
 * <p>The group is passed through as-is; its child fields are not visited by this method.
 *
 * @param variant the group to dispatch
 * @param visitor the visitor whose {@code variant} callback is invoked
 * @return whatever the visitor's {@code variant} callback returns
 */
private static <T> T visitVariant(GroupType variant, ParquetTypeVisitor<T> visitor) {
176+
return visitor.variant(variant);
177+
}
178+
171179
private static <T> List<T> visitFields(GroupType group, ParquetTypeVisitor<T> visitor) {
172180
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
173181
for (Type field : group.getFields()) {
@@ -202,6 +210,10 @@ public T primitive(PrimitiveType primitive) {
202210
return null;
203211
}
204212

213+
/**
 * Callback invoked with a variant group.
 *
 * <p>This base implementation returns {@code null}; visitors that need a result for variant
 * groups override it.
 *
 * @param variant the variant group being visited
 * @return {@code null} in this base implementation
 */
public T variant(GroupType variant) {
214+
return null;
215+
}
216+
205217
public void beforeField(Type type) {
206218
fieldNames.push(type.getName());
207219
}

parquet/src/main/java/org/apache/iceberg/parquet/RemoveIds.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ public Type primitive(PrimitiveType primitive) {
7777
.named(primitive.getName());
7878
}
7979

80+
/**
 * Rebuilds a variant group without setting a field ID on it.
 *
 * <p>A new group is built with the same repetition, logical type annotation and name as the
 * input. No ID is set on the builder. The child fields are added as the same {@code Type}
 * objects, so this method does not modify them.
 *
 * @param variant the variant group to rebuild
 * @return a new group with the same name, repetition, annotation and fields
 */
@Override
81+
public Type variant(GroupType variant) {
82+
Types.GroupBuilder<GroupType> builder =
83+
Types.buildGroup(variant.getRepetition()).as(variant.getLogicalTypeAnnotation());
84+
for (Type field : variant.getFields()) {
85+
builder.addField(field);
86+
}
87+
return builder.named(variant.getName());
88+
}
89+
8090
public static MessageType removeIds(MessageType type) {
8191
return (MessageType) ParquetTypeVisitor.visit(type, new RemoveIds());
8292
}

parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.apache.iceberg.mapping.NameMapping;
2929
import org.apache.iceberg.types.TypeUtil;
3030
import org.apache.iceberg.types.Types;
31+
import org.apache.iceberg.variants.Variant;
3132
import org.apache.parquet.schema.GroupType;
33+
import org.apache.parquet.schema.LogicalTypeAnnotation;
3234
import org.apache.parquet.schema.MessageType;
3335
import org.apache.parquet.schema.MessageTypeParser;
3436
import org.apache.parquet.schema.PrimitiveType;
@@ -127,6 +129,37 @@ public void testAssignIdsByNameMapping() {
127129
assertThat(messageTypeWithIdsFromNameMapping).isEqualTo(messageTypeWithIds);
128130
}
129131

132+
/**
 * Round-trip check for name mapping with variant types.
 *
 * <p>Builds a struct with a variant at the top level, inside a nested struct, as a list element
 * and as a map value, then reassigns IDs with a fresh counter starting at 0. The resulting
 * schema is converted to a Parquet message type, its IDs are removed, and IDs are re-applied
 * from a name mapping created from the same schema. The re-mapped message type must equal the
 * originally converted one.
 */
@Test
133+
public void testAssignIdsToVariantTypesByNameMapping() {
134+
Types.StructType structType =
135+
Types.StructType.of(
136+
optional(30, "variant_col", Types.VariantType.get()),
137+
optional(
138+
31,
139+
"struct_with_variant",
140+
Types.StructType.of(
141+
Types.NestedField.required(32, "id", Types.IntegerType.get()),
142+
Types.NestedField.optional(33, "data", Types.VariantType.get()))),
143+
required(
144+
34, "list_of_variants", Types.ListType.ofOptional(35, Types.VariantType.get())),
145+
required(
146+
36,
147+
"map_with_variant_value",
148+
Types.MapType.ofOptional(37, 38, Types.StringType.get(), Types.VariantType.get())));
149+
150+
Schema schema =
151+
new Schema(
152+
TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet)
153+
.asStructType()
154+
.fields());
155+
NameMapping nameMapping = MappingUtil.create(schema);
156+
MessageType messageTypeWithIds = ParquetSchemaUtil.convert(schema, "parquet_type");
157+
MessageType messageTypeWithIdsFromNameMapping =
158+
ParquetSchemaUtil.applyNameMapping(RemoveIds.removeIds(messageTypeWithIds), nameMapping);
159+
160+
assertThat(messageTypeWithIdsFromNameMapping).isEqualTo(messageTypeWithIds);
161+
}
162+
130163
@Test
131164
public void testSchemaConversionWithoutAssigningIds() {
132165
MessageType messageType =
@@ -264,6 +297,47 @@ public void testSchemaConversionWithoutAssigningIds() {
264297
.isEqualTo(expectedSchema.asStruct());
265298
}
266299

300+
/**
 * Checks {@code ParquetSchemaUtil.convertAndPrune} on a message type containing variants.
 *
 * <p>The input mixes variant groups built with an ID and with a {@code null} ID, at the top
 * level, inside a struct, as a list element and as a map value. The expected schema contains
 * only the variants that were built with an ID: {@code variant_col_2} and the struct's
 * {@code variant_field} (both built with a {@code null} ID) are absent from it.
 */
@Test
301+
public void testVariantTypesWithoutAssigningIds() {
302+
MessageType messageType =
303+
new MessageType(
304+
"test",
305+
variant(30, "variant_col_1", Repetition.OPTIONAL),
306+
variant(null, "variant_col_2", Repetition.REQUIRED),
307+
struct(
308+
31,
309+
"struct_col_3",
310+
Repetition.REQUIRED,
311+
primitive(32, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
312+
variant(null, "variant_field", Repetition.OPTIONAL)),
313+
list(33, "list_col_6", Repetition.OPTIONAL, variant(34, "v", Repetition.OPTIONAL)),
314+
map(
315+
35,
316+
"map_col_6",
317+
Repetition.REQUIRED,
318+
primitive(36, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
319+
variant(37, "v", Repetition.OPTIONAL)));
320+
321+
Schema expectedSchema =
322+
new Schema(
323+
optional(30, "variant_col_1", Types.VariantType.get()),
324+
required(
325+
31,
326+
"struct_col_3",
327+
Types.StructType.of(required(32, "n1", Types.IntegerType.get()))),
328+
optional(33, "list_col_6", Types.ListType.ofOptional(34, Types.VariantType.get())),
329+
required(
330+
35,
331+
"map_col_6",
332+
Types.MapType.ofOptional(
333+
36, 37, Types.IntegerType.get(), Types.VariantType.get())));
334+
335+
Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
336+
assertThat(actualSchema.asStruct())
337+
.as("Schema must match")
338+
.isEqualTo(expectedSchema.asStruct());
339+
}
340+
267341
@Test
268342
public void testSchemaConversionForHiveStyleLists() {
269343
String parquetSchemaString =
@@ -464,4 +538,18 @@ private Type map(Integer id, String name, Repetition repetition, Type keyType, T
464538
}
465539
return builder.named(name);
466540
}
541+
542+
/**
 * Test helper that builds a Parquet group annotated as a variant.
 *
 * <p>The group uses the variant logical type for {@code Variant.VARIANT_SPEC_VERSION} and has
 * two required binary fields named {@code metadata} and {@code value}. A field ID is set on the
 * group only when {@code id} is non-null.
 *
 * @param id field ID for the group, or {@code null} to leave the ID unset
 * @param name name of the group
 * @param repetition repetition of the group
 * @return the built variant group
 */
private Type variant(Integer id, String name, Repetition repetition) {
543+
GroupBuilder<GroupType> builder =
544+
org.apache.parquet.schema.Types.buildGroup(repetition)
545+
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
546+
.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
547+
.named("metadata")
548+
.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
549+
.named("value");
550+
if (id != null) {
551+
builder.id(id);
552+
}
553+
return builder.named(name);
554+
}
467555
}

0 commit comments

Comments
 (0)