Skip to content

Commit

Permalink
[core] Generate changelog by copying data when records are insert-only
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored Jun 29, 2024
1 parent 82e6e59 commit 96a2db7
Show file tree
Hide file tree
Showing 19 changed files with 593 additions and 6 deletions.
17 changes: 17 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.hadoop.HadoopFileIOLoader;
import org.apache.paimon.fs.local.LocalFileIO;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -267,6 +268,22 @@ default void overwriteFileUtf8(Path path, String content) throws IOException {
}
}

/**
 * Copies the content of {@code sourcePath} into a newly created {@code targetPath} by streaming
 * the bytes from an input stream of the source to an output stream of the target.
 *
 * <p>Nothing is read or written when the target already exists.
 *
 * @param sourcePath file whose content is read
 * @param targetPath file that is created (without overwrite) and written to
 * @return {@code true} if the content was copied, {@code false} if {@code targetPath} already
 *     exists
 * @throws IOException if checking the target, opening either stream, or copying fails
 */
default boolean copyFile(Path sourcePath, Path targetPath) throws IOException {
    boolean targetAbsent = !exists(targetPath);
    if (targetAbsent) {
        // Both streams are closed by try-with-resources, also when the copy fails midway.
        try (SeekableInputStream source = newInputStream(sourcePath);
                PositionOutputStream target = newOutputStream(targetPath, false)) {
            IOUtils.copy(source, target);
        }
    }
    return targetAbsent;
}

/**
* Read file to UTF_8 decoding, then write content to one file atomically, initially writes to
* temp hidden file and only renames to the target file once temp file is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException {
}
}

/**
 * Copies a local file with {@link Files#copy}, carrying over the source's file attributes.
 *
 * <p>Missing parent directories of {@code targetPath} are created before copying, because
 * {@link Files#copy} does not create them and would otherwise fail for a target located in a
 * directory that does not exist yet.
 *
 * @param sourcePath local file to copy from
 * @param targetPath local file to create
 * @return {@code true} if the file was copied, {@code false} if {@code targetPath} already
 *     exists (in which case nothing is touched)
 * @throws IOException if the parent directory cannot be created or the copy itself fails
 */
@Override
public boolean copyFile(Path sourcePath, Path targetPath) throws IOException {
    if (exists(targetPath)) {
        return false;
    }

    // Make sure the destination directory exists before handing over to Files.copy.
    Path parent = targetPath.getParent();
    if (parent != null && !mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent);
    }

    Files.copy(toPath(sourcePath), toPath(targetPath), StandardCopyOption.COPY_ATTRIBUTES);
    return true;
}

/**
 * Converts the given file-system {@link Path} into a {@link java.nio.file.Path} by first
 * resolving it to a local {@code File} via {@code toFile}.
 */
private java.nio.file.Path toPath(Path path) {
    File localFile = toFile(path);
    return localFile.toPath();
}

/**
* Converts the given Path to a File for this file system. If the path is empty, we will return
* <tt>new File(".")</tt> instead of <tt>new File("")</tt>, since the latter returns
Expand Down
188 changes: 187 additions & 1 deletion paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@
package org.apache.paimon.fs;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.paimon.utils.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;

/** Test static methods and methods with default implementations of {@link FileIO}. */
public class FileIOTest {

@TempDir java.nio.file.Path tempDir;
Expand Down Expand Up @@ -59,6 +71,36 @@ public void testRequireOptions() throws IOException {
assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class);
}

/**
 * Verifies {@code copyFileUtf8} and the default {@code copyFile} implementation of {@link
 * FileIO} (exercised through {@code DummyFileIO}) for text content, for an already existing
 * target, and for a binary resource.
 */
