Skip to content

Commit

Permalink
[core] Generate changelog by copying data when records are insert-only
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jun 22, 2024
1 parent c46a256 commit 019d7ee
Show file tree
Hide file tree
Showing 41 changed files with 1,005 additions and 101 deletions.
18 changes: 18 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.hadoop.HadoopFileIOLoader;
import org.apache.paimon.fs.local.LocalFileIO;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -267,6 +268,22 @@ default void overwriteFileUtf8(Path path, String content) throws IOException {
}
}

/**
* Copy content of one file into another.
*
* @return false if targetPath file exists
*/
default boolean copyFile(Path src, Path dst) throws IOException {
if (exists(dst)) {
return false;
}
try (SeekableInputStream is = newInputStream(src);
PositionOutputStream os = newOutputStream(dst, false)) {
IOUtils.copy(is, os);
}
return true;
}

/**
* Read file to UTF_8 decoding, then write content to one file atomically, initially writes to
* temp hidden file and only renames to the target file once temp file is closed.
Expand All @@ -275,6 +292,7 @@ default void overwriteFileUtf8(Path path, String content) throws IOException {
*/
default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException {
String content = readFileUtf8(sourcePath);
System.out.println("copyFileUtf8 " + content.length());
return writeFileUtf8(targetPath, content);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException {
}
}

@Override
public boolean copyFile(Path src, Path dst) throws IOException {
if (exists(dst)) {
return false;
}
Files.copy(toPath(src), toPath(dst), StandardCopyOption.COPY_ATTRIBUTES);
return true;
}

private java.nio.file.Path toPath(Path path) {
return toFile(path).toPath();
}

/**
* Converts the given Path to a File for this file system. If the path is empty, we will return
* <tt>new File(".")</tt> instead of <tt>new File("")</tt>, since the latter returns
Expand Down
188 changes: 187 additions & 1 deletion paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@
package org.apache.paimon.fs;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.paimon.utils.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;

/** Test {@link FileIO}. */
/** Test static methods and methods with default implementations of {@link FileIO}. */
public class FileIOTest {

@TempDir java.nio.file.Path tempDir;
Expand Down Expand Up @@ -59,6 +71,36 @@ public void testRequireOptions() throws IOException {
assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class);
}

@Test
public void testCopy() throws Exception {
Path srcFile = new Path(tempDir.resolve("src.txt").toUri());
Path dstFile = new Path(tempDir.resolve("dst.txt").toUri());

FileIO fileIO = new DummyFileIO();
fileIO.writeFileUtf8(srcFile, "foobar");

assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue();
assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
fileIO.deleteQuietly(dstFile);

assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue();
assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
fileIO.deleteQuietly(dstFile);

fileIO.deleteQuietly(srcFile);
srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI());

fileIO.copyFileUtf8(srcFile, dstFile);
assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
.isFalse();
fileIO.deleteQuietly(dstFile);

fileIO.copyFile(srcFile, dstFile);
assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
.isTrue();
fileIO.deleteQuietly(dstFile);
}

public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
final int max = 10;
Expand Down Expand Up @@ -106,4 +148,148 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr

assertThat(exception.get()).isNull();
}

/** A {@link FileIO} on local filesystem to test the default copy implementation. */
private static class DummyFileIO implements FileIO {
private static final ReentrantLock RENAME_LOCK = new ReentrantLock();

@Override
public boolean isObjectStore() {
throw new UnsupportedOperationException();
}

@Override
public void configure(CatalogContext context) {
throw new UnsupportedOperationException();
}

@Override
public SeekableInputStream newInputStream(Path path) throws FileNotFoundException {
return new LocalFileIO.LocalSeekableInputStream(toFile(path));
}

@Override
public PositionOutputStream newOutputStream(Path path, boolean overwrite)
throws IOException {
if (exists(path) && !overwrite) {
throw new FileAlreadyExistsException("File already exists: " + path);
}

Path parent = path.getParent();
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent);
}

return new LocalFileIO.LocalPositionOutputStream(toFile(path));
}

