diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java index 82d8145a2927..8a77a85d78d5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException { } } + @Override + public boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException { + if (Files.exists(toPath(targetPath))) { + return false; + } + Files.copy(toPath(sourcePath), toPath(targetPath), StandardCopyOption.COPY_ATTRIBUTES); + return true; + } + + private java.nio.file.Path toPath(Path path) { + return toFile(path).toPath(); + } + /** * Converts the given Path to a File for this file system. If the path is empty, we will return * new File(".") instead of new File(""), since the latter returns diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 3cd7bb3b6959..6d358b15ce0d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -32,6 +32,9 @@ import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.Comparator; import java.util.List; @@ -87,12 +90,14 @@ public RawFileSplitRead newRead() { @Override public AppendOnlyFileStoreWrite newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public AppendOnlyFileStoreWrite newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { return new AppendOnlyFileStoreWrite( fileIO, newRead(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index e943d38cf5e1..7af99cb3ba9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -37,6 +37,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -76,7 +77,10 @@ public interface FileStore extends Serializable { FileStoreWrite newWrite(String commitUser); - FileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter); + FileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager); FileStoreCommit newCommit(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index b1b7fc211c1a..fd9b5320a890 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -42,9 +42,12 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import 
org.apache.paimon.utils.KeyComparatorSupplier; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.paimon.utils.ValueEqualiserSupplier; +import javax.annotation.Nullable; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -152,11 +155,14 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { @Override public KeyValueFileStoreWrite newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override - public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public KeyValueFileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { IndexMaintainer.Factory indexFactory = null; if (bucketMode() == BucketMode.HASH_DYNAMIC) { indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler()); @@ -185,7 +191,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma deletionVectorsMaintainerFactory, options, keyValueFieldsExtractor, - tableName); + tableName, + recordAttributeManager); } private Map format2PathFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 7b2785261887..9200ad29165e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) { fileSource); } + public DataFileMeta rename(String newFileName) { + return new DataFileMeta( + newFileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource); + } + public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(fileName)); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 72f9b3f65153..4a773380520a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) { fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); } + public void copyFile(String sourceFileName, String targetFileName, int level) + throws IOException { + Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); + Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); + fileIO.copyFileUtf8(sourcePath, targetPath); + } + + public FileIO getFileIO() { + return fileIO; + } + + public Path newChangelogPath(int level) { + return formatContext.pathFactory(level).newChangelogPath(); + } + public static Builder builder( FileIO fileIO, long schemaId, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index cf2e6c25159b..9a2323b6cc42 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -26,6 +26,8 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -37,6 +39,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.options.MemorySize; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BucketRecordAttributeManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.RecordWriter; @@ -78,6 +81,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { private final LinkedHashMap<String, DataFileMeta> compactBefore; private final LinkedHashSet<DataFileMeta> compactAfter; private final LinkedHashSet<DataFileMeta> compactChangelog; + @Nullable private final BucketRecordAttributeManager recordAttributeManager; + + private final boolean canChangelogOptimizedToCopyPreconditions; @Nullable private CompactDeletionFile compactDeletionFile; @@ -98,7 +104,8 @@ public MergeTreeWriter( boolean commitForceCompact, ChangelogProducer changelogProducer, @Nullable CommitIncrement increment, - @Nullable FieldsComparator userDefinedSeqComparator) { + @Nullable FieldsComparator userDefinedSeqComparator, + @Nullable BucketRecordAttributeManager recordAttributeManager) { this.writeBufferSpillable = writeBufferSpillable; this.maxDiskSize = maxDiskSize; this.sortMaxFan = sortMaxFan; @@ -114,6 +121,7 @@ public MergeTreeWriter( this.commitForceCompact = commitForceCompact; this.changelogProducer = changelogProducer; this.userDefinedSeqComparator = userDefinedSeqComparator; + this.recordAttributeManager = recordAttributeManager; this.newFiles = new LinkedHashSet<>(); this.deletedFiles = new LinkedHashSet<>(); @@ -133,6 +141,25 @@ public MergeTreeWriter( compactChangelog.addAll(increment.compactIncrement().changelogFiles()); updateCompactDeletionFile(increment.compactDeletionFile()); } + + // TODO: Verify the performance of the default copy implementation, + // and remove the following code block if it proves to cause no regression. + boolean isCopyMethodOverridden; + try { + isCopyMethodOverridden = + !writerFactory + .getFileIO() + .getClass() + .getMethod("copyFileUtf8", Path.class, Path.class) + .getDeclaringClass() + .equals(FileIO.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + this.canChangelogOptimizedToCopyPreconditions = + isCopyMethodOverridden + && (changelogProducer == ChangelogProducer.INPUT) + && recordAttributeManager != null; } private long newSequenceNumber() { @@ -213,7 +240,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter = - changelogProducer == ChangelogProducer.INPUT + changelogProducer == ChangelogProducer.INPUT && !canChangelogOptimizedToCopy() ?
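// When canChangelogOptimizedToCopy() returns true (ChangelogProducer.INPUT, a
// FileIO that overrides copyFileUtf8, and only insert-only records since the
// last flush), no changelog writer is created here: the flushed level-0 data
// files are byte-identical to the changelog files that INPUT would write, so
// they are copied via writerFactory.copyFile(...) after the flush instead.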
writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = @@ -232,22 +259,49 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul dataWriter.close(); } + List fileMetas = dataWriter.result(); if (changelogWriter != null) { newFilesChangelog.addAll(changelogWriter.result()); + } else if (canChangelogOptimizedToCopy()) { + List changelogMetas = getChangelogFileMetaFromDataFile(fileMetas); + for (int i = 0; i < changelogMetas.size(); i++) { + writerFactory.copyFile( + fileMetas.get(i).fileName(), changelogMetas.get(i).fileName(), 0); + } + newFilesChangelog.addAll(changelogMetas); } - for (DataFileMeta fileMeta : dataWriter.result()) { + for (DataFileMeta fileMeta : fileMetas) { newFiles.add(fileMeta); compactManager.addNewFile(fileMeta); } writeBuffer.clear(); + if (recordAttributeManager != null) { + recordAttributeManager.onFlush(); + } } trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); } + private List getChangelogFileMetaFromDataFile( + List dataFileMetaList) { + List changelogFileMetaList = new ArrayList<>(); + for (DataFileMeta dataFileMeta : dataFileMetaList) { + DataFileMeta changelogFileMeta = + dataFileMeta.rename(writerFactory.newChangelogPath(0).getName()); + changelogFileMetaList.add(changelogFileMeta); + } + return changelogFileMetaList; + } + + private boolean canChangelogOptimizedToCopy() { + return canChangelogOptimizedToCopyPreconditions + && recordAttributeManager.areAllRecordsInsertOnlySinceLastFlush(); + } + @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flushWriteBuffer(waitCompaction, false); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index d693c431b102..d64df137f8e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -67,6 +67,7 @@ import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.UserDefinedSeqComparator; @@ -101,6 +102,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final RowType keyType; private final RowType valueType; @Nullable private final RecordLevelExpire recordLevelExpire; + @Nullable private final RecordAttributeManager recordAttributeManager; public KeyValueFileStoreWrite( FileIO fileIO, @@ -121,7 +123,8 @@ public KeyValueFileStoreWrite( @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, CoreOptions options, KeyValueFieldsExtractor extractor, - String tableName) { + String tableName, + @Nullable RecordAttributeManager recordAttributeManager) { super( commitUser, snapshotManager, @@ -134,6 +137,7 @@ public KeyValueFileStoreWrite( this.keyType = keyType; this.valueType = valueType; this.udsComparatorSupplier = udsComparatorSupplier; + this.recordAttributeManager = recordAttributeManager; this.readerFactoryBuilder = KeyValueFileReaderFactory.builder( fileIO, @@ -210,7 +214,11 @@ protected MergeTreeWriter createWriter( options.commitForceCompact(), options.changelogProducer(), restoreIncrement, - UserDefinedSeqComparator.create(valueType, 
options)); + UserDefinedSeqComparator.create(valueType, options), + recordAttributeManager == null + ? null + : recordAttributeManager.getBucketRecordAttributeManager( + partition, bucket)); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index ca2ad04a232d..d23e8393bf77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -40,6 +40,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -128,9 +129,12 @@ public FileStoreWrite newWrite(String commitUser) { } @Override - public FileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public FileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { privilegeChecker.assertCanInsert(identifier); - return wrapped.newWrite(commitUser, manifestFilter); + return wrapped.newWrite(commitUser, manifestFilter, recordAttributeManager); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 548ae69ee5cc..0fad65b20f51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -39,9 +39,12 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.Map; import java.util.Objects; @@ -265,9 +268,12 @@ public TableWriteImpl newWrite(String commitUser) { } @Override - public TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public TableWriteImpl newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { privilegeChecker.assertCanInsert(identifier); - return wrapped.newWrite(commitUser, manifestFilter); + return wrapped.newWrite(commitUser, manifestFilter, recordAttributeManager); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index f45865d018c8..55be5651f593 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -42,6 +42,9 @@ import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.io.IOException; import java.util.function.BiConsumer; @@ -135,15 +138,18 @@ public RecordReader reader(Split split) 
throws IOException { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public TableWriteImpl newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { // if this table is unaware-bucket table, we skip compaction and restored files searching AppendOnlyFileStoreWrite writer = - store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); + store().newWrite(commitUser, manifestFilter, recordAttributeManager) + .withBucketMode(bucketMode()); return new TableWriteImpl<>( writer, createRowKeyExtractor(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 212555d7b1d6..ef25200d0969 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -29,6 +29,9 @@ import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -92,7 +95,10 @@ default Optional comment() { @Override TableWriteImpl newWrite(String commitUser); - TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter); + TableWriteImpl newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager); @Override TableCommitImpl newCommit(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 8c6925d73a51..ab65fffe8309 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -40,6 +40,9 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.List; import java.util.function.BiConsumer; @@ -158,15 +161,17 @@ public InnerTableRead newRead() { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public TableWriteImpl newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { KeyValue kv = new KeyValue(); return new TableWriteImpl<>( - store().newWrite(commitUser, manifestFilter), + store().newWrite(commitUser, manifestFilter, recordAttributeManager), createRowKeyExtractor(), (record, rowKind) -> kv.replace( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java new file mode 100644 index 000000000000..bcdeca739670 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +/** + * Manager of the attributes of the internal records during runtime. + * + *

Different from {@link RecordAttributeManager}, this class manages the attributes need to be + * set or acquired at bucket's granularity. + */ +public class BucketRecordAttributeManager { + private boolean isInsertOnly; + private boolean areAllRecordsInsertOnlySinceLastFlush; + + public BucketRecordAttributeManager() { + this.isInsertOnly = false; + this.areAllRecordsInsertOnlySinceLastFlush = false; + } + + /** + * This method is called when the insert only status of the records changes. + * + * @param isInsertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void onInsertOnlyChanged(boolean isInsertOnly) { + this.isInsertOnly = isInsertOnly; + if (!isInsertOnly) { + areAllRecordsInsertOnlySinceLastFlush = false; + } + } + + /** + * This method is called when the internal records are flushed to disk. It denotes that the + * attributes of previously added records are no longer needed. + */ + public void onFlush() { + areAllRecordsInsertOnlySinceLastFlush = isInsertOnly; + } + + /** @return whether all records added since last flush are insert-only. */ + public boolean areAllRecordsInsertOnlySinceLastFlush() { + return areAllRecordsInsertOnlySinceLastFlush; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java new file mode 100644 index 000000000000..266d68fe4648 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; + +import java.util.HashMap; +import java.util.Map; + +/** Manager of the attributes of the internal records during runtime. */ +public class RecordAttributeManager { + private final Map, BucketRecordAttributeManager> bucketManagerMap = + new HashMap<>(); + + private boolean isInsertOnly = false; + + /** + * This method is called when the insert only status of the records changes. + * + * @param isInsertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + public void onInsertOnlyChanged(boolean isInsertOnly) { + this.isInsertOnly = isInsertOnly; + bucketManagerMap.values().forEach(x -> x.onInsertOnlyChanged(isInsertOnly)); + } + + /** + * @return A {@link BucketRecordAttributeManager} that manages the internal record attributes of + * a specific bucket. 
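+     * <p>A minimal usage sketch (illustrative only; the single-partition, single-bucket setup
+     * and the {@code partition} variable are assumptions of this example, not part of the
+     * contract):
+     *
+     * <pre>{@code
+     * RecordAttributeManager manager = new RecordAttributeManager();
+     * manager.onInsertOnlyChanged(true); // upstream guarantees INSERT-only, unique-key records
+     * BucketRecordAttributeManager bucketManager =
+     *         manager.getBucketRecordAttributeManager(partition, 0);
+     * // ... records are written, then the write buffer is flushed ...
+     * bucketManager.onFlush();
+     * bucketManager.areAllRecordsInsertOnlySinceLastFlush(); // returns true
+     * }</pre>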
+ */ + public BucketRecordAttributeManager getBucketRecordAttributeManager( + BinaryRow partition, int bucket) { + return bucketManagerMap.computeIfAbsent( + Pair.of(partition, bucket), + pair -> { + BucketRecordAttributeManager manager = new BucketRecordAttributeManager(); + manager.onInsertOnlyChanged(isInsertOnly); + return manager; + }); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 92f14b3ed41c..54360eb99a69 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -58,6 +58,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BucketRecordAttributeManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExceptionUtils; import org.apache.paimon.utils.FileStorePathFactory; @@ -367,6 +368,35 @@ public void testWriteMany() throws Exception { doTestWriteRead(3, 20_000); } + @Test + public void testChangelog() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT, + null); + + doTestWriteReadWithChangelog(8, 200, false); + } + + @Test + public void testChangelogFromCopyingData() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT, + new BucketRecordAttributeManager() { + @Override + public boolean areAllRecordsInsertOnlySinceLastFlush() { + return true; + } + }); + + doTestWriteReadWithChangelog(8, 200, true); + } + private void doTestWriteRead(int batchNumber) throws Exception { doTestWriteRead(batchNumber, 200); } @@ -413,12 +443,78 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception { assertThat(files).isEqualTo(Collections.emptySet()); } + private void doTestWriteReadWithChangelog( + int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception { + List expected = new ArrayList<>(); + List newFiles = new ArrayList<>(); + List changelogFiles = new ArrayList<>(); + Set newFileNames = new HashSet<>(); + List compactedFiles = new ArrayList<>(); + + // write batch and commit + for (int i = 0; i <= batchNumber; i++) { + if (i < batchNumber) { + expected.addAll(writeBatch(perBatch)); + } else { + writer.sync(); + } + + CommitIncrement increment = writer.prepareCommit(true); + newFiles.addAll(increment.newFilesIncrement().newFiles()); + changelogFiles.addAll(increment.newFilesIncrement().changelogFiles()); + mergeCompacted(newFileNames, compactedFiles, increment); + } + + // assert records from writer + assertRecords(expected); + + // assert records from increment new files + assertRecords(expected, newFiles, false); + assertRecords(expected, newFiles, true); + + // assert records from changelog files + if (isChangelogEqualToData) { + assertRecords(expected, changelogFiles, false); + assertRecords(expected, changelogFiles, true); + } else { + List actual = new ArrayList<>(); + for (DataFileMeta changelogFile : changelogFiles) { + actual.addAll(readAll(Collections.singletonList(changelogFile), false)); + } + assertThat(actual).containsExactlyInAnyOrder(expected.toArray(new TestRecord[0])); + } + + // assert records from increment compacted files + 
assertRecords(expected, compactedFiles, true); + + writer.close(); + + Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Set files = + Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) + .map(FileStatus::getPath) + .map(Path::getName) + .collect(Collectors.toSet()); + newFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + changelogFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + compactedFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + assertThat(files).isEqualTo(Collections.emptySet()); + } + private MergeTreeWriter createMergeTreeWriter(List files) { return createMergeTreeWriter(files, createCompactManager(service, files)); } private MergeTreeWriter createMergeTreeWriter( List files, MergeTreeCompactManager compactManager) { + return createMergeTreeWriter(files, compactManager, ChangelogProducer.NONE, null); + } + + private MergeTreeWriter createMergeTreeWriter( + List files, + MergeTreeCompactManager compactManager, + ChangelogProducer changelogProducer, + BucketRecordAttributeManager bucketRecordAttributeManager) { long maxSequenceNumber = files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L); MergeTreeWriter writer = @@ -434,9 +530,10 @@ private MergeTreeWriter createMergeTreeWriter( DeduplicateMergeFunction.factory().create(), writerFactory, options.commitForceCompact(), - ChangelogProducer.NONE, + changelogProducer, null, - null); + null, + bucketRecordAttributeManager); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return writer; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java new file mode 100644 index 000000000000..dd58e695de3e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordAttributeManager}. 
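+ * Exercises propagation of the insert-only flag to per-bucket managers, reuse of the same
+ * manager instance for an identical (partition, bucket) pair, and the flush-boundary semantics
+ * of {@link BucketRecordAttributeManager#areAllRecordsInsertOnlySinceLastFlush()}.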
*/ +public class RecordAttributeManagerTest { + @Test + public void test() { + BinaryRow partition = new BinaryRow(1); + { + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, 0); + writer.complete(); + } + + RecordAttributeManager manager = new RecordAttributeManager(); + BucketRecordAttributeManager bucketManager0 = + manager.getBucketRecordAttributeManager(partition, 0); + BucketRecordAttributeManager bucketManager0Reclaimed = + manager.getBucketRecordAttributeManager(partition, 0); + BucketRecordAttributeManager bucketManager1 = + manager.getBucketRecordAttributeManager(partition, 1); + + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + manager.onInsertOnlyChanged(true); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + bucketManager0.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + bucketManager1.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + + manager.onInsertOnlyChanged(false); + bucketManager0.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java index 4d86c12a6e52..2f09a9249f0a 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java @@ -102,7 +102,7 @@ public boolean rename(Path src, Path dst) throws IOException { return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); } - private org.apache.hadoop.fs.Path path(Path path) { + protected org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 6ecc76da3f76..0bb606f8e2c1 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -20,9 +20,11 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; import org.slf4j.Logger; @@ -30,6 +32,8 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -70,6 +74,28 @@ public class OSSFileIO extends HadoopCompliantFileIO { private Options hadoopOptions; + @Override + public boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException { + org.apache.hadoop.fs.Path srcPath = path(sourcePath); + org.apache.hadoop.fs.Path dstPath = path(targetPath); + + try (FileSystem fs = createFileSystem(srcPath)) { + AliyunOSSFileSystem ossFs = (AliyunOSSFileSystem) fs; + FileStatus sourceStatus = ossFs.getFileStatus(srcPath); + + Method method = + AliyunOSSFileSystem.class.getDeclaredMethod( + "copyFile", + org.apache.hadoop.fs.Path.class, + long.class, + org.apache.hadoop.fs.Path.class); + method.setAccessible(true); + return (boolean) method.invoke(ossFs, srcPath, sourceStatus.getLen(), dstPath); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + @Override public boolean isObjectStore() { return true; diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java new file mode 100644 index 000000000000..30ef0aa3c347 --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OSSFileIO}. 
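+ * Verifies that {@link OSSFileIO#copyFileUtf8} reaches the reflectively invoked
+ * AliyunOSSFileSystem copy path, which in turn delegates to AliyunOSSFileSystemStore#copyFile,
+ * by stubbing both classes.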
*/ +public class OSSFileIOTest { + @Test + public void testCopy() throws Exception { + TestAliyunOssFileSystemStore store = new TestAliyunOssFileSystemStore(); + TestAliyunOssFileSystem fileSystem = new TestAliyunOssFileSystem(store); + TestOSSFileIO fileIO = new TestOSSFileIO(fileSystem); + fileIO.copyFileUtf8( + new org.apache.paimon.fs.Path("sourceFoo"), + new org.apache.paimon.fs.Path("targetBar")); + assertThat(store.isCopyInvokedAndVerified).isTrue(); + } + + private static class TestOSSFileIO extends OSSFileIO { + private final TestAliyunOssFileSystem fileSystem; + + private TestOSSFileIO(TestAliyunOssFileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + @Override + protected FileSystem createFileSystem(Path path) { + return fileSystem; + } + } + + private static class TestAliyunOssFileSystem extends AliyunOSSFileSystem { + public TestAliyunOssFileSystem(AliyunOSSFileSystemStore store) throws Exception { + Field storeField = AliyunOSSFileSystem.class.getDeclaredField("store"); + storeField.setAccessible(true); + storeField.set(this, store); + + Field workingDirField = AliyunOSSFileSystem.class.getDeclaredField("workingDir"); + workingDirField.setAccessible(true); + workingDirField.set(this, new Path("/")); + } + + @Override + public FileStatus getFileStatus(Path path) { + return new FileStatus(); + } + + @Override + public void close() {} + } + + private static class TestAliyunOssFileSystemStore extends AliyunOSSFileSystemStore { + private boolean isCopyInvokedAndVerified = false; + + @Override + public boolean copyFile(String srcKey, long srcLen, String dstKey) { + assertThat(srcKey).isEqualTo("sourceFoo"); + assertThat(srcLen).isZero(); + assertThat(dstKey).isEqualTo("targetBar"); + isCopyInvokedAndVerified = true; + return true; + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java index 9af7eabdaaad..581156930b9c 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java @@ -256,7 +256,13 @@ private OneInputStreamOperatorTestHarness createTestHarn CdcRecordStoreWriteOperator operator = new CdcRecordStoreWriteOperator( table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> + (t, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( t, commitUser, @@ -266,7 +272,8 @@ private OneInputStreamOperatorTestHarness createTestHarn false, true, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), commitUser); TypeSerializer inputSerializer = new JavaSerializer<>(); TypeSerializer outputSerializer = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java index b4cf7aa78de2..023ab4983145 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java @@ -61,7 +61,8 @@ public AsyncLookupSinkWrite( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + null); 
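+        // recordAttributeManager is passed as null: AsyncLookupSinkWrite backs the LOOKUP
+        // changelog producer, and the insert-only copy optimization driven by
+        // RecordAttributeManager applies only when changelog-producer is INPUT.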
this.tableName = table.name(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java index 147e7527ff89..f9dbd19741ce 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -50,7 +50,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer // bucket-assigner HashBucketAssignerOperator assignerOperator = - new HashBucketAssignerOperator<>( + createHashBucketAssignerOperator( initialCommitUser, table, null, extractorFunction(), true); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java index f04043ce41bc..cf697108fd32 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java @@ -20,6 +20,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.sink.PartitionKeyExtractor; import org.apache.paimon.utils.SerializableFunction; @@ -54,6 +55,16 @@ public DynamicBucketSink( protected abstract SerializableFunction> extractorFunction(); + protected HashBucketAssignerOperator createHashBucketAssignerOperator( + String commitUser, + Table table, + Integer numAssigners, + SerializableFunction> extractorFunction, + boolean overwrite) { + return new HashBucketAssignerOperator<>( + commitUser, table, numAssigners, extractorFunction, overwrite); + } + public DataStreamSink build(DataStream input, @Nullable Integer parallelism) { String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration()); @@ -73,7 +84,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis // 2. 
bucket-assigner HashBucketAssignerOperator assignerOperator = - new HashBucketAssignerOperator<>( + createHashBucketAssignerOperator( initialCommitUser, table, numAssigners, extractorFunction(), false); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index f369ec31c3d5..f5d207893ba8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -117,7 +117,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new GlobalFullCompactionSinkWrite( table, @@ -136,7 +142,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == ChangelogProducer.LOOKUP && !coreOptions.prepareCommitWaitCompaction()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new AsyncLookupSinkWrite( table, @@ -151,7 +163,13 @@ private StoreSinkWrite.Provider createWriteProvider( }; } - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new StoreSinkWriteImpl( table, @@ -162,7 +180,8 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + recordAttributeManager); }; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 8baee5ac1b91..dd73bd5590b6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink; import org.apache.paimon.flink.sorter.TableSortInfo; import org.apache.paimon.flink.sorter.TableSorter; @@ -70,11 +71,11 @@ public class FlinkSinkBuilder { private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class); - private final FileStoreTable table; + protected final FileStoreTable table; private DataStream input; - @Nullable private Map overwritePartition; - @Nullable private Integer parallelism; + @Nullable protected Map overwritePartition; + @Nullable protected Integer parallelism; private Boolean boundedInput = null; @Nullable private TableSortInfo tableSortInfo; @@ -221,7 +222,7 @@ public 
FlinkSinkBuilder clusteringIfPossible( /** Build {@link DataStreamSink}. */ public DataStreamSink build() { input = trySortInput(input); - DataStream input = MapToInternalRow.map(this.input, table.rowType()); + DataStream input = mapToInternalRow(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { input = input.forward() @@ -247,7 +248,14 @@ public DataStreamSink build() { } } - private DataStreamSink buildDynamicBucketSink( + protected DataStream mapToInternalRow( + DataStream input, org.apache.paimon.types.RowType rowType) { + return input.map((MapFunction) FlinkRowWrapper::new) + .setParallelism(input.getParallelism()) + .returns(org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType)); + } + + protected DataStreamSink buildDynamicBucketSink( DataStream input, boolean globalIndex) { checkArgument(logSinkFunction == null, "Dynamic bucket mode can not work with log system."); return compactSink && !globalIndex @@ -260,7 +268,7 @@ private DataStreamSink buildDynamicBucketSink( .build(input, parallelism); } - private DataStreamSink buildForFixedBucket(DataStream input) { + protected DataStreamSink buildForFixedBucket(DataStream input) { DataStream partitioned = partition( input, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java index 62341a180dab..e6958cd5d541 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java @@ -85,7 +85,8 @@ public GlobalFullCompactionSinkWrite( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + null); this.deltaCommits = deltaCommits; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java deleted file mode 100644 index a9bf74401099..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.flink.FlinkRowWrapper; -import org.apache.paimon.flink.utils.InternalTypeInfo; -import org.apache.paimon.types.RowType; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.data.RowData; - -/** An util to convert {@link RowData} stream to {@link InternalRow} stream. */ -public class MapToInternalRow { - - public static DataStream map(DataStream input, RowType rowType) { - return input.map((MapFunction) FlinkRowWrapper::new) - .setParallelism(input.getParallelism()) - .returns(InternalTypeInfo.fromRowType(rowType)); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 52e494b5a9db..eda787f17a58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -156,7 +156,8 @@ public void processElement(StreamRecord element) throws Exception { state, getContainingTask().getEnvironment().getIOManager(), memoryPool, - getMetricGroup())); + getMetricGroup(), + null)); if (write.streamingMode()) { write.notifyNewFiles(snapshotId, partition, bucket, files); @@ -256,7 +257,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new GlobalFullCompactionSinkWrite( table, commitUser, @@ -273,7 +280,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP && !coreOptions.prepareCommitWaitCompaction()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new AsyncLookupSinkWrite( table, commitUser, @@ -286,7 +299,13 @@ private StoreSinkWrite.Provider createWriteProvider( metricGroup); } - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( table, commitUser, @@ -296,6 +315,7 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + recordAttributeManager); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index 184288490772..dac0a29ca149 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -109,7 +109,7 @@ void initStateAndWriter( write = storeSinkWriteProvider.provide( - table, commitUser, state, ioManager, memoryPool, 
getMetricGroup()); + table, commitUser, state, ioManager, memoryPool, getMetricGroup(), null); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index 6001721b71f3..703f4cf0bfb4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -81,7 +82,8 @@ StoreSinkWrite provide( StoreSinkWriteState state, IOManager ioManager, @Nullable MemorySegmentPool memoryPool, - @Nullable MetricGroup metricGroup); + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager); } /** Provider of {@link StoreSinkWrite} that uses given write buffer. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index 3ecc80bb6f13..2cd70cb32eca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -64,6 +65,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { protected TableWriteImpl write; @Nullable private final MetricGroup metricGroup; + @Nullable private final RecordAttributeManager recordAttributeManager; public StoreSinkWriteImpl( FileStoreTable table, @@ -74,7 +76,8 @@ public StoreSinkWriteImpl( boolean waitCompaction, boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, - @Nullable MetricGroup metricGroup) { + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager) { this( table, commitUser, @@ -85,7 +88,8 @@ public StoreSinkWriteImpl( isStreamingMode, memoryPool, null, - metricGroup); + metricGroup, + recordAttributeManager); } public StoreSinkWriteImpl( @@ -108,7 +112,8 @@ public StoreSinkWriteImpl( isStreamingMode, null, memoryPoolFactory, - metricGroup); + metricGroup, + null); } private StoreSinkWriteImpl( @@ -121,7 +126,8 @@ private StoreSinkWriteImpl( boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, @Nullable MemoryPoolFactory memoryPoolFactory, - @Nullable MetricGroup metricGroup) { + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager) { this.commitUser = commitUser; this.state = state; this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths()); @@ -131,6 +137,7 @@ private StoreSinkWriteImpl( this.memoryPool = memoryPool; this.memoryPoolFactory = memoryPoolFactory; this.metricGroup = metricGroup; + this.recordAttributeManager = 
recordAttributeManager; this.write = newTableWrite(table); } @@ -143,7 +150,8 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { table.newWrite( commitUser, (part, bucket) -> - state.stateValueFilter().filter(table.name(), part, bucket)) + state.stateValueFilter().filter(table.name(), part, bucket), + recordAttributeManager) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) .withExecutionMode(isStreamingMode) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index f38e0ad6bfb5..cf2f9e15c564 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.ChannelComputer; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.StateInitializationContext; @@ -40,6 +41,7 @@ public abstract class TableWriteOperator extends PrepareCommitOperator createTestHarnes protected StoreCompactOperator createCompactOperator(FileStoreTable table) { return new StoreCompactOperator( table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> + (t, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( t, commitUser, @@ -254,7 +260,8 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) { false, false, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), "test"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java index 9cd2a73920fb..744982c0857f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -53,8 +53,13 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception { StoreCompactOperator storeCompactOperator = new StoreCompactOperator( (FileStoreTable) getTableDefault(), - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> - compactRememberStoreWrite, + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> compactRememberStoreWrite, "10086"); storeCompactOperator.open(); StateInitializationContextImpl context = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index a6a4d3e5088b..33fc310403c5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -113,7 +113,13 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception { new RowDataStoreWriteOperator( fileStoreTable, null, - (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( table, commitUser, @@ -123,7 +129,8 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception { false, true, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), "test"); OneInputStreamOperatorTestHarness harness = createHarness(operator); @@ -254,7 +261,13 @@ private RowDataStoreWriteOperator getAsyncLookupWriteOperator( return new RowDataStoreWriteOperator( fileStoreTable, null, - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new AsyncLookupSinkWrite( table, commitUser, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index edd6688da66d..f9ceff8ca329 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -185,7 +185,8 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc null, options, EXTRACTOR, - tablePath.getName()) + tablePath.getName(), + null) .createWriterContainer(partition, bucket, true) .writer; ((MemoryOwner) writer)