From 1812496ce28be98a111cb456664630774b515be0 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Thu, 27 Jun 2024 09:18:24 +0800 Subject: [PATCH 1/2] [core] Support non-utf8 copy on FileIO --- .../java/org/apache/paimon/fs/FileIO.java | 17 ++ .../apache/paimon/fs/local/LocalFileIO.java | 13 ++ .../java/org/apache/paimon/fs/FileIOTest.java | 188 +++++++++++++++++- .../paimon/fs/local/LocalFleIOTest.java | 66 ++++++ .../src/test/resources/test-data.orc | Bin 0 -> 369 bytes 5 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java create mode 100644 paimon-common/src/test/resources/test-data.orc diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index f31b00af1726..173b72fd8b19 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.hadoop.HadoopFileIOLoader; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,6 +268,22 @@ default void overwriteFileUtf8(Path path, String content) throws IOException { } } + /** + * Copy content of one file into another. + * + * @return false if targetPath file exists + */ + default boolean copyFile(Path sourcePath, Path targetPath) throws IOException { + if (exists(targetPath)) { + return false; + } + try (SeekableInputStream is = newInputStream(sourcePath); + PositionOutputStream os = newOutputStream(targetPath, false)) { + IOUtils.copy(is, os); + } + return true; + } + /** * Read file to UTF_8 decoding, then write content to one file atomically, initially writes to * temp hidden file and only renames to the target file once temp file is closed. diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java index 82d8145a2927..61154ad62801 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException { } } + @Override + public boolean copyFile(Path sourcePath, Path targetPath) throws IOException { + if (exists(targetPath)) { + return false; + } + Files.copy(toPath(sourcePath), toPath(targetPath), StandardCopyOption.COPY_ATTRIBUTES); + return true; + } + + private java.nio.file.Path toPath(Path path) { + return toFile(path).toPath(); + } + /** * Converts the given Path to a File for this file system. If the path is empty, we will return * new File(".") instead of new File(""), since the latter returns diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index c1b86a0b20ea..1617fe90e9f0 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -19,19 +19,31 @@ package org.apache.paimon.fs; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.StandardCopyOption; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import static org.apache.paimon.utils.Preconditions.checkState; import static org.assertj.core.api.Assertions.assertThat; -/** Test {@link FileIO}. */ +/** Test static methods and methods with default implementations of {@link FileIO}. */ public class FileIOTest { @TempDir java.nio.file.Path tempDir; @@ -59,6 +71,36 @@ public void testRequireOptions() throws IOException { assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class); } + @Test + public void testCopy() throws Exception { + Path srcFile = new Path(tempDir.resolve("src.txt").toUri()); + Path dstFile = new Path(tempDir.resolve("dst.txt").toUri()); + + FileIO fileIO = new DummyFileIO(); + fileIO.writeFileUtf8(srcFile, "foobar"); + + assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + fileIO.deleteQuietly(srcFile); + srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI()); + + fileIO.copyFileUtf8(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isFalse(); + fileIO.deleteQuietly(dstFile); + + fileIO.copyFile(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isTrue(); + fileIO.deleteQuietly(dstFile); + } + public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException { AtomicReference exception = new AtomicReference<>(); final int max = 10; @@ -106,4 +148,148 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr assertThat(exception.get()).isNull(); } + + /** A {@link FileIO} on local filesystem to test the default copy implementation. */ + private static class DummyFileIO implements FileIO { + private static final ReentrantLock RENAME_LOCK = new ReentrantLock(); + + @Override + public boolean isObjectStore() { + throw new UnsupportedOperationException(); + } + + @Override + public void configure(CatalogContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws FileNotFoundException { + return new LocalFileIO.LocalSeekableInputStream(toFile(path)); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) + throws IOException { + if (exists(path) && !overwrite) { + throw new FileAlreadyExistsException("File already exists: " + path); + } + + Path parent = path.getParent(); + if (parent != null && !mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent); + } + + return new LocalFileIO.LocalPositionOutputStream(toFile(path)); + } + + @Override + public FileStatus getFileStatus(Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus[] listStatus(Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean exists(Path path) { + return toFile(path).exists(); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + File file = toFile(path); + if (file.isFile()) { + return file.delete(); + } else if ((!recursive) && file.isDirectory()) { + File[] containedFiles = file.listFiles(); + if (containedFiles == null) { + throw new IOException( + "Directory " + file + " does not exist or an I/O error occurred"); + } else if (containedFiles.length != 0) { + throw new IOException("Directory " + file + " is not empty"); + } + } + + return delete(file); + } + + private boolean delete(final File f) { + if (f.isDirectory()) { + final File[] files = f.listFiles(); + if (files != null) { + for (File file : files) { + final boolean del = delete(file); + if (!del) { + return false; + } + } + } + } else { + return f.delete(); + } + + // Now directory is empty + return f.delete(); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return mkdirsInternal(toFile(path)); + } + + private boolean mkdirsInternal(File file) throws IOException { + if (file.isDirectory()) { + return true; + } else if (file.exists() && !file.isDirectory()) { + // Important: The 'exists()' check above must come before the 'isDirectory()' check + // to + // be safe when multiple parallel instances try to create the directory + + // exists and is not a directory -> is a regular file + throw new FileAlreadyExistsException(file.getAbsolutePath()); + } else { + File parent = file.getParentFile(); + return (parent == null || mkdirsInternal(parent)) + && (file.mkdir() || file.isDirectory()); + } + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + File srcFile = toFile(src); + File dstFile = toFile(dst); + File dstParent = dstFile.getParentFile(); + dstParent.mkdirs(); + try { + RENAME_LOCK.lock(); + if (dstFile.exists()) { + return false; + } + Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + return true; + } catch (NoSuchFileException + | AccessDeniedException + | DirectoryNotEmptyException + | SecurityException e) { + return false; + } finally { + RENAME_LOCK.unlock(); + } + } + + private File toFile(Path path) { + // remove scheme + String localPath = path.toUri().getPath(); + checkState(localPath != null, "Cannot convert a null path to File"); + + if (localPath.length() == 0) { + return new File("."); + } + + return new File(localPath); + } + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java new file mode 100644 index 000000000000..5fb4445468ef --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs.local; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LocalFileIO}. */ +public class LocalFleIOTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testCopy() throws Exception { + Path srcFile = new Path(tempDir.resolve("src.txt").toUri()); + Path dstFile = new Path(tempDir.resolve("dst.txt").toUri()); + + FileIO fileIO = new LocalFileIO(); + fileIO.writeFileUtf8(srcFile, "foobar"); + + assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + fileIO.deleteQuietly(srcFile); + srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI()); + + fileIO.copyFileUtf8(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isFalse(); + fileIO.deleteQuietly(dstFile); + + fileIO.copyFile(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isTrue(); + fileIO.deleteQuietly(dstFile); + } +} diff --git a/paimon-common/src/test/resources/test-data.orc b/paimon-common/src/test/resources/test-data.orc new file mode 100644 index 0000000000000000000000000000000000000000..8a8c39f4e9500e998dba3887656443407ae9b46f GIT binary patch literal 369 zcmeZI%3@>@ODrqO*DFrWNX<=L!xF8OSDKTfq*JX_Qdy9yWTjM;nw(#hqNJmgmzaye zFD^(-1_|aDrRyaE*%_&N1&Nut`FVO^L(^09Qi~ExQbF3&GE;L>ij}OQt6?T(14SX0 zz${hD%qvlf)lo`GO-n4zDN(Wlxuq897O36v!LEU!u71w0@qVGcPOd?41C(;|^U{@& zRfakGgu2Fid-}N`6oE`fQd0((!Eb+TZLA2`JIVPesmVfB#U+V(DTzfX496_MU|?cm01_A~0Q%m2i2wiq literal 0 HcmV?d00001 From 62d34d35edfd01c72a5bcca982a18cfb5e3a3aea Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 25 Jun 2024 16:25:17 +0800 Subject: [PATCH 2/2] [core] Generate changelog by copying data when records are insert-only --- .../paimon/append/AppendOnlyWriter.java | 3 + .../org/apache/paimon/io/DataFileMeta.java | 20 ++++ .../paimon/io/KeyValueFileWriterFactory.java | 16 +++ .../paimon/mergetree/MergeTreeWriter.java | 28 +++++- .../operation/AbstractFileStoreWrite.java | 12 +++ .../paimon/operation/FileStoreWrite.java | 9 ++ .../apache/paimon/table/sink/TableWrite.java | 9 ++ .../paimon/table/sink/TableWriteImpl.java | 5 + .../org/apache/paimon/utils/RecordWriter.java | 9 ++ .../paimon/mergetree/MergeTreeTestBase.java | 90 ++++++++++++++++- .../paimon/flink/sink/StoreSinkWrite.java | 9 ++ .../paimon/flink/sink/StoreSinkWriteImpl.java | 5 + .../flink/sink/StoreCompactOperatorTest.java | 3 + .../paimon/flink/sink/WriterOperatorTest.java | 97 +++++++++++++++++++ 14 files changed, 310 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 2878a6061fc4..382261e59679 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -207,6 +207,9 @@ public void sync() throws Exception { trySyncLatestCompaction(true); } + @Override + public void withInsertOnly(boolean insertOnly) {} + @Override public void close() throws Exception { // cancel compaction so that it does not block job cancelling diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 7b2785261887..9200ad29165e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) { fileSource); } + public DataFileMeta rename(String newFileName) { + return new DataFileMeta( + newFileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource); + } + public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(fileName)); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 72f9b3f65153..78c37cf455ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) { fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); } + public void copyFile(String sourceFileName, String targetFileName, int level) + throws IOException { + Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); + Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); + fileIO.copyFile(sourcePath, targetPath); + } + + public FileIO getFileIO() { + return fileIO; + } + + public Path newChangelogPath(int level) { + return formatContext.pathFactory(level).newChangelogPath(); + } + public static Builder builder( FileIO fileIO, long schemaId, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index cf2e6c25159b..aa60c884660f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -83,6 +83,7 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private long newSequenceNumber; private WriteBuffer writeBuffer; + private boolean isInsertOnly; public MergeTreeWriter( boolean writeBufferSpillable, @@ -213,7 +214,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } final RollingFileWriter changelogWriter = - changelogProducer == ChangelogProducer.INPUT + (changelogProducer == ChangelogProducer.INPUT && !isInsertOnly) ? writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = @@ -232,13 +233,23 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul dataWriter.close(); } + List dataMetas = dataWriter.result(); if (changelogWriter != null) { newFilesChangelog.addAll(changelogWriter.result()); + } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { + List changelogMetas = new ArrayList<>(); + for (DataFileMeta dataMeta : dataMetas) { + DataFileMeta changelogMeta = + dataMeta.rename(writerFactory.newChangelogPath(0).getName()); + writerFactory.copyFile(dataMeta.fileName(), changelogMeta.fileName(), 0); + changelogMetas.add(changelogMeta); + } + newFilesChangelog.addAll(changelogMetas); } - for (DataFileMeta fileMeta : dataWriter.result()) { - newFiles.add(fileMeta); - compactManager.addNewFile(fileMeta); + for (DataFileMeta dataMeta : dataMetas) { + newFiles.add(dataMeta); + compactManager.addNewFile(dataMeta); } writeBuffer.clear(); @@ -276,6 +287,15 @@ public void sync() throws Exception { trySyncLatestCompaction(true); } + @Override + public void withInsertOnly(boolean insertOnly) { + if (insertOnly && writeBuffer != null && writeBuffer.size() > 0) { + throw new IllegalStateException( + "Insert-only can only be set before any record is received."); + } + this.isInsertOnly = insertOnly; + } + private CommitIncrement drainIncrement() { DataIncrement dataIncrement = new DataIncrement( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 0fd062d64ca7..459fec027f73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -82,6 +82,7 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { protected CompactionMetrics compactionMetrics = null; protected final String tableName; + private boolean isInsertOnly; protected AbstractFileStoreWrite( String commitUser, @@ -123,6 +124,16 @@ public void withCompactExecutor(ExecutorService compactExecutor) { this.closeCompactExecutorWhenLeaving = false; } + @Override + public void withInsertOnly(boolean insertOnly) { + this.isInsertOnly = insertOnly; + for (Map> containerMap : writers.values()) { + for (WriterContainer container : containerMap.values()) { + container.writer.withInsertOnly(insertOnly); + } + } + } + @Override public void write(BinaryRow partition, int bucket, T data) throws Exception { WriterContainer container = getWriterWrapper(partition, bucket); @@ -404,6 +415,7 @@ public WriterContainer createWriterContainer( null, compactExecutor(), deletionVectorsMaintainer); + writer.withInsertOnly(isInsertOnly); notifyNewWriter(writer); return new WriterContainer<>( writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index 7f8fee45faeb..993e07d532a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -85,6 +85,15 @@ default FileStoreWrite withMemoryPool(MemorySegmentPool memoryPool) { void withCompactExecutor(ExecutorService compactExecutor); + /** + * This method is called when the insert only status of the records changes. + * + * @param insertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void withInsertOnly(boolean insertOnly); + /** * Write the data to the store according to the partition and bucket. * diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java index 0f25f8219eb2..d88aa2b8ad01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java @@ -40,6 +40,15 @@ public interface TableWrite extends AutoCloseable { /** With {@link MemorySegmentPool} for the current table write. */ TableWrite withMemoryPool(MemorySegmentPool memoryPool); + /** + * This method is called when the insert only status of the records changes. + * + * @param insertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void withInsertOnly(boolean insertOnly); + /** Calculate which partition {@code row} belongs to. */ BinaryRow getPartition(InternalRow row); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index cd1af85d3be8..6e2194646d2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -108,6 +108,11 @@ public TableWriteImpl withBucketMode(BucketMode bucketMode) { return this; } + @Override + public void withInsertOnly(boolean insertOnly) { + write.withInsertOnly(insertOnly); + } + @Override public BinaryRow getPartition(InternalRow row) { keyAndBucketExtractor.setRecord(row); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java index 5795290f1d96..2ccf4cc861f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java @@ -73,6 +73,15 @@ public interface RecordWriter { */ void sync() throws Exception; + /** + * This method is called when the insert only status of the records changes. + * + * @param insertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void withInsertOnly(boolean insertOnly); + /** Close this writer, the call will delete newly generated but not committed files. */ void close() throws Exception; } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 92f14b3ed41c..8818c1290c5d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -367,6 +367,29 @@ public void testWriteMany() throws Exception { doTestWriteRead(3, 20_000); } + @Test + public void testChangelog() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT); + + doTestWriteReadWithChangelog(8, 200, false); + } + + @Test + public void testChangelogFromCopyingData() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT); + writer.withInsertOnly(true); + + doTestWriteReadWithChangelog(8, 200, true); + } + private void doTestWriteRead(int batchNumber) throws Exception { doTestWriteRead(batchNumber, 200); } @@ -413,12 +436,77 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception { assertThat(files).isEqualTo(Collections.emptySet()); } + private void doTestWriteReadWithChangelog( + int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception { + List expected = new ArrayList<>(); + List newFiles = new ArrayList<>(); + List changelogFiles = new ArrayList<>(); + Set newFileNames = new HashSet<>(); + List compactedFiles = new ArrayList<>(); + + // write batch and commit + for (int i = 0; i <= batchNumber; i++) { + if (i < batchNumber) { + expected.addAll(writeBatch(perBatch)); + } else { + writer.sync(); + } + + CommitIncrement increment = writer.prepareCommit(true); + newFiles.addAll(increment.newFilesIncrement().newFiles()); + changelogFiles.addAll(increment.newFilesIncrement().changelogFiles()); + mergeCompacted(newFileNames, compactedFiles, increment); + } + + // assert records from writer + assertRecords(expected); + + // assert records from increment new files + assertRecords(expected, newFiles, false); + assertRecords(expected, newFiles, true); + + // assert records from changelog files + if (isChangelogEqualToData) { + assertRecords(expected, changelogFiles, false); + assertRecords(expected, changelogFiles, true); + } else { + List actual = new ArrayList<>(); + for (DataFileMeta changelogFile : changelogFiles) { + actual.addAll(readAll(Collections.singletonList(changelogFile), false)); + } + assertThat(actual).containsExactlyInAnyOrder(expected.toArray(new TestRecord[0])); + } + + // assert records from increment compacted files + assertRecords(expected, compactedFiles, true); + + writer.close(); + + Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Set files = + Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) + .map(FileStatus::getPath) + .map(Path::getName) + .collect(Collectors.toSet()); + newFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + changelogFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + compactedFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + assertThat(files).isEqualTo(Collections.emptySet()); + } + private MergeTreeWriter createMergeTreeWriter(List files) { return createMergeTreeWriter(files, createCompactManager(service, files)); } private MergeTreeWriter createMergeTreeWriter( List files, MergeTreeCompactManager compactManager) { + return createMergeTreeWriter(files, compactManager, ChangelogProducer.NONE); + } + + private MergeTreeWriter createMergeTreeWriter( + List files, + MergeTreeCompactManager compactManager, + ChangelogProducer changelogProducer) { long maxSequenceNumber = files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L); MergeTreeWriter writer = @@ -434,7 +522,7 @@ private MergeTreeWriter createMergeTreeWriter( DeduplicateMergeFunction.factory().create(), writerFactory, options.commitForceCompact(), - ChangelogProducer.NONE, + changelogProducer, null, null); writer.setMemoryPool( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index 6001721b71f3..a432a55454ab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -39,6 +39,15 @@ /** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. */ public interface StoreSinkWrite { + /** + * This method is called when the insert only status of the records changes. + * + * @param insertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void withInsertOnly(boolean insertOnly); + @Nullable SinkRecord write(InternalRow rowData) throws Exception; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index 3ecc80bb6f13..bdaf7bc327be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -169,6 +169,11 @@ public void withCompactExecutor(ExecutorService compactExecutor) { write.withCompactExecutor(compactExecutor); } + @Override + public void withInsertOnly(boolean insertOnly) { + write.withInsertOnly(insertOnly); + } + @Override @Nullable public SinkRecord write(InternalRow rowData) throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java index 9cd2a73920fb..eb7756718738 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -102,6 +102,9 @@ public CompactRememberStoreWrite(boolean streamingMode) { this.streamingMode = streamingMode; } + @Override + public void withInsertOnly(boolean insertOnly) {} + @Override public SinkRecord write(InternalRow rowData) { return null; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index a6a4d3e5088b..d294dad79d4a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.flink.api.common.ExecutionConfig; @@ -54,6 +55,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -283,6 +285,101 @@ private void commitAll( commit.commit(commitIdentifier, commitMessages); } + @Test + public void testChangelog() throws Exception { + testChangelog(false); + } + + @Test + public void testChangelogWithInsertOnly() throws Exception { + testChangelog(true); + } + + private void testChangelog(boolean insertOnly) throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()}, + new String[] {"pt", "k", "v"}); + + Options options = new Options(); + options.set("bucket", "1"); + options.set("changelog-producer", "input"); + + FileStoreTable fileStoreTable = + createFileStoreTable( + rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options); + + RowDataStoreWriteOperator operator = + new RowDataStoreWriteOperator( + fileStoreTable, + null, + (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new StoreSinkWriteImpl( + table, + commitUser, + state, + ioManager, + false, + false, + true, + memoryPool, + metricGroup), + "test"); + + OneInputStreamOperatorTestHarness harness = + createHarness(operator); + + TableCommitImpl commit = fileStoreTable.newCommit("test"); + + TypeSerializer serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + harness.setup(serializer); + harness.open(); + + if (insertOnly) { + Field field = TableWriteOperator.class.getDeclaredField("write"); + field.setAccessible(true); + StoreSinkWrite write = (StoreSinkWrite) field.get(operator); + write.withInsertOnly(true); + } + + // write basic records + harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 100), 1); + harness.processElement(GenericRow.ofKind(RowKind.DELETE, 1, 10, 200), 2); + harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 300), 3); + harness.prepareSnapshotPreBarrier(1); + harness.snapshot(1, 10); + harness.notifyOfCompletedCheckpoint(1); + commitAll(harness, commit, 1); + harness.close(); + commit.close(); + + // check result + ReadBuilder readBuilder = fileStoreTable.newReadBuilder(); + StreamTableScan scan = readBuilder.newStreamScan(); + scan.restore(1L); + List splits = scan.plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + List actual = new ArrayList<>(); + reader.forEachRemaining( + row -> + actual.add( + String.format( + "%s[%d, %d, %d]", + row.getRowKind().shortString(), + row.getInt(0), + row.getInt(1), + row.getInt(2)))); + if (insertOnly) { + assertThat(actual).containsExactlyInAnyOrder("+I[1, 10, 300]"); + } else { + assertThat(actual) + .containsExactlyInAnyOrder( + "+I[1, 10, 100]", "-D[1, 10, 200]", "+I[1, 10, 300]"); + } + } + private FileStoreTable createFileStoreTable( RowType rowType, List primaryKeys, List partitionKeys, Options conf) throws Exception {