@Override
public FileStatus getFileStatus(Path path) {
throw new UnsupportedOperationException();
}

@Override
public FileStatus[] listStatus(Path path) {
throw new UnsupportedOperationException();
}

@Override
public boolean exists(Path path) {
return toFile(path).exists();
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
File file = toFile(path);
if (file.isFile()) {
return file.delete();
} else if ((!recursive) && file.isDirectory()) {
File[] containedFiles = file.listFiles();
if (containedFiles == null) {
throw new IOException(
"Directory " + file + " does not exist or an I/O error occurred");
} else if (containedFiles.length != 0) {
throw new IOException("Directory " + file + " is not empty");
}
}

return delete(file);
}

private boolean delete(final File f) {
if (f.isDirectory()) {
final File[] files = f.listFiles();
if (files != null) {
for (File file : files) {
final boolean del = delete(file);
if (!del) {
return false;
}
}
}
} else {
return f.delete();
}

// Now directory is empty
return f.delete();
}

@Override
public boolean mkdirs(Path path) throws IOException {
return mkdirsInternal(toFile(path));
}

private boolean mkdirsInternal(File file) throws IOException {
if (file.isDirectory()) {
return true;
} else if (file.exists() && !file.isDirectory()) {
// Important: The 'exists()' check above must come before the 'isDirectory()' check
// to
// be safe when multiple parallel instances try to create the directory

// exists and is not a directory -> is a regular file
throw new FileAlreadyExistsException(file.getAbsolutePath());
} else {
File parent = file.getParentFile();
return (parent == null || mkdirsInternal(parent))
&& (file.mkdir() || file.isDirectory());
}
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
File srcFile = toFile(src);
File dstFile = toFile(dst);
File dstParent = dstFile.getParentFile();
dstParent.mkdirs();
try {
RENAME_LOCK.lock();
if (dstFile.exists()) {
return false;
}
Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
return true;
} catch (NoSuchFileException
| AccessDeniedException
| DirectoryNotEmptyException
| SecurityException e) {
return false;
} finally {
RENAME_LOCK.unlock();
}
}

private File toFile(Path path) {
// remove scheme
String localPath = path.toUri().getPath();
checkState(localPath != null, "Cannot convert a null path to File");

if (localPath.length() == 0) {
return new File(".");
}

return new File(localPath);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs.local;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link LocalFileIO}. */
public class LocalFleIOTest {

@TempDir java.nio.file.Path tempDir;

@Test
public void testCopy() throws Exception {
Path srcFile = new Path(tempDir.resolve("src.txt").toUri());
Path dstFile = new Path(tempDir.resolve("dst.txt").toUri());

FileIO fileIO = new LocalFileIO();
fileIO.writeFileUtf8(srcFile, "foobar");

assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue();
assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
fileIO.deleteQuietly(dstFile);

assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue();
assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar");
fileIO.deleteQuietly(dstFile);

fileIO.deleteQuietly(srcFile);
srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI());

fileIO.copyFileUtf8(srcFile, dstFile);
assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
.isFalse();
fileIO.deleteQuietly(dstFile);

fileIO.copyFile(srcFile, dstFile);
assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri())))
.isTrue();
fileIO.deleteQuietly(dstFile);
}
}
Binary file added paimon-common/src/test/resources/test-data.orc
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RecordAttributeManager;

import javax.annotation.Nullable;

import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -87,12 +90,14 @@ public RawFileSplitRead newRead() {

@Override
public AppendOnlyFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, null);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
String commitUser,
ManifestCacheFilter manifestFilter,
@Nullable RecordAttributeManager recordAttributeManager) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
Expand Down
6 changes: 5 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -76,7 +77,10 @@ public interface FileStore<T> extends Serializable {

FileStoreWrite<T> newWrite(String commitUser);

FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);
FileStoreWrite<T> newWrite(
String commitUser,
ManifestCacheFilter manifestFilter,
@Nullable RecordAttributeManager recordAttributeManager);

FileStoreCommit newCommit(String commitUser);

Expand Down
Loading

0 comments on commit 019d7ee

Please sign in to comment.