@Test
public void testCopy() throws Exception {
    Path srcFile = new Path(tempDir.resolve("src.txt").toUri());
    Path dstFile = new Path(tempDir.resolve("dst.txt").toUri());

    FileIO fileIO = new DummyFileIO();
    fileIO.writeFileUtf8(srcFile, "foobar");

    // Text content: both copy flavours must reproduce the source.
    assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue();
    assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
    fileIO.deleteQuietly(dstFile);

    assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue();
    assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");

    // The target exists now, so a second copy must be refused.
    assertThat(fileIO.copyFile(srcFile, dstFile)).isFalse();
    fileIO.deleteQuietly(dstFile);

    fileIO.deleteQuietly(srcFile);
    srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI());

    // Binary content: the UTF-8 based copy is expected NOT to be byte-identical ...
    fileIO.copyFileUtf8(srcFile, dstFile);
    assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
            .isFalse();
    fileIO.deleteQuietly(dstFile);

    // ... whereas the byte-wise copy must be.
    fileIO.copyFile(srcFile, dstFile);
    assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
            .isTrue();
    fileIO.deleteQuietly(dstFile);
}

public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
final int max = 10;
Expand Down Expand Up @@ -106,4 +148,148 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr

assertThat(exception.get()).isNull();
}

/**
 * A {@link FileIO} on local filesystem to test the default copy implementation.
 *
 * <p>Only stream creation, {@code exists}, {@code delete}, {@code mkdirs} and {@code rename} are
 * implemented; the remaining operations throw {@link UnsupportedOperationException}. {@code
 * copyFile} is not overridden, so calls to it run the default implementation declared in
 * {@link FileIO}.
 */
private static class DummyFileIO implements FileIO {
    /** Serializes the exists-check and move performed by {@link #rename}. */
    private static final ReentrantLock RENAME_LOCK = new ReentrantLock();

    /** Not supported by this test implementation. */
    @Override
    public boolean isObjectStore() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this test implementation. */
    @Override
    public void configure(CatalogContext context) {
        throw new UnsupportedOperationException();
    }

    /** Opens the local file behind {@code path} for reading. */
    @Override
    public SeekableInputStream newInputStream(Path path) throws FileNotFoundException {
        return new LocalFileIO.LocalSeekableInputStream(toFile(path));
    }

    /**
     * Opens the local file behind {@code path} for writing, creating its parent directory if
     * needed.
     *
     * @throws FileAlreadyExistsException if the file exists and {@code overwrite} is false
     * @throws IOException if the parent directory cannot be created
     */
    @Override
    public PositionOutputStream newOutputStream(Path path, boolean overwrite)
            throws IOException {
        if (exists(path) && !overwrite) {
            throw new FileAlreadyExistsException("File already exists: " + path);
        }

        Path parent = path.getParent();
        if (parent != null && !mkdirs(parent)) {
            throw new IOException("Mkdirs failed to create " + parent);
        }

        return new LocalFileIO.LocalPositionOutputStream(toFile(path));
    }

    /** Not supported by this test implementation. */
    @Override
    public FileStatus getFileStatus(Path path) {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this test implementation. */
    @Override
    public FileStatus[] listStatus(Path path) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean exists(Path path) {
        return toFile(path).exists();
    }

    /**
     * Deletes a file or directory.
     *
     * @param recursive whether a non-empty directory may be deleted together with its content
     * @return whether the deletion succeeded
     * @throws IOException if a directory is not empty (or cannot be listed) and {@code
     *     recursive} is false
     */
    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        File file = toFile(path);
        if (file.isFile()) {
            return file.delete();
        } else if ((!recursive) && file.isDirectory()) {
            File[] containedFiles = file.listFiles();
            if (containedFiles == null) {
                throw new IOException(
                        "Directory " + file + " does not exist or an I/O error occurred");
            } else if (containedFiles.length != 0) {
                throw new IOException("Directory " + file + " is not empty");
            }
        }

        return delete(file);
    }

