Skip to content

Commit a7584bc

Browse files
funcheetahWenye Zhang
andauthored
Handle non-nullable union of single type for Avro (#98)
* Handle non-nullable union of single type Co-authored-by: Wenye Zhang <wyzhang@wyzhang-mn1.linkedin.biz>
1 parent 4319f37 commit a7584bc

File tree

6 files changed

+198
-2
lines changed

6 files changed

+198
-2
lines changed

core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,16 @@ public static boolean isOptionSchema(Schema schema) {
138138
return false;
139139
}
140140

141+
/**
142+
* This method decides whether a schema represents a single type union, i.e., a union that contains only one option
143+
*
144+
* @param schema input schema
145+
* @return true if schema is single type union
146+
*/
147+
public static boolean isSingleTypeUnion(Schema schema) {
148+
return schema.getType() == UNION && schema.getTypes().size() == 1;
149+
}
150+
141151
/**
142152
* This method decides whether a schema is of type union and is complex union and is optional
143153
*

core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit
8989
options.add(visit(type, branch, visitor));
9090
}
9191
}
92+
} else if (AvroSchemaUtil.isSingleTypeUnion(union)) { // single type union case
93+
Schema branch = types.get(0);
94+
if (branch.getType() == Schema.Type.NULL) {
95+
options.add(visit((Type) null, branch, visitor));
96+
} else {
97+
options.add(visit(type, branch, visitor));
98+
}
9299
} else { // complex union case
93100
int index = 1;
94101
for (Schema branch : types) {

core/src/main/java/org/apache/iceberg/avro/SchemaToType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ public Type union(Schema union, List<Type> options) {
116116
} else {
117117
return options.get(0);
118118
}
119+
} else if (AvroSchemaUtil.isSingleTypeUnion(union)) {
120+
// Single type union
121+
return options.get(0);
119122
} else {
120123
// Complex union
121124
List<Types.NestedField> newFields = new ArrayList<>();

core/src/test/java/org/apache/iceberg/avro/TestUnionSchemaConversions.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void testOptionalComplexUnion() {
7474
}
7575

7676
@Test
77-
public void testSimpleUnionSchema() {
77+
public void testOptionalSingleUnionSchema() {
7878
Schema avroSchema = SchemaBuilder.record("root")
7979
.fields()
8080
.name("optionCol")
@@ -92,4 +92,62 @@ public void testSimpleUnionSchema() {
9292

9393
Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
9494
}
95+
96+
@Test
97+
public void testSingleTypeUnionSchema() {
98+
Schema avroSchema = SchemaBuilder.record("root")
99+
.fields()
100+
.name("unionCol")
101+
.type()
102+
.unionOf()
103+
.intType()
104+
.endUnion()
105+
.noDefault()
106+
.endRecord();
107+
108+
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
109+
String expectedIcebergSchema = "table {\n" + " 0: unionCol: required int\n" + "}";
110+
111+
Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
112+
}
113+
114+
@Test
115+
public void testNestedSingleTypeUnionSchema() {
116+
Schema avroSchema = SchemaBuilder.record("root")
117+
.fields()
118+
.name("col1")
119+
.type()
120+
.array()
121+
.items()
122+
.unionOf()
123+
.stringType()
124+
.endUnion()
125+
.noDefault()
126+
.endRecord();
127+
128+
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
129+
String expectedIcebergSchema = "table {\n" + " 0: col1: required list<string>\n" + "}";
130+
131+
Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
132+
}
133+
134+
@Test
135+
public void testSingleTypeUnionOfComplexTypeSchema() {
136+
Schema avroSchema = SchemaBuilder.record("root")
137+
.fields()
138+
.name("unionCol")
139+
.type()
140+
.unionOf()
141+
.array()
142+
.items()
143+
.intType()
144+
.endUnion()
145+
.noDefault()
146+
.endRecord();
147+
148+
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
149+
String expectedIcebergSchema = "table {\n" + " 0: unionCol: required list<int>\n" + "}";
150+
151+
Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
152+
}
95153
}

spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public ValueReader<?> record(Types.StructType expected, Schema record, List<Stri
8080

8181
@Override
8282
public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
83-
if (AvroSchemaUtil.isOptionSchema(union)) {
83+
if (AvroSchemaUtil.isOptionSchema(union) || AvroSchemaUtil.isSingleTypeUnion(union)) {
8484
return ValueReaders.union(options);
8585
} else {
8686
return SparkValueReaders.union(union, options);

spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,124 @@ public void writeAndValidateOptionalComplexUnion() throws IOException {
141141

142142
@Test
143143
public void writeAndValidateSingleTypeUnion() throws IOException {
144+
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
145+
.fields()
146+
.name("unionCol")
147+
.type()
148+
.unionOf()
149+
.intType()
150+
.endUnion()
151+
.noDefault()
152+
.endRecord();
153+
154+
GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
155+
unionRecord1.put("unionCol", 0);
156+
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
157+
unionRecord2.put("unionCol", 1);
158+
159+
File testFile = temp.newFile();
160+
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
161+
writer.create(avroSchema, testFile);
162+
writer.append(unionRecord1);
163+
writer.append(unionRecord2);
164+
}
165+
166+
Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);
167+
168+
List<InternalRow> rows;
169+
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
170+
.createReaderFunc(SparkAvroReader::new)
171+
.project(expectedSchema)
172+
.build()) {
173+
rows = Lists.newArrayList(reader);
174+
175+
Assert.assertEquals(0, rows.get(0).getInt(0));
176+
Assert.assertEquals(1, rows.get(1).getInt(0));
177+
}
178+
}
179+
180+
@Test
181+
public void writeAndValidateNestedSingleTypeUnion() throws IOException {
182+
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
183+
.fields()
184+
.name("col1")
185+
.type()
186+
.array()
187+
.items()
188+
.unionOf()
189+
.stringType()
190+
.endUnion()
191+
.noDefault()
192+
.endRecord();
193+
194+
GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
195+
unionRecord1.put("col1", Arrays.asList("foo"));
196+
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
197+
unionRecord2.put("col1", Arrays.asList("bar"));
198+
199+
File testFile = temp.newFile();
200+
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
201+
writer.create(avroSchema, testFile);
202+
writer.append(unionRecord1);
203+
writer.append(unionRecord2);
204+
}
205+
206+
Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);
207+
208+
List<InternalRow> rows;
209+
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
210+
.createReaderFunc(SparkAvroReader::new)
211+
.project(expectedSchema)
212+
.build()) {
213+
rows = Lists.newArrayList(reader);
214+
215+
Assert.assertEquals("foo", rows.get(0).getArray(0).getUTF8String(0).toString());
216+
Assert.assertEquals("bar", rows.get(1).getArray(0).getUTF8String(0).toString());
217+
}
218+
}
219+
220+
@Test
221+
public void writeAndValidateSingleTypeUnionOfComplexType() throws IOException {
222+
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
223+
.fields()
224+
.name("unionCol")
225+
.type()
226+
.unionOf()
227+
.array()
228+
.items()
229+
.intType()
230+
.endUnion()
231+
.noDefault()
232+
.endRecord();
233+
234+
GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
235+
unionRecord1.put("unionCol", Arrays.asList(1));
236+
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
237+
unionRecord2.put("unionCol", Arrays.asList(2));
238+
239+
File testFile = temp.newFile();
240+
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
241+
writer.create(avroSchema, testFile);
242+
writer.append(unionRecord1);
243+
writer.append(unionRecord2);
244+
}
245+
246+
Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);
247+
248+
List<InternalRow> rows;
249+
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
250+
.createReaderFunc(SparkAvroReader::new)
251+
.project(expectedSchema)
252+
.build()) {
253+
rows = Lists.newArrayList(reader);
254+
255+
Assert.assertEquals(1, rows.get(0).getArray(0).getInt(0));
256+
Assert.assertEquals(2, rows.get(1).getArray(0).getInt(0));
257+
}
258+
}
259+
260+
@Test
261+
public void writeAndValidateOptionalSingleUnion() throws IOException {
144262
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
145263
.fields()
146264
.name("unionCol")

0 commit comments

Comments
 (0)