From 0e4a2c6276e45c207e6f1ec25544e381f36d7d5e Mon Sep 17 00:00:00 2001 From: wangtaohz <103108928+wangtaohz@users.noreply.github.com> Date: Wed, 15 Mar 2023 14:20:44 +0800 Subject: [PATCH] [ARCTIC-1167][core][hive] fix Trash for restoring deleted files (#1223) * fix ArcticHadoopFileIO cast error * overwrite file in trash when move * [ARCTIC-1213] Optimizing of Mixed Format Table supports optimizing a part of partitions at a time (#1220) * support partition ordered by PartitionWeight for OptimizePlan * if not all partitions are optimized, current change snapshot id should set to -1 * fix checkstyle * TableTrashManager should extends Serializable --- .../com/netease/arctic/io/ArcticFileIO.java | 3 +- .../netease/arctic/io/ArcticHadoopFileIO.java | 3 - .../arctic/io/BasicTableTrashManager.java | 3 + .../arctic/io/RecoverableArcticFileIO.java | 11 +++ .../netease/arctic/io/TableTrashManager.java | 3 +- .../netease/arctic/io/ArcticFileIoDummy.java | 95 ------------------- .../arctic/io/BasicTableTrashManagerTest.java | 14 +++ .../hive/utils/HiveMetaSynchronizer.java | 5 +- 8 files changed, 34 insertions(+), 103 deletions(-) delete mode 100644 core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java index fc6028a2b0..fa9ccbb7d7 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java @@ -18,6 +18,7 @@ package com.netease.arctic.io; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FileStatus; import org.apache.iceberg.io.FileIO; @@ -27,7 +28,7 @@ /** * Arctic extension from {@link FileIO}, adding more operations. */ -public interface ArcticFileIO extends FileIO { +public interface ArcticFileIO extends FileIO, Configurable { /** * Run the given action with login user. diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java index 0a3b1bf737..8e7ca1b879 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java @@ -31,8 +31,6 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -43,7 +41,6 @@ * Implementation of {@link ArcticFileIO} for hadoop file system with authentication. */ public class ArcticHadoopFileIO extends HadoopFileIO implements ArcticFileIO { - private static final Logger LOG = LoggerFactory.getLogger(ArcticHadoopFileIO.class); private final TableMetaStore tableMetaStore; diff --git a/core/src/main/java/com/netease/arctic/io/BasicTableTrashManager.java b/core/src/main/java/com/netease/arctic/io/BasicTableTrashManager.java index 30554b3965..e6fb8bae5c 100644 --- a/core/src/main/java/com/netease/arctic/io/BasicTableTrashManager.java +++ b/core/src/main/java/com/netease/arctic/io/BasicTableTrashManager.java @@ -101,6 +101,9 @@ public void moveFileToTrash(String path) { if (!arcticFileIO.exists(targetFileDir)) { arcticFileIO.mkdirs(targetFileDir); } + if (arcticFileIO.exists(targetFileLocation)) { + arcticFileIO.deleteFile(targetFileLocation); + } arcticFileIO.rename(path, targetFileLocation); } catch (Exception e) { LOG.error("{} failed to move file to trash, {}", tableIdentifier, path, e); diff --git a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java index aab36a5ba7..52e6f7db6c 100644 --- a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java @@ -1,5 +1,6 @@ package com.netease.arctic.io; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -144,4 +145,14 @@ private void moveToTrash(String filePath) { trashManager.moveFileToTrash(filePath); LOG.debug("Move file:{} to table trash", filePath); } + + @Override + public void setConf(Configuration conf) { + fileIO.setConf(conf); + } + + @Override + public Configuration getConf() { + return fileIO.getConf(); + } } diff --git a/core/src/main/java/com/netease/arctic/io/TableTrashManager.java b/core/src/main/java/com/netease/arctic/io/TableTrashManager.java index 082b6ea754..2d5ed1d08e 100644 --- a/core/src/main/java/com/netease/arctic/io/TableTrashManager.java +++ b/core/src/main/java/com/netease/arctic/io/TableTrashManager.java @@ -20,12 +20,13 @@ import com.netease.arctic.table.TableIdentifier; +import java.io.Serializable; import java.time.LocalDate; /** * Trash Manager for a table. */ -public interface TableTrashManager { +public interface TableTrashManager extends Serializable { /** * Table identifier. * A TableTrashManager only handle files in this table's location. diff --git a/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java b/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java deleted file mode 100644 index 1cca9a07a4..0000000000 --- a/core/src/test/java/com/netease/arctic/io/ArcticFileIoDummy.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.io; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; - -import java.util.List; -import java.util.concurrent.Callable; - -public class ArcticFileIoDummy implements ArcticFileIO { - - private FileIO fileIO; - - public ArcticFileIoDummy(FileIO fileIO) { - this.fileIO = fileIO; - } - - @Override - public T doAs(Callable callable) { - try { - return callable.call(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean exists(String path) { - return false; - } - - @Override - public void mkdirs(String path) { - - } - - @Override - public void rename(String oldpath, String newPath) { - - } - - @Override - public void deleteDirectoryRecursively(String path) { - - } - - @Override - public List list(String location) { - return null; - } - - @Override - public boolean isDirectory(String location) { - return false; - } - - @Override - public boolean isEmptyDirectory(String location) { - return false; - } - - @Override - public InputFile newInputFile(String path) { - return fileIO.newInputFile(path); - } - - @Override - public OutputFile newOutputFile(String path) { - return fileIO.newOutputFile(path); - } - - @Override - public void deleteFile(String path) { - - } -} diff --git a/core/src/test/java/com/netease/arctic/io/BasicTableTrashManagerTest.java b/core/src/test/java/com/netease/arctic/io/BasicTableTrashManagerTest.java index 5bae313063..15a63c4ecd 100644 --- a/core/src/test/java/com/netease/arctic/io/BasicTableTrashManagerTest.java +++ b/core/src/test/java/com/netease/arctic/io/BasicTableTrashManagerTest.java @@ -79,6 +79,20 @@ public void testDeleteAndRestore() { Assert.assertFalse(getArcticTable().io().exists(fileLocationInTrash)); } + @Test + public void testMoveAndOverwrite() { + TableTrashManager tableTrashManager = TableTrashManagers.build(getArcticTable()); + + String relativeFilePath = "base/test/test1.parquet"; + String path = createFile(getArcticTable().io(), getArcticTable().location() + File.separator + relativeFilePath); + + tableTrashManager.moveFileToTrash(path); + Assert.assertTrue(tableTrashManager.fileExistInTrash(path)); + createFile(getArcticTable().io(), getArcticTable().location() + File.separator + relativeFilePath); + tableTrashManager.moveFileToTrash(path); + Assert.assertTrue(tableTrashManager.fileExistInTrash(path)); + } + @Test public void testDeleteDirectory() { TableTrashManager tableTrashManager = TableTrashManagers.build(getArcticTable()); diff --git a/hive/src/main/java/com/netease/arctic/hive/utils/HiveMetaSynchronizer.java b/hive/src/main/java/com/netease/arctic/hive/utils/HiveMetaSynchronizer.java index 2dbf9f1e17..ec903aaf0f 100644 --- a/hive/src/main/java/com/netease/arctic/hive/utils/HiveMetaSynchronizer.java +++ b/hive/src/main/java/com/netease/arctic/hive/utils/HiveMetaSynchronizer.java @@ -21,7 +21,6 @@ import com.netease.arctic.hive.HMSClientPool; import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.op.OverwriteHiveFiles; -import com.netease.arctic.io.ArcticHadoopFileIO; import com.netease.arctic.op.OverwriteBaseFiles; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableIdentifier; @@ -219,11 +218,11 @@ public static void syncHiveDataToArctic(ArcticTable table, HMSClientPool hiveCli } private static List listHivePartitionFiles(ArcticTable arcticTable, Map partitionValueMap, - String partitionLocation) { + String partitionLocation) { return arcticTable.io().doAs(() -> TableMigrationUtil.listPartition(partitionValueMap, partitionLocation, arcticTable.properties().getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT), - arcticTable.spec(), ((ArcticHadoopFileIO)arcticTable.io()).getTableMetaStore().getConfiguration(), + arcticTable.spec(), arcticTable.io().getConf(), MetricsConfig.fromProperties(arcticTable.properties()), NameMappingParser.fromJson( arcticTable.properties().get(org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING)))); }