Commit 03b779d

shardulm94 authored and rdsr committed
ORC: Support reading ORC files without Iceberg IDs (#16)
1 parent 4c2436b commit 03b779d

2 files changed: +228 −1 lines
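For context: when Iceberg writes ORC, it records its field IDs as attributes on the ORC type tree, so a file produced by any other ORC writer has no IDs for the reader to map columns with. Below is a minimal sketch of that distinction; the class name and the printed attribute keys are illustrative assumptions, not part of this commit.

import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;

class IdAttributeSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Iceberg's schema conversion attaches its field IDs as attributes on the ORC type tree.
    TypeDescription orcSchema = ORCSchemaUtil.convert(schema);

    // Assumed output: something like [iceberg.id, iceberg.required]. A file from a
    // non-Iceberg ORC writer would report an empty list here, which is the case
    // this commit makes readable.
    System.out.println(orcSchema.getChildren().get(0).getAttributeNames());
  }
}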
org/apache/iceberg/data/orc/TestReadOrcFileWithoutIDs.java (new file)

Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data.orc;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;


public class TestReadOrcFileWithoutIDs {

  private static final Types.StructType SUPPORTED_PRIMITIVES = Types.StructType.of(
      required(100, "id", Types.LongType.get()),
      optional(101, "data", Types.StringType.get()),
      required(102, "b", Types.BooleanType.get()),
      optional(103, "i", Types.IntegerType.get()),
      required(104, "l", Types.LongType.get()),
      optional(105, "f", Types.FloatType.get()),
      required(106, "d", Types.DoubleType.get()),
      optional(107, "date", Types.DateType.get()),
      required(108, "tsTz", Types.TimestampType.withZone()),
      required(109, "ts", Types.TimestampType.withoutZone()),
      required(110, "s", Types.StringType.get()),
      optional(113, "bytes", Types.BinaryType.get()),
      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
      // Disabled some primitives because they cannot work without Iceberg's type attributes
      // and hence won't be present in old data anyway:
      // required(112, "fixed", Types.FixedType.ofLength(7))
      // required(117, "time", Types.TimeType.get())
  );

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @Test
  public void writeAndValidateORCFileWithoutIds() throws IOException {
    Types.StructType structType = Types.StructType.of(
        required(0, "id", Types.LongType.get()),
        optional(1, "list_of_maps",
            Types.ListType.ofOptional(2, Types.MapType.ofOptional(3, 4,
                Types.StringType.get(),
                SUPPORTED_PRIMITIVES))),
        optional(5, "map_of_lists",
            Types.MapType.ofOptional(6, 7,
                Types.StringType.get(),
                Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))),
        required(9, "list_of_lists",
            Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, SUPPORTED_PRIMITIVES))),
        required(12, "map_of_maps",
            Types.MapType.ofOptional(13, 14,
                Types.StringType.get(),
                Types.MapType.ofOptional(15, 16,
                    Types.StringType.get(),
                    SUPPORTED_PRIMITIVES))),
        required(17, "list_of_struct_of_nested_types", Types.ListType.ofOptional(19, Types.StructType.of(
            Types.NestedField.required(20, "m1", Types.MapType.ofOptional(21, 22,
                Types.StringType.get(),
                SUPPORTED_PRIMITIVES)),
            Types.NestedField.optional(23, "l1", Types.ListType.ofRequired(24, SUPPORTED_PRIMITIVES)),
            Types.NestedField.required(25, "l2", Types.ListType.ofRequired(26, SUPPORTED_PRIMITIVES)),
            Types.NestedField.optional(27, "m2", Types.MapType.ofOptional(28, 29,
                Types.StringType.get(),
                SUPPORTED_PRIMITIVES))
        )))
    );

    Schema schema = new Schema(TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet)
        .asStructType().fields());

    File testFile = temp.newFile();
    Assert.assertTrue("Delete should succeed", testFile.delete());

    List<Record> expected = RandomGenericData.generate(schema, 100, 0L);

    try (OrcWriter writer = new OrcWriter(schema, testFile)) {
      for (Record record : expected) {
        writer.write(record);
      }
    }

    Assert.assertEquals("Ensure written file does not have IDs in the file schema", 0,
        clearAttributes(orcFileSchema(testFile)));

    List<Record> rows;
    try (CloseableIterable<Record> reader = ORC.read(Files.localInput(testFile))
        .project(schema)
        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
        .build()) {
      rows = Lists.newArrayList(reader);
    }

    for (int i = 0; i < expected.size(); i += 1) {
      DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
    }
  }

  private static TypeDescription orcFileSchema(File file) throws IOException {
    return OrcFile.createReader(new Path(file.getPath()), OrcFile.readerOptions(new Configuration())).getSchema();
  }

  /**
   * Remove attributes from a given {@link TypeDescription}.
   *
   * @param schema the {@link TypeDescription} to remove attributes from
   * @return the number of attributes removed
   */
  public static int clearAttributes(TypeDescription schema) {
    int result = 0;
    for (String attribute : schema.getAttributeNames()) {
      schema.removeAttribute(attribute);
      result += 1;
    }
    List<TypeDescription> children = schema.getChildren();
    if (children != null) {
      for (TypeDescription child : children) {
        result += clearAttributes(child);
      }
    }
    return result;
  }

  private static class OrcWriter implements Closeable {

    private final VectorizedRowBatch batch;
    private final Writer writer;
    private final OrcRowWriter<Record> valueWriter;
    private final File outputFile;
    private boolean isClosed = false;

    private OrcWriter(Schema schema, File file) {
      TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
      // clear attributes before writing the schema to the file so that the file schema does not have IDs
      TypeDescription orcSchemaWithoutAttributes = orcSchema.clone();
      clearAttributes(orcSchemaWithoutAttributes);

      this.outputFile = file;
      this.batch = orcSchemaWithoutAttributes.createRowBatch(VectorizedRowBatch.DEFAULT_SIZE);
      OrcFile.WriterOptions options = OrcFile.writerOptions(new Configuration()).useUTCTimestamp(true);
      options.setSchema(orcSchemaWithoutAttributes);

      final Path locPath = new Path(file.getPath());
      try {
        this.writer = OrcFile.createWriter(locPath, options);
      } catch (IOException e) {
        throw new RuntimeException("Can't create file " + locPath, e);
      }
      this.valueWriter = GenericOrcWriter.buildWriter(schema, orcSchema);
    }

    void write(Record record) {
      try {
        valueWriter.write(record, batch);
        if (batch.size == VectorizedRowBatch.DEFAULT_SIZE) {
          writer.addRowBatch(batch);
          batch.reset();
        }
      } catch (IOException e) {
        throw new RuntimeException("Problem writing to ORC file " + outputFile.getPath(), e);
      }
    }

    @Override
    public void close() throws IOException {
      if (!isClosed) {
        try {
          if (batch.size > 0) {
            writer.addRowBatch(batch);
            batch.reset();
          }
        } finally {
          writer.close();
          this.isClosed = true;
        }
      }
    }
  }
}

orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java

Lines changed: 6 additions & 1 deletion
@@ -249,7 +249,12 @@ public static Schema convert(TypeDescription orcSchema) {
    */
   public static TypeDescription buildOrcProjection(Schema schema,
                                                    TypeDescription originalOrcSchema) {
-    final Map<Integer, OrcField> icebergToOrc = icebergToOrcMapping("root", originalOrcSchema);
+    Map<Integer, OrcField> icebergToOrc = icebergToOrcMapping("root", originalOrcSchema);
+    if (icebergToOrc.isEmpty()) {
+      // if no field ids are present in original schema then build mapping from expected schema
+      // this should ideally be handled at a higher layer with NameMapping
+      icebergToOrc = icebergToOrcMapping("root", convert(schema));
+    }
     return buildOrcProjection(Integer.MIN_VALUE, schema.asStruct(), true, icebergToOrc);
   }
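The effect of this fallback: a projection can now be built even when the file's ORC schema carries no Iceberg ID attributes, because the ID-to-ORC mapping is rebuilt from the expected schema. A rough usage sketch under that assumption follows; the inline schemas and the class name are hypothetical, not taken from this diff.

import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;

class ProjectionFallbackSketch {
  public static void main(String[] args) {
    // A file schema with no Iceberg ID attributes, as if written by a plain ORC writer.
    TypeDescription fileSchema = TypeDescription.fromString("struct<id:bigint,data:string>");

    // The expected Iceberg schema, with field IDs assigned.
    Schema expected = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Before this change the mapping came back empty for such files; with the fallback it
    // is derived from convert(expected), effectively assuming the file's layout matches
    // the expected schema.
    TypeDescription projection = ORCSchemaUtil.buildOrcProjection(expected, fileSchema);
    System.out.println(projection);
  }
}

As the inline comment in the hunk notes, this positional/name-based assumption is a stopgap: the more principled fix is to resolve such files at a higher layer with NameMapping.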
