From cf9c1931b4f148b934173870040f2c3b91a9b7a6 Mon Sep 17 00:00:00 2001 From: xujiangfeng001 <104614523+xujiangfeng001@users.noreply.github.com> Date: Tue, 12 Dec 2023 19:38:49 +0800 Subject: [PATCH] [AMORO-2261] Extract the deleting dangling files from the cleaning orphan files (#2403) * [Improvement]: Extract the deleting dangling files from the cleaning orphan files * [Improvement]: Extract the deleting dangling files from the cleaning orphan files * [Improvement]: Extract the deleting dangling files from the cleaning orphan files --- ams/dist/src/main/arctic-bin/conf/config.yaml | 4 ++ .../arctic/server/ArcticManagementConf.java | 12 ++++ .../arctic/server/ArcticServiceContainer.java | 1 + .../maintainer/IcebergTableMaintainer.java | 7 +- .../maintainer/TableMaintainer.java | 6 +- .../table/executor/AsyncTableExecutors.java | 11 +++ .../DanglingDeleteFilesCleaningExecutor.java | 68 +++++++++++++++++++ .../TestOrphanFileCleanIceberg.java | 2 +- 8 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 ams/server/src/main/java/com/netease/arctic/server/table/executor/DanglingDeleteFilesCleaningExecutor.java diff --git a/ams/dist/src/main/arctic-bin/conf/config.yaml b/ams/dist/src/main/arctic-bin/conf/config.yaml index e876b72937..3ba116b5d5 100644 --- a/ams/dist/src/main/arctic-bin/conf/config.yaml +++ b/ams/dist/src/main/arctic-bin/conf/config.yaml @@ -47,6 +47,10 @@ ams: enabled: true thread-count: 10 + clean-dangling-delete-files: + enabled: true + thread-count: 10 + sync-hive-tables: enabled: true thread-count: 10 diff --git a/ams/server/src/main/java/com/netease/arctic/server/ArcticManagementConf.java b/ams/server/src/main/java/com/netease/arctic/server/ArcticManagementConf.java index b18e27712f..f9999ad569 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/ArcticManagementConf.java +++ b/ams/server/src/main/java/com/netease/arctic/server/ArcticManagementConf.java @@ -100,6 +100,18 @@ public class ArcticManagementConf { .defaultValue(10) .withDescription("The number of threads used for orphan files cleaning."); + public static final ConfigOption CLEAN_DANGLING_DELETE_FILES_ENABLED = + ConfigOptions.key("clean-dangling-delete-files.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Enable dangling delete files cleaning."); + + public static final ConfigOption CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT = + ConfigOptions.key("clean-dangling-delete-files.thread-count") + .intType() + .defaultValue(10) + .withDescription("The number of threads used for dangling delete files cleaning."); + public static final ConfigOption SYNC_HIVE_TABLES_ENABLED = ConfigOptions.key("sync-hive-tables.enabled") .booleanType() diff --git a/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java b/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java index ddee0f4fe1..502a95d9b3 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java @@ -138,6 +138,7 @@ public void startService() throws Exception { addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); + addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor()); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index 2ef29357db..04ee9d1ce1 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -127,13 +127,16 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) { // clear metadata files cleanMetadata(System.currentTimeMillis() - keepTime); + } + + @Override + public void cleanDanglingDeleteFiles(TableRuntime tableRuntime) { + TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) { return; } - // refresh - table.refresh(); Snapshot currentSnapshot = table.currentSnapshot(); java.util.Optional totalDeleteFiles = java.util.Optional.ofNullable( diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java index f439512dc8..32cdb2528f 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java @@ -33,9 +33,13 @@ // TODO TableMaintainer should not be in this optimizing.xxx package. public interface TableMaintainer { - /** Clean table orphan files. Includes: data files, metadata files, dangling delete files. */ + /** Clean table orphan files. Includes: data files, metadata files. */ void cleanOrphanFiles(TableRuntime tableRuntime); + /** Clean table dangling delete files. */ + default void cleanDanglingDeleteFiles(TableRuntime tableRuntime) { + // DO nothing by default + } /** * Expire snapshots. The optimizing based on the snapshot that the current table relies on will * not expire according to TableRuntime. diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/AsyncTableExecutors.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/AsyncTableExecutors.java index 518a0ee2a0..aecebbc4f0 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/AsyncTableExecutors.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/AsyncTableExecutors.java @@ -10,6 +10,7 @@ public class AsyncTableExecutors { private SnapshotsExpiringExecutor snapshotsExpiringExecutor; private TableRuntimeRefreshExecutor tableRefreshingExecutor; private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; + private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor; private BlockerExpiringExecutor blockerExpiringExecutor; private OptimizingCommitExecutor optimizingCommitExecutor; private OptimizingExpiringExecutor optimizingExpiringExecutor; @@ -32,6 +33,12 @@ public void setup(TableManager tableManager, Configurations conf) { new OrphanFilesCleaningExecutor( tableManager, conf.getInteger(ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT)); } + if (conf.getBoolean(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) { + this.danglingDeleteFilesCleaningExecutor = + new DanglingDeleteFilesCleaningExecutor( + tableManager, + conf.getInteger(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT)); + } this.optimizingCommitExecutor = new OptimizingCommitExecutor( tableManager, conf.getInteger(ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT)); @@ -75,6 +82,10 @@ public OrphanFilesCleaningExecutor getOrphanFilesCleaningExecutor() { return orphanFilesCleaningExecutor; } + public DanglingDeleteFilesCleaningExecutor getDanglingDeleteFilesCleaningExecutor() { + return danglingDeleteFilesCleaningExecutor; + } + public BlockerExpiringExecutor getBlockerExpiringExecutor() { return blockerExpiringExecutor; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/DanglingDeleteFilesCleaningExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/DanglingDeleteFilesCleaningExecutor.java new file mode 100644 index 0000000000..30bcce52f1 --- /dev/null +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/DanglingDeleteFilesCleaningExecutor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.server.table.executor; + +import static com.netease.arctic.server.optimizing.maintainer.TableMaintainer.ofTable; + +import com.netease.arctic.AmoroTable; +import com.netease.arctic.server.optimizing.maintainer.TableMaintainer; +import com.netease.arctic.server.table.TableConfiguration; +import com.netease.arctic.server.table.TableManager; +import com.netease.arctic.server.table.TableRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Clean table dangling delete files */ +public class DanglingDeleteFilesCleaningExecutor extends BaseTableExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleaningExecutor.class); + + private static final long INTERVAL = 24 * 60 * 60 * 1000L; + + protected DanglingDeleteFilesCleaningExecutor(TableManager tableManager, int poolSize) { + super(tableManager, poolSize); + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return INTERVAL; + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) { + return tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled(); + } + + @Override + public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { + scheduleIfNecessary(tableRuntime, getStartDelay()); + } + + @Override + protected void execute(TableRuntime tableRuntime) { + try { + LOG.info("{} start cleaning dangling delete files", tableRuntime.getTableIdentifier()); + AmoroTable amoroTable = loadTable(tableRuntime); + TableMaintainer tableMaintainer = ofTable(amoroTable); + tableMaintainer.cleanDanglingDeleteFiles(tableRuntime); + } catch (Throwable t) { + LOG.error("{} failed to clean dangling delete file", tableRuntime.getTableIdentifier(), t); + } + } +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java index 3942e3fbfb..7edbcbdc98 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java @@ -100,7 +100,7 @@ public void cleanDanglingDeleteFiles() throws IOException { .commit(); assertDanglingDeleteFiles(testTable, 1); - MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testTable); + IcebergTableMaintainer tableMaintainer = new IcebergTableMaintainer(testTable); tableMaintainer.cleanDanglingDeleteFiles(); assertDanglingDeleteFiles(testTable, 0);