Skip to content

Commit

Permalink
[core] Reduce File IO for snapshot read (#3849)
Browse files: browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 30, 2024
1 parent eec2407 commit d5290fc
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 21 deletions.
13 changes: 9 additions & 4 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,17 @@ default void copyFiles(Path sourceDirectory, Path targetDirectory, boolean overw
/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
IOException exception = null;
Exception exception = null;
while (retryNumber++ < 5) {
try {
return Optional.of(readFileUtf8(path));
} catch (FileNotFoundException e) {
return Optional.empty();
} catch (Exception e) {
if (!exists(path)) {
return Optional.empty();
}

return Optional.of(readFileUtf8(path));
} catch (IOException e) {
if (e.getClass()
.getName()
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
Expand All @@ -322,7 +324,10 @@ default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
}
}

throw exception;
if (exception instanceof IOException) {
throw (IOException) exception;
}
throw new RuntimeException(exception);
}

// -------------------------------------------------------------------------
Expand Down
10 changes: 6 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,7 @@ public static Snapshot fromJson(String json) {

public static Snapshot fromPath(FileIO fileIO, Path path) {
try {
String json = fileIO.readFileUtf8(path);
return Snapshot.fromJson(json);
return fromPathThrowsException(fileIO, path);
} catch (IOException e) {
throw new RuntimeException("Fails to read snapshot from path " + path, e);
}
Expand All @@ -450,13 +449,16 @@ public static Snapshot fromPath(FileIO fileIO, Path path) {
/**
 * Reads and parses the snapshot file at {@code path}, mapping a missing file to {@code null}
 * instead of an exception.
 *
 * @param fileIO accessor used to read the file contents as UTF-8 text
 * @param path location of the snapshot file
 * @return the parsed snapshot, or {@code null} when the read raises
 *     {@link FileNotFoundException}
 * @throws IOException for any I/O failure other than {@link FileNotFoundException}
 */
@Nullable
public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
// NOTE(review): this is a diff rendering without +/- markers. The next two lines appear to
// be the removed inline read-and-parse, and the third line their replacement that delegates
// to the shared helper; confirm against the commit before treating this as compilable code.
String json = fileIO.readFileUtf8(path);
return Snapshot.fromJson(json);
return fromPathThrowsException(fileIO, path);
} catch (FileNotFoundException e) {
// A missing file is the one failure callers are shielded from.
return null;
}
}

/**
 * Loads the UTF-8 text stored at {@code path} and parses it as a snapshot JSON document.
 *
 * <p>Unlike the public wrappers, this helper does not translate failures: any
 * {@link IOException} raised by the read is propagated to the caller unchanged.
 *
 * @param fileIO accessor used to read the file
 * @param path location of the snapshot file
 * @return the snapshot parsed from the file contents
 * @throws IOException if reading the file fails
 */
private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) throws IOException {
    String json = fileIO.readFileUtf8(path);
    return Snapshot.fromJson(json);
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public ScanMode startingScanMode() {

@Override
public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
// try to get first snapshot again
startingSnapshotId = snapshotManager.latestSnapshotId();
}
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
return new NoSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {

private @Nullable Long findLatest(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
if (!fileIO.exists(dir)) {
return null;
}

Long snapshotId = readHint(LATEST, dir);
if (snapshotId != null && snapshotId > 0) {
long nextSnapshot = snapshotId + 1;
Expand All @@ -628,10 +624,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {

private @Nullable Long findEarliest(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
if (!fileIO.exists(dir)) {
return null;
}

Long snapshotId = readHint(EARLIEST, dir);
// null and it is the earliest only it exists
if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
public class TableFormatReadWriteTest extends TableTestBase {

/**
 * Creates the test table for the given file format through the catalog and returns the
 * catalog's handle to it.
 *
 * @param format file format name, passed to {@code schema(format)} to build the table schema
 * @return the table looked up from the catalog after creation
 * @throws Exception if the catalog create or lookup call fails
 */
private Table createTable(String format) throws Exception {
// NOTE(review): this is a diff rendering without +/- markers. The first create/get pair
// (no-arg identifier()) appears to be the removed code and the second pair
// (identifier(format)) its replacement; confirm against the commit.
// The trailing 'true' is presumably an ignore-if-exists flag; verify against the Catalog API.
catalog.createTable(identifier(), schema(format), true);
return catalog.getTable(identifier());
catalog.createTable(identifier(format), schema(format), true);
return catalog.getTable(identifier(format));
}

private Schema schema(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
public class TableFormatReadWriteWithPkTest extends TableTestBase {

/**
 * Creates the primary-key test table for the given file format through the catalog and returns
 * the catalog's handle to it.
 *
 * @param format file format name, passed to {@code schema(format)} to build the table schema
 * @return the table looked up from the catalog after creation
 * @throws Exception if the catalog create or lookup call fails
 */
private Table createTable(String format) throws Exception {
// NOTE(review): this is a diff rendering without +/- markers. The first create/get pair
// (no-arg identifier()) appears to be the removed code and the second pair
// (identifier(format)) its replacement; confirm against the commit.
// The trailing 'true' is presumably an ignore-if-exists flag; verify against the Catalog API.
catalog.createTable(identifier(), schema(format), true);
return catalog.getTable(identifier());
catalog.createTable(identifier(format), schema(format), true);
return catalog.getTable(identifier(format));
}

private Schema schema(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.paimon.Snapshot
import org.apache.paimon.hive.TestHiveMetastore

import org.apache.hadoop.conf.Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.paimon.Snapshot
import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
import org.apache.paimon.options.{CatalogOptions, Options}
import org.apache.paimon.spark.catalog.Catalogs
Expand Down

0 comments on commit d5290fc

Please sign in to comment.