diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java index fd307a4241..45c2cfa7ba 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java @@ -149,6 +149,10 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) { String newFilePath = TableFileUtils.getNewFilePath(hiveLocation, oldFilePath); if (!arcticTable.io().exists(newFilePath)) { + if (!arcticTable.io().exists(hiveLocation)) { + LOG.debug("{} hive location {} does not exist and need to mkdir before rename", arcticTable.id(), hiveLocation); + arcticTable.io().mkdirs(hiveLocation); + } arcticTable.io().rename(oldFilePath, newFilePath); LOG.debug("{} move file from {} to {}", arcticTable.id(), oldFilePath, newFilePath); } diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java index 4a1486c261..963471b1fa 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java @@ -42,6 +42,7 @@ public interface ArcticFileIO extends FileIO { * Check if a path exists. * * @param path source pathmkdir + * @return true if the path exists; */ boolean exists(String path); @@ -49,18 +50,16 @@ public interface ArcticFileIO extends FileIO { * Create a new directory. * * @param path source path - * @return true if the create success; */ - boolean mkdirs(String path); + void mkdirs(String path); /** * Rename file from old path to new path * * @param oldpath source path * @param newPath target path - * @return true if the rename success; */ - boolean rename(String oldpath, String newPath); + void rename(String oldpath, String newPath); /** Delete a file. * diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java index 464a2d7b3b..b2c2f47740 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java @@ -31,6 +31,8 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -41,6 +43,8 @@ * Implementation of {@link ArcticFileIO} for hadoop file system with authentication. */ public class ArcticHadoopFileIO extends HadoopFileIO implements ArcticFileIO { + private static final Logger LOG = LoggerFactory.getLogger(ArcticHadoopFileIO.class); + private final TableMetaStore tableMetaStore; public ArcticHadoopFileIO(TableMetaStore tableMetaStore) { @@ -67,7 +71,7 @@ public void deleteFile(String path) { try { fs.delete(toDelete, false); } catch (IOException e) { - throw new UncheckedIOException("Failed to delete file: " + path, e); + throw new UncheckedIOException("Fail to delete file: " + path, e); } return null; }); @@ -84,6 +88,9 @@ public boolean deleteFileWithResult(String path, boolean recursive) { } catch (IOException e) { result = false; } + if (!result) { + LOG.warn("Fail to delete file " + path + " and file system return false, need to check the hdfs path"); + } return result; }); } @@ -159,16 +166,20 @@ public boolean isEmptyDirectory(String location) { } @Override - public boolean rename(String src, String dts) { - return tableMetaStore.doAs(() -> { + public void rename(String src, String dts) { + tableMetaStore.doAs(() -> { Path srcPath = new Path(src); Path dtsPath = new Path(dts); FileSystem fs = getFs(srcPath); try { - return fs.rename(srcPath, dtsPath); + if (!fs.rename(srcPath, dtsPath)) { + throw new IOException("Fail to rename: from " + src + " to " + dts + + " and file system return false, need to check the hdfs path"); + } } catch (IOException e) { - throw new UncheckedIOException("Failed to rename: from " + src + " to " + dts, e); + throw new UncheckedIOException("Fail to rename: from " + src + " to " + dts, e); } + return null; }); } @@ -185,21 +196,25 @@ public boolean exists(String path) { try { return fs.exists(filePath); } catch (IOException e) { - throw new UncheckedIOException("Failed to check file exist for " + path, e); + throw new UncheckedIOException("Fail to check file exist for " + path, e); } }); } @Override - public boolean mkdirs(String path) { - return tableMetaStore.doAs(() -> { + public void mkdirs(String path) { + tableMetaStore.doAs(() -> { Path filePath = new Path(path); FileSystem fs = getFs(filePath); try { - return fs.mkdirs(filePath); + if (!fs.mkdirs(filePath)) { + throw new IOException("Fail to mkdirs: path " + path + + " and file system return false,, need to check the hdfs path"); + } } catch (IOException e) { - throw new UncheckedIOException("Failed to mkdirs: path " + path, e); + throw new UncheckedIOException("Fail to mkdirs: path " + path, e); } + return null; }); } diff --git a/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java b/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java index a0c80f66dd..921a008a1c 100644 --- a/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java +++ b/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java @@ -49,13 +49,13 @@ public boolean exists(String path) { } @Override - public boolean mkdirs(String path) { - return false; + public void mkdirs(String path) { + } @Override - public boolean rename(String oldpath, String newPath) { - return false; + public void rename(String oldpath, String newPath) { + } @Override