Implement the flink stream writer to accept the row data and emit the complete data files event to downstream #1145
Changes from all commits: d271f6f, 550345b, 9e5aa9d, 09f5f4e, b5790f3
@@ -66,7 +66,7 @@ public PartitionData copy() {
   private Map<Integer, Long> nullValueCounts = null;
   private Map<Integer, ByteBuffer> lowerBounds = null;
   private Map<Integer, ByteBuffer> upperBounds = null;
-  private List<Long> splitOffsets = null;
+  private Long[] splitOffsets = null;
   private byte[] keyMetadata = null;

   // cached schema
@@ -134,7 +134,7 @@ public PartitionData copy() {
   this.nullValueCounts = nullValueCounts;
   this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds);
   this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
-  this.splitOffsets = copy(splitOffsets);
+  this.splitOffsets = splitOffsets == null ? null : splitOffsets.toArray(new Long[0]);
   this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
 }
@@ -168,7 +168,8 @@ public PartitionData copy() {
   }
   this.fromProjectionPos = toCopy.fromProjectionPos;
   this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
-  this.splitOffsets = copy(toCopy.splitOffsets);
+  this.splitOffsets = toCopy.splitOffsets == null ? null :
+      Arrays.copyOf(toCopy.splitOffsets, toCopy.splitOffsets.length);
 }

 /**
@@ -234,7 +235,7 @@ public void put(int i, Object value) {
       this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
       return;
     case 12:
-      this.splitOffsets = (List<Long>) value;
+      this.splitOffsets = value != null ? ((List<Long>) value).toArray(new Long[0]) : null;
       return;
     default:
       // ignore the object, it must be from a newer version of the format
@@ -279,7 +280,7 @@ public Object get(int i) {
     case 11:
       return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
     case 12:
-      return splitOffsets;
+      return splitOffsets != null ? Lists.newArrayList(splitOffsets) : null;
     default:
       throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
   }
@@ -357,7 +358,7 @@ public ByteBuffer keyMetadata() {

   @Override
   public List<Long> splitOffsets() {
-    return splitOffsets;
+    return splitOffsets != null ? Lists.newArrayList(splitOffsets) : null;
Contributor

Should this return an unmodifiable list?

Member, Author

I think we used an unmodifiable collection before because we didn't want the contents of splitOffsets to be changed. Now we've accomplished the same purpose, so returning it as either a modifiable or an unmodifiable collection sounds good to me.
   }
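If the reviewer's suggestion were adopted, a minimal sketch of the unmodifiable variant could look like the following. This is illustrative only, not part of the diff; it assumes the same Long[] field and the Arrays/Collections utilities already used in this class:

  @Override
  public List<Long> splitOffsets() {
    // Wrap the backing array in a read-only view instead of copying into a fresh ArrayList.
    return splitOffsets != null
        ? Collections.unmodifiableList(Arrays.asList(splitOffsets))
        : null;
  }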
@@ -369,15 +370,6 @@ private static <K, V> Map<K, V> copy(Map<K, V> map) {
     return null;
   }

-  private static <E> List<E> copy(List<E> list) {
-    if (list != null) {
-      List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
-      copy.addAll(list);
-      return Collections.unmodifiableList(copy);
-    }
-    return null;
-  }
-
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -0,0 +1,72 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink;

import java.util.Locale;
import java.util.Map;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

class IcebergSinkUtil {
  private IcebergSinkUtil() {
  }

  static IcebergStreamWriter<Row> createStreamWriter(Table table, TableSchema tableSchema) {
    Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

    if (tableSchema != null) {
      Schema writeSchema = FlinkSchemaUtil.convert(tableSchema);
      // Reassign ids to match the existing table schema.
      writeSchema = TypeUtil.reassignIds(writeSchema, table.schema());
      TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
    }

    Map<String, String> props = table.properties();
    long targetFileSize = getTargetFileSizeBytes(props);
    FileFormat fileFormat = getFileFormat(props);

    TaskWriterFactory<Row> taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(),
        table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);

    return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
  }

  private static FileFormat getFileFormat(Map<String, String> properties) {
    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
  }

  private static long getTargetFileSizeBytes(Map<String, String> properties) {
    return PropertyUtil.propertyAsLong(properties,
        WRITE_TARGET_FILE_SIZE_BYTES,
        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
  }
}
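For context, here is a rough sketch of how this utility and the stream writer below would be wired into a Flink job. The variable names and the upstream source are assumptions, not part of this PR, and the call site would have to live in the org.apache.iceberg.flink package because createStreamWriter is package-private:

// Hypothetical wiring: `rows` is an existing DataStream<Row>, `table` and `tableSchema`
// are assumed to have been resolved elsewhere in the job.
IcebergStreamWriter<Row> streamWriter = IcebergSinkUtil.createStreamWriter(table, tableSchema);

DataStream<DataFile> dataFiles = rows
    .transform("IcebergStreamWriter", TypeInformation.of(DataFile.class), streamWriter);
// `dataFiles` now carries the completed DataFile events that a downstream committer operator can collect.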
@@ -0,0 +1,105 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink;

import java.io.IOException;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

class IcebergStreamWriter<T> extends AbstractStreamOperator<DataFile>
    implements OneInputStreamOperator<T, DataFile>, BoundedOneInput {

  private static final long serialVersionUID = 1L;

  private final String fullTableName;

  private transient TaskWriterFactory<T> taskWriterFactory;
  private transient TaskWriter<T> writer;
  private transient int subTaskId;
  private transient int attemptId;

  IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
    this.fullTableName = fullTableName;
    this.taskWriterFactory = taskWriterFactory;
  }

  @Override
  public void open() {
    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = getRuntimeContext().getAttemptNumber();

    // Initialize the task writer factory.
    this.taskWriterFactory.initialize(subTaskId, attemptId);

    // Initialize the task writer.
    this.writer = taskWriterFactory.create();
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
Contributor

Is this guaranteed to be called from the same thread as processElement()?

Contributor

Not the same thread, but:

Contributor

In recent versions, Flink has refactored the lock model to use a queue and a single thread.
    // close all open files and emit files to downstream committer operator
    for (DataFile dataFile : writer.complete()) {
      emit(dataFile);
    }

    this.writer = taskWriterFactory.create();
  }
  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    writer.write(element.getValue());
  }

  @Override
  public void dispose() throws Exception {
    super.dispose();
    if (writer != null) {
      writer.close();
      writer = null;
    }
  }
  @Override
  public void endInput() throws IOException {
    // A bounded stream may not have checkpointing enabled, so emit the remaining
    // data files downstream before the writer is closed to avoid losing any of them.
    for (DataFile dataFile : writer.complete()) {
      emit(dataFile);
    }
  }
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("table_name", fullTableName)
        .add("subtask_id", subTaskId)
        .add("attempt_id", attemptId)
        .toString();
  }

  private void emit(DataFile dataFile) {
    output.collect(new StreamRecord<>(dataFile));
  }
}
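The TaskWriterFactory interface itself is not part of this diff. Based on how it is used in IcebergStreamWriter and implemented by RowTaskWriterFactory below, it presumably looks roughly like this — a sketch inferred from usage, not the actual source:

package org.apache.iceberg.flink;

import java.io.Serializable;
import org.apache.iceberg.io.TaskWriter;

interface TaskWriterFactory<T> extends Serializable {
  // Serializable is assumed here so the factory can be shipped to the cluster with the operator.

  // Bind the factory to a subtask/attempt so output file names don't collide (assumed contract).
  void initialize(int taskId, int attemptId);

  // Create a fresh task writer; the stream writer calls this on open() and again after every checkpoint.
  TaskWriter<T> create();
}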
@@ -0,0 +1,145 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class RowTaskWriterFactory implements TaskWriterFactory<Row> {
  private final Schema schema;
  private final PartitionSpec spec;
  private final LocationProvider locations;
  private final FileIO io;
  private final EncryptionManager encryptionManager;
  private final long targetFileSizeBytes;
  private final FileFormat format;
  private final FileAppenderFactory<Row> appenderFactory;

  private OutputFileFactory outputFileFactory;

  RowTaskWriterFactory(Schema schema,
                       PartitionSpec spec,
                       LocationProvider locations,
                       FileIO io,
                       EncryptionManager encryptionManager,
                       long targetFileSizeBytes,
                       FileFormat format,
                       Map<String, String> tableProperties) {
    this.schema = schema;
    this.spec = spec;
    this.locations = locations;
    this.io = io;
    this.encryptionManager = encryptionManager;
    this.targetFileSizeBytes = targetFileSizeBytes;
    this.format = format;
    this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
  }

  @Override
  public void initialize(int taskId, int attemptId) {
    this.outputFileFactory = new OutputFileFactory(spec, format, locations, io, encryptionManager, taskId, attemptId);
  }

  @Override
  public TaskWriter<Row> create() {
    Preconditions.checkNotNull(outputFileFactory,
        "The outputFileFactory shouldn't be null; initialize() must be invoked first.");

    if (spec.fields().isEmpty()) {
      return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
    } else {
      return new RowPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
          io, targetFileSizeBytes, schema);
    }
  }

  private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {

    private final PartitionKey partitionKey;
    private final RowWrapper rowWrapper;

    RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
                               OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
      this.partitionKey = new PartitionKey(spec, schema);
      this.rowWrapper = new RowWrapper(schema.asStruct());
    }

    @Override
    protected PartitionKey partition(Row row) {
      partitionKey.partition(rowWrapper.wrap(row));
      return partitionKey;
    }
  }

  private static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> {
    private final Schema schema;
    private final Map<String, String> props;

    private FlinkFileAppenderFactory(Schema schema, Map<String, String> props) {
      this.schema = schema;
      this.props = props;
    }

    @Override
    public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {
      MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
      try {
        switch (format) {
          case PARQUET:
            return Parquet.write(outputFile)
                .createWriterFunc(FlinkParquetWriters::buildWriter)
                .setAll(props)
                .metricsConfig(metricsConfig)
                .schema(schema)
                .overwrite()
                .build();

          case AVRO:
            // TODO add the Avro writer building once RowDataWrapper is ready.
          case ORC:
          default:
            throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
The corresponding get implementation also needs to be updated. That's what is causing the test failures.
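This comment refers to the indexed put/get pair in the first file: put(12, …) now converts the incoming List&lt;Long&gt; into the Long[] field, so get(12) (and splitOffsets()) must convert back to a List&lt;Long&gt;. A hypothetical round trip, with made-up offsets and a placeholder variable name:

// `file` stands for an instance of the class modified above; the values are illustrative only.
file.put(12, Lists.newArrayList(4L, 4096L));             // stored internally as a Long[]
List<Long> offsets = (List<Long>) file.get(12);          // rebuilt as a List<Long> on read
assert offsets.equals(Lists.newArrayList(4L, 4096L));    // the round trip preserves the offsets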