diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 2878a6061fc4..382261e59679 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -207,6 +207,9 @@ public void sync() throws Exception {
         trySyncLatestCompaction(true);
     }
 
+    @Override
+    public void withInsertOnly(boolean insertOnly) {}
+
     @Override
     public void close() throws Exception {
         // cancel compaction so that it does not block job cancelling
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 7b2785261887..9200ad29165e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) {
                 fileSource);
     }
 
+    public DataFileMeta rename(String newFileName) {
+        return new DataFileMeta(
+                newFileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                embeddedIndex,
+                fileSource);
+    }
+
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
         List<Path> paths = new ArrayList<>();
         paths.add(pathFactory.toPath(fileName));
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 72f9b3f65153..78c37cf455ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -36,6 +36,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) {
         fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
     }
 
+    public void copyFile(String sourceFileName, String targetFileName, int level)
+            throws IOException {
+        Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName);
+        Path targetPath = formatContext.pathFactory(level).toPath(targetFileName);
+        fileIO.copyFile(sourcePath, targetPath);
+    }
+
+    public FileIO getFileIO() {
+        return fileIO;
+    }
+
+    public Path newChangelogPath(int level) {
+        return formatContext.pathFactory(level).newChangelogPath();
+    }
+
     public static Builder builder(
             FileIO fileIO,
             long schemaId,
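The two helpers above are the building blocks of the copy optimization introduced in MergeTreeWriter below: DataFileMeta#rename clones the metadata under a new file name while keeping the row count, key range and stats intact, and KeyValueFileWriterFactory#copyFile copies the bytes at the same level. The pairing is sound because the copied file holds exactly the same bytes, so the old statistics still describe it. A minimal standalone model of the idea (FileMetaSketch is illustrative, not Paimon code):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileMetaSketch {
    final String fileName;
    final long fileSize;
    final long rowCount; // key range, stats, level etc. are elided in this sketch

    FileMetaSketch(String fileName, long fileSize, long rowCount) {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.rowCount = rowCount;
    }

    // Same content, new name: mirrors DataFileMeta#rename(String).
    FileMetaSketch rename(String newFileName) {
        return new FileMetaSketch(newFileName, fileSize, rowCount);
    }

    // Mirrors KeyValueFileWriterFactory#copyFile: the bytes follow the metadata,
    // so every statistic carried over by rename(...) remains valid for the copy.
    static void copy(Path dir, FileMetaSketch source, FileMetaSketch target)
            throws IOException {
        Files.copy(dir.resolve(source.fileName), dir.resolve(target.fileName));
    }
}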
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index cf2e6c25159b..13044d241cae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -83,6 +83,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
     private long newSequenceNumber;
     private WriteBuffer writeBuffer;
+    private boolean isInsertOnly;
+    private boolean areAllRecordsInsertOnlySinceLastFlush;
 
     public MergeTreeWriter(
             boolean writeBufferSpillable,
@@ -213,7 +215,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
             }
 
             final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
-                    changelogProducer == ChangelogProducer.INPUT
+                    changelogProducer == ChangelogProducer.INPUT && !canChangelogOptimizedToCopy()
                             ? writerFactory.createRollingChangelogFileWriter(0)
                             : null;
             final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
@@ -232,16 +234,25 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
                 dataWriter.close();
             }
 
+            List<DataFileMeta> fileMetas = dataWriter.result();
             if (changelogWriter != null) {
                 newFilesChangelog.addAll(changelogWriter.result());
+            } else if (canChangelogOptimizedToCopy()) {
+                List<DataFileMeta> changelogMetas = getChangelogFileMetaFromDataFile(fileMetas);
+                for (int i = 0; i < changelogMetas.size(); i++) {
+                    writerFactory.copyFile(
+                            fileMetas.get(i).fileName(), changelogMetas.get(i).fileName(), 0);
+                }
+                newFilesChangelog.addAll(changelogMetas);
             }
 
-            for (DataFileMeta fileMeta : dataWriter.result()) {
+            for (DataFileMeta fileMeta : fileMetas) {
                 newFiles.add(fileMeta);
                 compactManager.addNewFile(fileMeta);
             }
 
             writeBuffer.clear();
+            areAllRecordsInsertOnlySinceLastFlush = isInsertOnly;
         }
 
         trySyncLatestCompaction(waitForLatestCompaction);
@@ -276,6 +287,32 @@ public void sync() throws Exception {
         trySyncLatestCompaction(true);
     }
 
+    @Override
+    public void withInsertOnly(boolean insertOnly) {
+        this.isInsertOnly = insertOnly;
+        if (writeBuffer == null || writeBuffer.size() == 0) {
+            areAllRecordsInsertOnlySinceLastFlush = isInsertOnly;
+        } else if (!isInsertOnly) {
+            areAllRecordsInsertOnlySinceLastFlush = false;
+        }
+    }
+
+    private List<DataFileMeta> getChangelogFileMetaFromDataFile(
+            List<DataFileMeta> dataFileMetaList) {
+        List<DataFileMeta> changelogFileMetaList = new ArrayList<>();
+        for (DataFileMeta dataFileMeta : dataFileMetaList) {
+            DataFileMeta changelogFileMeta =
+                    dataFileMeta.rename(writerFactory.newChangelogPath(0).getName());
+            changelogFileMetaList.add(changelogFileMeta);
+        }
+        return changelogFileMetaList;
+    }
+
+    private boolean canChangelogOptimizedToCopy() {
+        return changelogProducer == ChangelogProducer.INPUT
+                && areAllRecordsInsertOnlySinceLastFlush;
+    }
+
     private CommitIncrement drainIncrement() {
         DataIncrement dataIncrement =
                 new DataIncrement(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 0fd062d64ca7..459fec027f73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -82,6 +82,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     protected CompactionMetrics compactionMetrics = null;
     protected final String tableName;
+    private boolean isInsertOnly;
 
     protected AbstractFileStoreWrite(
             String commitUser,
@@ -123,6 +124,16 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
         this.closeCompactExecutorWhenLeaving = false;
     }
 
+    @Override
+    public void withInsertOnly(boolean insertOnly) {
+        this.isInsertOnly = insertOnly;
+        for (Map<Integer, WriterContainer<T>> containerMap : writers.values()) {
+            for (WriterContainer<T> container : containerMap.values()) {
+                container.writer.withInsertOnly(insertOnly);
+            }
+        }
+    }
+
     @Override
     public void write(BinaryRow partition, int bucket, T data) throws Exception {
         WriterContainer<T> container = getWriterWrapper(partition, bucket);
@@ -404,6 +415,7 @@ public WriterContainer<T> createWriterContainer(
                         null,
                         compactExecutor(),
                         deletionVectorsMaintainer);
+        writer.withInsertOnly(isInsertOnly);
        notifyNewWriter(writer);
         return new WriterContainer<>(
                 writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
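With the pieces above in place, flushWriteBuffer skips the dedicated changelog writer whenever canChangelogOptimizedToCopy() holds and instead copies each flushed data file to a changelog path, so insert-only records are encoded once instead of twice. Below is a minimal standalone model of that state machine, assuming the same semantics as the patch: the insert-only flag may flip freely while the buffer is empty, any non-insert-only interval poisons the current flush, and the flag is re-evaluated after each flush. All names are illustrative, not the Paimon implementation.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChangelogCopySketch {

    enum ChangelogProducer { NONE, INPUT }

    private final ChangelogProducer changelogProducer;
    private boolean isInsertOnly;                       // latest hint from the caller
    private boolean allRecordsInsertOnlySinceLastFlush; // sticky until the next flush
    private int bufferedRecords;

    ChangelogCopySketch(ChangelogProducer changelogProducer) {
        this.changelogProducer = changelogProducer;
    }

    // Mirrors RecordWriter#withInsertOnly: the flag may only be relaxed to "true"
    // while the buffer is empty; a non-insert-only interval poisons the current flush.
    void withInsertOnly(boolean insertOnly) {
        this.isInsertOnly = insertOnly;
        if (bufferedRecords == 0) {
            allRecordsInsertOnlySinceLastFlush = insertOnly;
        } else if (!insertOnly) {
            allRecordsInsertOnlySinceLastFlush = false;
        }
    }

    void buffer() {
        bufferedRecords++;
    }

    boolean canChangelogOptimizedToCopy() {
        return changelogProducer == ChangelogProducer.INPUT
                && allRecordsInsertOnlySinceLastFlush;
    }

    // On flush: either a dedicated changelog writer already wrote the changelog,
    // or the freshly written data file is copied to the changelog path.
    void flush(Path dataFile, Path changelogFile) throws IOException {
        if (canChangelogOptimizedToCopy()) {
            Files.copy(dataFile, changelogFile); // one sequential copy, no re-encoding
        }
        bufferedRecords = 0;
        allRecordsInsertOnlySinceLastFlush = isInsertOnly; // reset for the next interval
    }
}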
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 7f8fee45faeb..993e07d532a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -85,6 +85,15 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
 
     void withCompactExecutor(ExecutorService compactExecutor);
 
+    /**
+     * This method is called when the insert-only status of the records changes.
+     *
+     * @param insertOnly If true, all the following records will be of {@link
+     *     org.apache.paimon.types.RowKind#INSERT}, and no two records will have the same primary
+     *     key.
+     */
+    void withInsertOnly(boolean insertOnly);
+
     /**
      * Write the data to the store according to the partition and bucket.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 0f25f8219eb2..d88aa2b8ad01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -40,6 +40,15 @@ public interface TableWrite extends AutoCloseable {
     /** With {@link MemorySegmentPool} for the current table write. */
     TableWrite withMemoryPool(MemorySegmentPool memoryPool);
 
+    /**
+     * This method is called when the insert-only status of the records changes.
+     *
+     * @param insertOnly If true, all the following records will be of {@link
+     *     org.apache.paimon.types.RowKind#INSERT}, and no two records will have the same primary
+     *     key.
+     */
+    void withInsertOnly(boolean insertOnly);
+
     /** Calculate which partition {@code row} belongs to. */
     BinaryRow getPartition(InternalRow row);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index cd1af85d3be8..6e2194646d2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -108,6 +108,11 @@ public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) {
         return this;
     }
 
+    @Override
+    public void withInsertOnly(boolean insertOnly) {
+        write.withInsertOnly(insertOnly);
+    }
+
     @Override
     public BinaryRow getPartition(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index 5795290f1d96..2ccf4cc861f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -73,6 +73,15 @@ public interface RecordWriter<T> {
      */
     void sync() throws Exception;
 
+    /**
+     * This method is called when the insert-only status of the records changes.
+     *
+     * @param insertOnly If true, all the following records will be of {@link
+     *     org.apache.paimon.types.RowKind#INSERT}, and no two records will have the same primary
+     *     key.
+     */
+    void withInsertOnly(boolean insertOnly);
+
     /** Close this writer, the call will delete newly generated but not committed files. */
     void close() throws Exception;
 }
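The four interface additions above all carry the same contract: after withInsertOnly(true), every subsequent record is an INSERT with a distinct primary key, which is exactly what lets the merge-tree writer satisfy the changelog by copying data files. A hedged usage sketch against Paimon's public batch write API (BatchTableWrite extends TableWrite, so it inherits the new method; the table handle and the three-column rows are illustrative assumptions, not from this patch):

import java.util.List;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

public class InsertOnlyWriteSketch {

    // Bulk-load distinct rows into a changelog-producer=input table; with the
    // insert-only promise in place, changelog files can be produced by copying.
    static void bulkLoad(Table table) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = builder.newWrite();
                BatchTableCommit commit = builder.newCommit()) {
            write.withInsertOnly(true); // all following records are unique INSERTs
            write.write(GenericRow.of(1, 10, 100));
            write.write(GenericRow.of(1, 20, 200));
            List<CommitMessage> messages = write.prepareCommit();
            commit.commit(messages);
        }
    }
}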
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 92f14b3ed41c..8818c1290c5d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -367,6 +367,29 @@ public void testWriteMany() throws Exception {
         doTestWriteRead(3, 20_000);
     }
 
+    @Test
+    public void testChangelog() throws Exception {
+        writer =
+                createMergeTreeWriter(
+                        Collections.emptyList(),
+                        createCompactManager(service, Collections.emptyList()),
+                        ChangelogProducer.INPUT);
+
+        doTestWriteReadWithChangelog(8, 200, false);
+    }
+
+    @Test
+    public void testChangelogFromCopyingData() throws Exception {
+        writer =
+                createMergeTreeWriter(
+                        Collections.emptyList(),
+                        createCompactManager(service, Collections.emptyList()),
+                        ChangelogProducer.INPUT);
+        writer.withInsertOnly(true);
+
+        doTestWriteReadWithChangelog(8, 200, true);
+    }
+
     private void doTestWriteRead(int batchNumber) throws Exception {
         doTestWriteRead(batchNumber, 200);
     }
@@ -413,12 +436,77 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
         assertThat(files).isEqualTo(Collections.emptySet());
     }
 
+    private void doTestWriteReadWithChangelog(
+            int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception {
+        List<TestRecord> expected = new ArrayList<>();
+        List<DataFileMeta> newFiles = new ArrayList<>();
+        List<DataFileMeta> changelogFiles = new ArrayList<>();
+        Set<String> newFileNames = new HashSet<>();
+        List<DataFileMeta> compactedFiles = new ArrayList<>();
+
+        // write batch and commit
+        for (int i = 0; i <= batchNumber; i++) {
+            if (i < batchNumber) {
+                expected.addAll(writeBatch(perBatch));
+            } else {
+                writer.sync();
+            }
+
+            CommitIncrement increment = writer.prepareCommit(true);
+            newFiles.addAll(increment.newFilesIncrement().newFiles());
+            changelogFiles.addAll(increment.newFilesIncrement().changelogFiles());
+            mergeCompacted(newFileNames, compactedFiles, increment);
+        }
+
+        // assert records from writer
+        assertRecords(expected);
+
+        // assert records from increment new files
+        assertRecords(expected, newFiles, false);
+        assertRecords(expected, newFiles, true);
+
+        // assert records from changelog files
+        if (isChangelogEqualToData) {
+            assertRecords(expected, changelogFiles, false);
+            assertRecords(expected, changelogFiles, true);
+        } else {
+            List<TestRecord> actual = new ArrayList<>();
+            for (DataFileMeta changelogFile : changelogFiles) {
+                actual.addAll(readAll(Collections.singletonList(changelogFile), false));
+            }
+            assertThat(actual).containsExactlyInAnyOrder(expected.toArray(new TestRecord[0]));
+        }
+
+        // assert records from increment compacted files
+        assertRecords(expected, compactedFiles, true);
+
+        writer.close();
+
+        Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent();
+        Set<String> files =
+                Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
+                        .map(FileStatus::getPath)
+                        .map(Path::getName)
+                        .collect(Collectors.toSet());
+        newFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+        changelogFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+        compactedFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+        assertThat(files).isEqualTo(Collections.emptySet());
+    }
+
     private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
         return createMergeTreeWriter(files, createCompactManager(service, files));
     }
 
     private MergeTreeWriter createMergeTreeWriter(
             List<DataFileMeta> files, MergeTreeCompactManager compactManager) {
+        return createMergeTreeWriter(files, compactManager, ChangelogProducer.NONE);
+    }
+
+    private MergeTreeWriter createMergeTreeWriter(
+            List<DataFileMeta> files,
+            MergeTreeCompactManager compactManager,
+            ChangelogProducer changelogProducer) {
         long maxSequenceNumber =
                 files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
         MergeTreeWriter writer =
@@ -434,7 +522,7 @@ private MergeTreeWriter createMergeTreeWriter(
                         DeduplicateMergeFunction.factory().create(),
                         writerFactory,
                         options.commitForceCompact(),
-                        ChangelogProducer.NONE,
+                        changelogProducer,
                         null,
                         null);
         writer.setMemoryPool(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 6001721b71f3..a432a55454ab 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -39,6 +39,15 @@
 /** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. */
 public interface StoreSinkWrite {
 
+    /**
+     * This method is called when the insert-only status of the records changes.
+     *
+     * @param insertOnly If true, all the following records will be of {@link
+     *     org.apache.paimon.types.RowKind#INSERT}, and no two records will have the same primary
+     *     key.
+     */
+    void withInsertOnly(boolean insertOnly);
+
     @Nullable
     SinkRecord write(InternalRow rowData) throws Exception;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3ecc80bb6f13..bdaf7bc327be 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -169,6 +169,11 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
         write.withCompactExecutor(compactExecutor);
     }
 
+    @Override
+    public void withInsertOnly(boolean insertOnly) {
+        write.withInsertOnly(insertOnly);
+    }
+
     @Override
     @Nullable
     public SinkRecord write(InternalRow rowData) throws Exception {
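StoreSinkWrite and StoreSinkWriteImpl only forward the flag; the patch does not show which operator calls withInsertOnly. The helper below is a speculative sketch of one plausible call site, not part of this patch: keep the promise while the stream carries only INSERT records, and revoke it permanently on the first non-insert row. Note that RowKind alone cannot prove primary-key uniqueness, so that half of the contract still has to come from the upstream job.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.types.RowKind;

// Hypothetical helper, not part of this patch.
class InsertOnlyGate {
    private final StoreSinkWrite write;
    private boolean insertOnly;

    InsertOnlyGate(StoreSinkWrite write, boolean startInsertOnly) {
        this.write = write;
        this.insertOnly = startInsertOnly;
        write.withInsertOnly(startInsertOnly);
    }

    void process(InternalRow row) throws Exception {
        // The promise is one-way: revoke it before the first non-INSERT record
        // and never re-grant it mid-stream.
        if (insertOnly && row.getRowKind() != RowKind.INSERT) {
            insertOnly = false;
            write.withInsertOnly(false);
        }
        write.write(row);
    }
}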
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 9cd2a73920fb..eb7756718738 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -102,6 +102,9 @@ public CompactRememberStoreWrite(boolean streamingMode) {
             this.streamingMode = streamingMode;
         }
 
+        @Override
+        public void withInsertOnly(boolean insertOnly) {}
+
         @Override
         public SinkRecord write(InternalRow rowData) {
             return null;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index a6a4d3e5088b..d294dad79d4a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -40,6 +40,7 @@
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -54,6 +55,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -283,6 +285,101 @@ private void commitAll(
         commit.commit(commitIdentifier, commitMessages);
     }
 
+    @Test
+    public void testChangelog() throws Exception {
+        testChangelog(false);
+    }
+
+    @Test
+    public void testChangelogWithInsertOnly() throws Exception {
+        testChangelog(true);
+    }
+
+    private void testChangelog(boolean insertOnly) throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set("bucket", "1");
+        options.set("changelog-producer", "input");
+
+        FileStoreTable fileStoreTable =
+                createFileStoreTable(
+                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
+
+        RowDataStoreWriteOperator operator =
+                new RowDataStoreWriteOperator(
+                        fileStoreTable,
+                        null,
+                        (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                                new StoreSinkWriteImpl(
+                                        table,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false,
+                                        false,
+                                        true,
+                                        memoryPool,
+                                        metricGroup),
+                        "test");
+
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createHarness(operator);
+
+        TableCommitImpl commit = fileStoreTable.newCommit("test");
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        if (insertOnly) {
+            Field field = TableWriteOperator.class.getDeclaredField("write");
+            field.setAccessible(true);
+            StoreSinkWrite write = (StoreSinkWrite) field.get(operator);
+            write.withInsertOnly(true);
+        }
+
+        // write basic records
+        harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 100), 1);
+        harness.processElement(GenericRow.ofKind(RowKind.DELETE, 1, 10, 200), 2);
+        harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 300), 3);
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 10);
+        harness.notifyOfCompletedCheckpoint(1);
+        commitAll(harness, commit, 1);
+        harness.close();
+        commit.close();
+
+        // check result
+        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+        StreamTableScan scan = readBuilder.newStreamScan();
+        scan.restore(1L);
+        List<Split> splits = scan.plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        List<String> actual = new ArrayList<>();
+        reader.forEachRemaining(
+                row ->
+                        actual.add(
+                                String.format(
+                                        "%s[%d, %d, %d]",
+                                        row.getRowKind().shortString(),
+                                        row.getInt(0),
+                                        row.getInt(1),
+                                        row.getInt(2))));
+        if (insertOnly) {
+            assertThat(actual).containsExactlyInAnyOrder("+I[1, 10, 300]");
+        } else {
+            assertThat(actual)
+                    .containsExactlyInAnyOrder(
+                            "+I[1, 10, 100]", "-D[1, 10, 200]", "+I[1, 10, 300]");
+        }
+    }
+
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> primaryKeys, List<String> partitionKeys, Options conf)
             throws Exception {