    /** Deletes {@code f}, descending into directories; stops at the first failed deletion. */
    private boolean delete(final File f) {
        if (f.isDirectory()) {
            final File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    final boolean del = delete(file);
                    if (!del) {
                        return false;
                    }
                }
            }
        } else {
            return f.delete();
        }

        // Now directory is empty
        return f.delete();
    }

    @Override
    public boolean mkdirs(Path path) throws IOException {
        return mkdirsInternal(toFile(path));
    }

    /**
     * Creates {@code file} as a directory, creating missing ancestors first.
     *
     * @return true if the directory exists when the call returns
     * @throws FileAlreadyExistsException if {@code file} exists but is not a directory
     */
    private boolean mkdirsInternal(File file) throws IOException {
        if (file.isDirectory()) {
            return true;
        } else if (file.exists() && !file.isDirectory()) {
            // Important: The 'exists()' check above must come before the 'isDirectory()'
            // check to be safe when multiple parallel instances try to create the directory

            // exists and is not a directory -> is a regular file
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        } else {
            File parent = file.getParentFile();
            return (parent == null || mkdirsInternal(parent))
                    && (file.mkdir() || file.isDirectory());
        }
    }

    /**
     * Atomically moves {@code src} to {@code dst} unless {@code dst} already exists.
     *
     * @return true if the file was moved; false if the destination exists or the move failed
     *     with one of the caught exceptions
     */
    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        File srcFile = toFile(src);
        File dstFile = toFile(dst);
        File dstParent = dstFile.getParentFile();
        // getParentFile() returns null for a path without a parent; nothing to create then.
        if (dstParent != null) {
            dstParent.mkdirs();
        }
        // Acquire the lock before entering try, so that unlock() in finally only ever runs
        // when the lock is actually held.
        RENAME_LOCK.lock();
        try {
            if (dstFile.exists()) {
                return false;
            }
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (NoSuchFileException
                | AccessDeniedException
                | DirectoryNotEmptyException
                | SecurityException e) {
            return false;
        } finally {
            RENAME_LOCK.unlock();
        }
    }

    /** Converts {@code path} to a local {@link File}, mapping the empty path to {@code "."}. */
    private File toFile(Path path) {
        // remove scheme
        String localPath = path.toUri().getPath();
        checkState(localPath != null, "Cannot convert a null path to File");

        if (localPath.length() == 0) {
            return new File(".");
        }

        return new File(localPath);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs.local;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link LocalFileIO}. */
// NOTE(review): the class name looks like a typo of "LocalFileIOTest"; renaming it also requires
// renaming the source file, so it is left unchanged here.
public class LocalFleIOTest {

    @TempDir java.nio.file.Path tempDir;

    /**
     * Verifies {@code copyFileUtf8} and {@code copyFile} on {@link LocalFileIO} for text
     * content, for an already existing target, and for a binary resource.
     */
    @Test
    public void testCopy() throws Exception {
        Path srcFile = new Path(tempDir.resolve("src.txt").toUri());
        Path dstFile = new Path(tempDir.resolve("dst.txt").toUri());

        FileIO fileIO = new LocalFileIO();
        fileIO.writeFileUtf8(srcFile, "foobar");

        // Text content: both copy flavours must reproduce the source.
        assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue();
        assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
        fileIO.deleteQuietly(dstFile);

        assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue();
        assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");

        // The target exists now, so a second copy must be refused.
        assertThat(fileIO.copyFile(srcFile, dstFile)).isFalse();
        fileIO.deleteQuietly(dstFile);

        fileIO.deleteQuietly(srcFile);
        srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI());

        // Binary content: the UTF-8 based copy is expected NOT to be byte-identical ...
        fileIO.copyFileUtf8(srcFile, dstFile);
        assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
                .isFalse();
        fileIO.deleteQuietly(dstFile);

        // ... whereas the byte-wise copy must be.
        fileIO.copyFile(srcFile, dstFile);
        assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
                .isTrue();
        fileIO.deleteQuietly(dstFile);
    }
}
Binary file added paimon-common/src/test/resources/test-data.orc
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ public void sync() throws Exception {
trySyncLatestCompaction(true);
}

/**
 * No-op: the {@code insertOnly} flag is ignored by this implementation.
 *
 * @param insertOnly unused here
 */
@Override
public void withInsertOnly(boolean insertOnly) {}

@Override
public void close() throws Exception {
// cancel compaction so that it does not block job cancelling
Expand Down
20 changes: 20 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) {
fileSource);
}

/**
 * Creates a copy of this metadata that differs only in its file name.
 *
 * <p>Every other field of this instance is passed to the new {@link DataFileMeta} unchanged;
 * this method only builds the metadata object and does not touch any file.
 *
 * @param newFileName file name to store in the returned metadata
 * @return a new {@link DataFileMeta} carrying {@code newFileName}
 */
public DataFileMeta rename(String newFileName) {
return new DataFileMeta(
newFileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Loading

0 comments on commit 96a2db7

Please sign in to comment.