diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 603fcb081056..a408c7a4df6c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -66,7 +66,7 @@ public PartitionData copy() { private Map nullValueCounts = null; private Map lowerBounds = null; private Map upperBounds = null; - private List splitOffsets = null; + private Long[] splitOffsets = null; private byte[] keyMetadata = null; // cached schema @@ -134,7 +134,7 @@ public PartitionData copy() { this.nullValueCounts = nullValueCounts; this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds); this.upperBounds = SerializableByteBufferMap.wrap(upperBounds); - this.splitOffsets = copy(splitOffsets); + this.splitOffsets = splitOffsets == null ? null : splitOffsets.toArray(new Long[0]); this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @@ -168,7 +168,8 @@ public PartitionData copy() { } this.fromProjectionPos = toCopy.fromProjectionPos; this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length); - this.splitOffsets = copy(toCopy.splitOffsets); + this.splitOffsets = toCopy.splitOffsets == null ? null : + Arrays.copyOf(toCopy.splitOffsets, toCopy.splitOffsets.length); } /** @@ -234,7 +235,7 @@ public void put(int i, Object value) { this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); return; case 12: - this.splitOffsets = (List) value; + this.splitOffsets = value != null ? ((List) value).toArray(new Long[0]) : null; return; default: // ignore the object, it must be from a newer version of the format @@ -279,7 +280,7 @@ public Object get(int i) { case 11: return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null; case 12: - return splitOffsets; + return splitOffsets != null ? Lists.newArrayList(splitOffsets) : null; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -357,7 +358,7 @@ public ByteBuffer keyMetadata() { @Override public List splitOffsets() { - return splitOffsets; + return splitOffsets != null ? Lists.newArrayList(splitOffsets) : null; } private static Map copy(Map map) { @@ -369,15 +370,6 @@ private static Map copy(Map map) { return null; } - private static List copy(List list) { - if (list != null) { - List copy = Lists.newArrayListWithExpectedSize(list.size()); - copy.addAll(list); - return Collections.unmodifiableList(copy); - } - return null; - } - @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java new file mode 100644 index 000000000000..a1d8fe97d176 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.util.Locale; +import java.util.Map; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +class IcebergSinkUtil { + private IcebergSinkUtil() { + } + + static IcebergStreamWriter createStreamWriter(Table table, TableSchema tableSchema) { + Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); + + if (tableSchema != null) { + Schema writeSchema = FlinkSchemaUtil.convert(tableSchema); + // Reassign ids to match the existing table schema. + writeSchema = TypeUtil.reassignIds(writeSchema, table.schema()); + TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true); + } + + Map props = table.properties(); + long targetFileSize = getTargetFileSizeBytes(props); + FileFormat fileFormat = getFileFormat(props); + + TaskWriterFactory taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(), + table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); + + return new IcebergStreamWriter<>(table.toString(), taskWriterFactory); + } + + private static FileFormat getFileFormat(Map properties) { + String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } + + private static long getTargetFileSizeBytes(Map properties) { + return PropertyUtil.propertyAsLong(properties, + WRITE_TARGET_FILE_SIZE_BYTES, + WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java new file mode 100644 index 000000000000..e18c29311496 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.IOException; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +class IcebergStreamWriter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private static final long serialVersionUID = 1L; + + private final String fullTableName; + + private final TaskWriterFactory taskWriterFactory; + private transient TaskWriter writer; + private transient int subTaskId; + private transient int attemptId; + + IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory) { + this.fullTableName = fullTableName; + this.taskWriterFactory = taskWriterFactory; + } + + @Override + public void open() { + this.subTaskId = getRuntimeContext().getIndexOfThisSubtask(); + this.attemptId = getRuntimeContext().getAttemptNumber(); + + // Initialize the task writer factory. + this.taskWriterFactory.initialize(subTaskId, attemptId); + + // Initialize the task writer. + this.writer = taskWriterFactory.create(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + // close all open files and emit files to downstream committer operator + for (DataFile dataFile : writer.complete()) { + emit(dataFile); + } + + this.writer = taskWriterFactory.create(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + writer.write(element.getValue()); + } + + @Override + public void dispose() throws Exception { + super.dispose(); + if (writer != null) { + writer.close(); + writer = null; + } + } + + @Override + public void endInput() throws IOException { + // A bounded stream may run without the checkpoint mechanism enabled, so emit the remaining + // data files to the downstream committer before closing the writer to make sure none of them are missed. + for (DataFile dataFile : writer.complete()) { + emit(dataFile); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("table_name", fullTableName) + .add("subtask_id", subTaskId) + .add("attempt_id", attemptId) + .toString(); + } + + private void emit(DataFile dataFile) { + output.collect(new StreamRecord<>(dataFile)); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java new file mode 100644 index 000000000000..40e3e73f97f7 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class RowTaskWriterFactory implements TaskWriterFactory { + private final Schema schema; + private final PartitionSpec spec; + private final LocationProvider locations; + private final FileIO io; + private final EncryptionManager encryptionManager; + private final long targetFileSizeBytes; + private final FileFormat format; + private final FileAppenderFactory appenderFactory; + + private OutputFileFactory outputFileFactory; + + RowTaskWriterFactory(Schema schema, + PartitionSpec spec, + LocationProvider locations, + FileIO io, + EncryptionManager encryptionManager, + long targetFileSizeBytes, + FileFormat format, + Map tableProperties) { + this.schema = schema; + this.spec = spec; + this.locations = locations; + this.io = io; + this.encryptionManager = encryptionManager; + this.targetFileSizeBytes = targetFileSizeBytes; + this.format = format; + this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties); + } + + @Override + public void initialize(int taskId, int attemptId) { + this.outputFileFactory = new OutputFileFactory(spec, format, locations, io, encryptionManager, taskId, attemptId); + } + + @Override + public TaskWriter create() { + Preconditions.checkNotNull(outputFileFactory, + "The outputFileFactory shouldn't be null if we have invoked the initialize()."); + + if (spec.fields().isEmpty()) { + return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); + } else { + return new RowPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, + io, targetFileSizeBytes, schema); + } + } + + private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter { + + private final PartitionKey partitionKey; + private final RowWrapper rowWrapper; + + RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, + 
OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.rowWrapper = new RowWrapper(schema.asStruct()); + } + + @Override + protected PartitionKey partition(Row row) { + partitionKey.partition(rowWrapper.wrap(row)); + return partitionKey; + } + } + + private static class FlinkFileAppenderFactory implements FileAppenderFactory { + private final Schema schema; + private final Map props; + + private FlinkFileAppenderFactory(Schema schema, Map props) { + this.schema = schema; + this.props = props; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case PARQUET: + return Parquet.write(outputFile) + .createWriterFunc(FlinkParquetWriters::buildWriter) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + case AVRO: + // TODO add the Avro writer building once RowDataWrapper is ready. + case ORC: + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java index c432de5e7bab..c47da24951be 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java @@ -19,95 +19,28 @@ package org.apache.iceberg.flink; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Map; -import org.apache.flink.types.Row; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.flink.data.FlinkParquetWriters; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.OutputFileFactory; +import java.io.Serializable; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.parquet.Parquet; -class TaskWriterFactory { - private TaskWriterFactory() { - } - - static TaskWriter createTaskWriter(Schema schema, - PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSizeBytes) { - if (spec.fields().isEmpty()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes); - } else { - return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory, - io, targetFileSizeBytes, schema); - } - } - - private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter { - - private final PartitionKey partitionKey; - private final RowWrapper rowWrapper; - - RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.partitionKey = new PartitionKey(spec, schema); - this.rowWrapper = 
new RowWrapper(schema.asStruct()); - } - - @Override - protected PartitionKey partition(Row row) { - partitionKey.partition(rowWrapper.wrap(row)); - return partitionKey; - } - } - - static class FlinkFileAppenderFactory implements FileAppenderFactory { - private final Schema schema; - private final Map props; - - FlinkFileAppenderFactory(Schema schema, Map props) { - this.schema = schema; - this.props = props; - } - - @Override - public FileAppender newAppender(OutputFile outputFile, FileFormat format) { - MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - try { - switch (format) { - case PARQUET: - return Parquet.write(outputFile) - .createWriterFunc(FlinkParquetWriters::buildWriter) - .setAll(props) - .metricsConfig(metricsConfig) - .schema(schema) - .overwrite() - .build(); - - case AVRO: - // TODO add AVRO once the RowDataWrapper are ready. - case ORC: - default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - } +/** + * Factory to create {@link TaskWriter}. + * + * @param data type of record. + */ +interface TaskWriterFactory extends Serializable { + + /** + * Initialize the factory with a given taskId and attemptId. + * + * @param taskId the identifier of the task. + * @param attemptId the attempt id of this task. + */ + void initialize(int taskId, int attemptId); + + /** + * Create a {@link TaskWriter} using the task id and attempt id passed to {@link #initialize(int, int)}. + * + * @return a newly created task writer. + */ + TaskWriter create(); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 2ac907a3658e..a81da4a14a82 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -48,6 +50,11 @@ private SimpleDataUtil() { Types.NestedField.optional(2, "data", Types.StringType.get()) ); + static final TableSchema FLINK_SCHEMA = TableSchema.builder() + .field("id", DataTypes.INT()) + .field("data", DataTypes.STRING()) + .build(); + + static final Record RECORD = GenericRecord.create(SCHEMA); static Table createTable(String path, Map properties, boolean partitioned) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java new file mode 100644 index 000000000000..a32075eb694c --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestIcebergStreamWriter { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private String tablePath; + private Table table; + + private final FileFormat format; + private final boolean partitioned; + + // TODO add AVRO/ORC unit test once the readers and writers are ready. + @Parameterized.Parameters(name = "format = {0}, partitioned = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", true}, + new Object[] {"parquet", false} + }; + } + + public TestIcebergStreamWriter(String format, boolean partitioned) { + this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + this.partitioned = partitioned; + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + tablePath = folder.getAbsolutePath(); + + // Construct the iceberg table. + Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + table = SimpleDataUtil.createTable(tablePath, props, partitioned); + } + + @Test + public void testWritingTable() throws Exception { + long checkpointId = 1L; + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + // The first checkpoint + testHarness.processElement(Row.of(1, "hello"), 1); + testHarness.processElement(Row.of(2, "world"), 1); + testHarness.processElement(Row.of(3, "hello"), 1); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + long expectedDataFiles = partitioned ? 
2 : 1; + Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + checkpointId = checkpointId + 1; + + // The second checkpoint + testHarness.processElement(Row.of(4, "foo"), 1); + testHarness.processElement(Row.of(5, "bar"), 2); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + expectedDataFiles = partitioned ? 4 : 2; + Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + // Commit the iceberg transaction. + AppendFiles appendFiles = table.newAppend(); + testHarness.extractOutputValues().forEach(appendFiles::appendFile); + appendFiles.commit(); + + // Assert the table records. + SimpleDataUtil.assertTableRecords(tablePath, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "hello"), + SimpleDataUtil.createRecord(2, "world"), + SimpleDataUtil.createRecord(3, "hello"), + SimpleDataUtil.createRecord(4, "foo"), + SimpleDataUtil.createRecord(5, "bar") + )); + } + } + + @Test + public void testSnapshotTwice() throws Exception { + long checkpointId = 1; + long timestamp = 1; + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(Row.of(1, "hello"), timestamp++); + testHarness.processElement(Row.of(2, "world"), timestamp); + + testHarness.prepareSnapshotPreBarrier(checkpointId++); + long expectedDataFiles = partitioned ? 2 : 1; + Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + // snapshot again immediately. + for (int i = 0; i < 5; i++) { + testHarness.prepareSnapshotPreBarrier(checkpointId++); + Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + } + } + } + + @Test + public void testTableWithoutSnapshot() throws Exception { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + Assert.assertEquals(0, testHarness.extractOutputValues().size()); + } + // Even though we closed the iceberg stream writer, there is no orphan data file. + Assert.assertEquals(0, scanDataFiles().size()); + + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(Row.of(1, "hello"), 1); + // The data file is not emitted yet, because there has been no checkpoint. + Assert.assertEquals(0, testHarness.extractOutputValues().size()); + } + // Once we close the iceberg stream writer, an orphan data file will be left behind. + Assert.assertEquals(1, scanDataFiles().size()); + } + + private Set scanDataFiles() throws IOException { + Path dataDir = new Path(tablePath, "data"); + FileSystem fs = FileSystem.get(new Configuration()); + if (!fs.exists(dataDir)) { + return ImmutableSet.of(); + } else { + Set paths = Sets.newHashSet(); + RemoteIterator iterators = fs.listFiles(dataDir, true); + while (iterators.hasNext()) { + LocatedFileStatus status = iterators.next(); + if (status.isFile()) { + Path path = status.getPath(); + if (path.getName().endsWith("." + format.toString().toLowerCase())) { + paths.add(path.toString()); + } + } + } + return paths; + } + } + + @Test + public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(Row.of(1, "hello"), 1); + testHarness.processElement(Row.of(2, "world"), 2); + + Assert.assertTrue(testHarness.getOneInputOperator() instanceof BoundedOneInput); + ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); + + long expectedDataFiles = partitioned ? 
2 : 1; + Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + // invoke endInput again. + ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); + Assert.assertEquals(expectedDataFiles * 2, testHarness.extractOutputValues().size()); + } + } + + @Test + public void testTableWithTargetFileSize() throws Exception { + // Adjust the target-file-size in table properties. + table.updateProperties() + .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger + .commit(); + + List rows = Lists.newArrayListWithCapacity(8000); + List records = Lists.newArrayListWithCapacity(8000); + for (int i = 0; i < 2000; i++) { + for (String data : new String[] {"a", "b", "c", "d"}) { + rows.add(Row.of(i, data)); + records.add(SimpleDataUtil.createRecord(i, data)); + } + } + + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + for (Row row : rows) { + testHarness.processElement(row, 1); + } + + // snapshot the operator. + testHarness.prepareSnapshotPreBarrier(1); + Assert.assertEquals(8, testHarness.extractOutputValues().size()); + + // Assert that the data files have the expected records. + for (DataFile serDataFile : testHarness.extractOutputValues()) { + Assert.assertEquals(1000, serDataFile.recordCount()); + } + + // Commit the iceberg transaction. + AppendFiles appendFiles = table.newAppend(); + testHarness.extractOutputValues().forEach(appendFiles::appendFile); + appendFiles.commit(); + } + + // Assert the table records. + SimpleDataUtil.assertTableRecords(tablePath, records); + } + + private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { + IcebergStreamWriter streamWriter = IcebergSinkUtil.createStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, + 1, 1, 0); + + harness.setup(); + harness.open(); + + return harness; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java index 7df3e7ddf1c6..ee65eb463efb 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java @@ -35,7 +35,6 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.data.RandomData; -import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -229,13 +228,9 @@ public void testRandomData() throws IOException { } private TaskWriter createTaskWriter(long targetFileSize) { - TaskWriterFactory.FlinkFileAppenderFactory appenderFactory = - new TaskWriterFactory.FlinkFileAppenderFactory(table.schema(), table.properties()); - - OutputFileFactory outputFileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), - table.io(), table.encryption(), 1, 1); - - return TaskWriterFactory.createTaskWriter(table.schema(), table.spec(), format, - appenderFactory, outputFileFactory, table.io(), targetFileSize); + TaskWriterFactory taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(), + table.locationProvider(), table.io(), table.encryption(), targetFileSize, format, table.properties()); + taskWriterFactory.initialize(1, 1); + return taskWriterFactory.create(); 
} }
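
For reference, below is a minimal sketch of how the new operator might be wired into a Flink job; it is not part of this patch. It assumes an existing Iceberg table loaded through HadoopTables at a hypothetical local path with the same (id int, data string) schema used by the tests, restores the generic type parameters (for example IcebergStreamWriter<Row>) that the diff text above elides, and is placed in the org.apache.iceberg.flink package because IcebergSinkUtil and IcebergStreamWriter are package-private. The committer side is out of scope for this change, so the emitted DataFiles are only printed; a real pipeline would forward them to a downstream operator that appends and commits them to the table.

package org.apache.iceberg.flink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class IcebergStreamWriterExample {

  public static void main(String[] args) throws Exception {
    // Hypothetical table location; the table is assumed to already exist with schema (id int, data string).
    Table table = new HadoopTables().load("file:///tmp/iceberg/warehouse/demo");

    TableSchema flinkSchema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000L); // data files are emitted on every checkpoint barrier

    DataStream<Row> rows = env
        .fromElements(Row.of(1, "hello"), Row.of(2, "world"))
        .returns(new RowTypeInfo(Types.INT, Types.STRING));

    // createStreamWriter validates the requested Flink schema against the table schema
    // and builds the operator on top of RowTaskWriterFactory.
    IcebergStreamWriter<Row> writer = IcebergSinkUtil.createStreamWriter(table, flinkSchema);

    rows.transform("IcebergStreamWriter", TypeInformation.of(DataFile.class), writer)
        .print(); // placeholder for a committer operator that would append the emitted files to the table

    env.execute("iceberg-stream-writer-example");
  }
}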