
Commit fb1cc27

JingsongLi authored and shardulm94 committed
Data: Add GenericAppenderFactory and GenericAppenderHelper (#1340)
Cherry-picked GenericAppenderFactory from: Data: Add GenericAppenderFactory and GenericAppenderHelper (#1340)
1 parent f6fbeb6 commit fb1cc27

File tree

7 files changed: +222 additions, -183 deletions

data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
 * Factory to create a new {@link FileAppender} to write {@link Record}s.
 */
public class GenericAppenderFactory implements FileAppenderFactory<Record> {

  private final Schema schema;
  private final Map<String, String> config = Maps.newHashMap();

  public GenericAppenderFactory(Schema schema) {
    this.schema = schema;
  }

  public GenericAppenderFactory set(String property, String value) {
    config.put(property, value);
    return this;
  }

  public GenericAppenderFactory setAll(Map<String, String> properties) {
    config.putAll(properties);
    return this;
  }

  @Override
  public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
    try {
      switch (fileFormat) {
        case AVRO:
          return Avro.write(outputFile)
              .schema(schema)
              .createWriterFunc(DataWriter::create)
              .setAll(config)
              .overwrite()
              .build();

        case PARQUET:
          return Parquet.write(outputFile)
              .schema(schema)
              .createWriterFunc(GenericParquetWriter::buildWriter)
              .setAll(config)
              .metricsConfig(metricsConfig)
              .overwrite()
              .build();

        case ORC:
          return ORC.write(outputFile)
              .schema(schema)
              .createWriterFunc(GenericOrcWriter::buildWriter)
              .setAll(config)
              .overwrite()
              .build();

        default:
          throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
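
For reference, the factory collapses the per-format switch that each caller used to carry into a single newAppender call. A minimal usage sketch (hypothetical schema, record, and output path; not part of this commit):

import java.io.File;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;

public class AppenderFactoryExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical two-column schema.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", 1L);
    record.setField("data", "a");

    // Properties set on the factory flow into the writer builders via setAll(config);
    // this one only affects the Parquet path.
    GenericAppenderFactory factory = new GenericAppenderFactory(schema)
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024 * 1024));

    // Hypothetical output path; newAppender overwrites an existing file.
    try (FileAppender<Record> appender =
        factory.newAppender(Files.localOutput(new File("/tmp/sample.parquet")), FileFormat.PARQUET)) {
      appender.add(record);
    }
  }
}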

data/src/test/java/org/apache/iceberg/TestSplitScan.java

Lines changed: 5 additions & 26 deletions
@@ -24,15 +24,12 @@
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;

@@ -122,28 +119,10 @@ private File writeToFile(List<Record> records, FileFormat fileFormat) throws IOException {
     File file = temp.newFile();
     Assert.assertTrue(file.delete());

-    switch (fileFormat) {
-      case AVRO:
-        try (FileAppender<Record> appender = Avro.write(Files.localOutput(file))
-            .schema(SCHEMA)
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build()) {
-          appender.addAll(records);
-        }
-        break;
-      case PARQUET:
-        try (FileAppender<Record> appender = Parquet.write(Files.localOutput(file))
-            .schema(SCHEMA)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .named(fileFormat.name())
-            .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE))
-            .build()) {
-          appender.addAll(records);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+    GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set(
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE));
+    try (FileAppender<Record> appender = factory.newAppender(Files.localOutput(file), fileFormat)) {
+      appender.addAll(records);
     }
     return file;
   }
data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;

/**
 * Helper for appending {@link DataFile} to a table or appending {@link Record}s to a table.
 */
public class GenericAppenderHelper {

  private final Table table;
  private final FileFormat fileFormat;
  private final TemporaryFolder tmp;

  public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
    this.table = table;
    this.fileFormat = fileFormat;
    this.tmp = tmp;
  }

  public void appendToTable(DataFile... dataFiles) {
    Preconditions.checkNotNull(table, "table not set");

    AppendFiles append = table.newAppend();

    for (DataFile dataFile : dataFiles) {
      append = append.appendFile(dataFile);
    }

    append.commit();
  }

  public void appendToTable(List<Record> records) throws IOException {
    appendToTable(null, records);
  }

  public void appendToTable(StructLike partition, List<Record> records) throws IOException {
    appendToTable(writeFile(partition, records));
  }

  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
    Preconditions.checkNotNull(table, "table not set");
    File file = tmp.newFile();
    Assert.assertTrue(file.delete());
    return appendToLocalFile(table, file, fileFormat, partition, records);
  }

  private static DataFile appendToLocalFile(
      Table table, File file, FileFormat format, StructLike partition, List<Record> records)
      throws IOException {
    FileAppender<Record> appender = new GenericAppenderFactory(table.schema()).newAppender(
        Files.localOutput(file), format);
    try (FileAppender<Record> fileAppender = appender) {
      fileAppender.addAll(records);
    }

    return DataFiles.builder(table.spec())
        .withRecordCount(records.size())
        .withFileSizeInBytes(file.length())
        .withPath(file.toURI().toString())
        .withMetrics(appender.metrics())
        .withFormat(format)
        .withPartition(partition)
        .build();
  }
}
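
GenericAppenderHelper wires the factory into JUnit tests: given a table, a file format, and a TemporaryFolder rule, it writes records to a scratch file and commits the resulting DataFile. A sketch of the intended test-side usage (hypothetical test class and table location; assumptions are marked in comments):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ExampleAppendTest {
  // Hypothetical one-column schema.
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()));

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @Test
  public void testAppendRecords() throws IOException {
    // Hypothetical unpartitioned table under the JUnit scratch directory.
    Table table = new HadoopTables().create(SCHEMA, temp.newFolder("tbl").toString());

    GenericRecord record = GenericRecord.create(SCHEMA);
    record.setField("id", 1L);
    List<Record> records = ImmutableList.of(record);

    // One call: write the records as an Avro data file and commit it to the table.
    new GenericAppenderHelper(table, FileFormat.AVRO, temp).appendToTable(records);
  }
}

Note one design point visible in appendToLocalFile above: the appender reference is kept outside the try-with-resources block because file metrics are only finalized when the writer closes, so appender.metrics() must be read after the block exits.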

data/src/test/java/org/apache/iceberg/data/TestLocalScan.java

Lines changed: 10 additions & 58 deletions
@@ -39,16 +39,10 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Tables;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

@@ -458,59 +452,17 @@ private DataFile writeFile(String location, String filename, Schema schema, List<Record> records) throws IOException {
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
-    switch (fileFormat) {
-      case AVRO:
-        FileAppender<Record> avroAppender = Avro.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build();
-        try {
-          avroAppender.addAll(records);
-        } finally {
-          avroAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(avroAppender.metrics())
-            .build();
-
-      case PARQUET:
-        FileAppender<Record> parquetAppender = Parquet.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .build();
-        try {
-          parquetAppender.addAll(records);
-        } finally {
-          parquetAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(parquetAppender.metrics())
-            .build();
-
-      case ORC:
-        FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .build();
-        try {
-          orcAppender.addAll(records);
-        } finally {
-          orcAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(orcAppender.metrics())
-            .build();
-
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+
+    FileAppender<Record> fileAppender = new GenericAppenderFactory(schema).newAppender(
+        fromPath(path, CONF), fileFormat);
+    try (FileAppender<Record> appender = fileAppender) {
+      appender.addAll(records);
     }
+
+    return DataFiles.builder(PartitionSpec.unpartitioned())
+        .withInputFile(HadoopInputFile.fromPath(path, CONF))
+        .withMetrics(fileAppender.metrics())
+        .build();
   }

   @Test

spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java

Lines changed: 4 additions & 33 deletions
@@ -31,14 +31,9 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkValueConverter;

@@ -111,33 +106,9 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema, Record record) throws IOException {
     // When tables are created, the column ids are reassigned.
     Schema tableSchema = table.schema();

-    switch (format) {
-      case AVRO:
-        try (FileAppender<Record> writer = Avro.write(localOutput(testFile))
-            .createWriterFunc(DataWriter::create)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
-
-      case PARQUET:
-        try (FileAppender<Record> writer = Parquet.write(localOutput(testFile))
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
-
-      case ORC:
-        try (FileAppender<org.apache.iceberg.data.Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), format)) {
+      writer.add(record);
     }

     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())