Skip to content

Commit

Permalink
[AMORO-2261] Extract the deleting dangling files from the cleaning or…
Browse files Browse the repository at this point in the history
…phan files (#2403)

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files
  • Loading branch information
xujiangfeng001 authored Dec 12, 2023
1 parent 827b92a commit cf9c193
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 4 deletions.
4 changes: 4 additions & 0 deletions ams/dist/src/main/arctic-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ ams:
enabled: true
thread-count: 10

clean-dangling-delete-files:
enabled: true
thread-count: 10

sync-hive-tables:
enabled: true
thread-count: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ public class ArcticManagementConf {
.defaultValue(10)
.withDescription("The number of threads used for orphan files cleaning.");

/**
 * Switch for dangling delete files cleaning, bound to the configuration key
 * {@code clean-dangling-delete-files.enabled}. Defaults to {@code true}.
 */
public static final ConfigOption<Boolean> CLEAN_DANGLING_DELETE_FILES_ENABLED =
ConfigOptions.key("clean-dangling-delete-files.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable dangling delete files cleaning.");

/**
 * Number of threads used for dangling delete files cleaning, bound to the configuration key
 * {@code clean-dangling-delete-files.thread-count}. Defaults to {@code 10}.
 */
public static final ConfigOption<Integer> CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT =
ConfigOptions.key("clean-dangling-delete-files.thread-count")
.intType()
.defaultValue(10)
.withDescription("The number of threads used for dangling delete files cleaning.");

public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) {

// clear metadata files
cleanMetadata(System.currentTimeMillis() - keepTime);
}

@Override
public void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();

if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
return;
}

// refresh
table.refresh();
Snapshot currentSnapshot = table.currentSnapshot();
java.util.Optional<String> totalDeleteFiles =
java.util.Optional.ofNullable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
// TODO TableMaintainer should not be in this optimizing.xxx package.
public interface TableMaintainer {

/** Clean table orphan files. Includes: data files, metadata files, dangling delete files. */
/** Clean table orphan files. Includes: data files, metadata files. */
void cleanOrphanFiles(TableRuntime tableRuntime);

/**
 * Clean table dangling delete files.
 *
 * <p>The default implementation does nothing; maintainers that support this cleanup override
 * it.
 *
 * @param tableRuntime runtime of the table being maintained
 */
default void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
// Do nothing by default.
}
/**
* Expire snapshots. The optimizing based on the snapshot that the current table relies on will
* not expire according to TableRuntime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class AsyncTableExecutors {
private SnapshotsExpiringExecutor snapshotsExpiringExecutor;
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
private OptimizingExpiringExecutor optimizingExpiringExecutor;
Expand All @@ -32,6 +33,12 @@ public void setup(TableManager tableManager, Configurations conf) {
new OrphanFilesCleaningExecutor(
tableManager, conf.getInteger(ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT));
}
if (conf.getBoolean(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) {
this.danglingDeleteFilesCleaningExecutor =
new DanglingDeleteFilesCleaningExecutor(
tableManager,
conf.getInteger(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT));
}
this.optimizingCommitExecutor =
new OptimizingCommitExecutor(
tableManager, conf.getInteger(ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
Expand Down Expand Up @@ -75,6 +82,10 @@ public OrphanFilesCleaningExecutor getOrphanFilesCleaningExecutor() {
return orphanFilesCleaningExecutor;
}

/**
 * Returns the executor responsible for cleaning dangling delete files.
 *
 * <p>NOTE(review): the backing field appears to be assigned only when the feature is enabled
 * during setup, so this may return {@code null} - confirm that callers tolerate it.
 */
public DanglingDeleteFilesCleaningExecutor getDanglingDeleteFilesCleaningExecutor() {
  return this.danglingDeleteFilesCleaningExecutor;
}

public BlockerExpiringExecutor getBlockerExpiringExecutor() {
return blockerExpiringExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server.table.executor;

import static com.netease.arctic.server.optimizing.maintainer.TableMaintainer.ofTable;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table executor that cleans the dangling delete files of a table.
 *
 * <p>Each run loads the table, resolves its {@link TableMaintainer} and delegates to {@link
 * TableMaintainer#cleanDanglingDeleteFiles(TableRuntime)}.
 */
public class DanglingDeleteFilesCleaningExecutor extends BaseTableExecutor {

  // The logger must be bound to this class so that log lines are attributed to this executor.
  private static final Logger LOG =
      LoggerFactory.getLogger(DanglingDeleteFilesCleaningExecutor.class);

  /** One day, expressed in milliseconds. */
  private static final long INTERVAL = 24 * 60 * 60 * 1000L;

  /**
   * Creates the executor.
   *
   * @param tableManager table manager handed to the base executor
   * @param poolSize pool size handed to the base executor
   */
  protected DanglingDeleteFilesCleaningExecutor(TableManager tableManager, int poolSize) {
    super(tableManager, poolSize);
  }

  /**
   * Returns the fixed {@link #INTERVAL}, regardless of the table.
   *
   * <p>NOTE(review): presumably the base class treats this value as the delay before the next
   * run - confirm against {@code BaseTableExecutor}.
   */
  @Override
  protected long getNextExecutingTime(TableRuntime tableRuntime) {
    return INTERVAL;
  }

  /** Returns whether the table's configuration enables deleting dangling delete files. */
  @Override
  protected boolean enabled(TableRuntime tableRuntime) {
    return tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
  }

  /**
   * Asks the base class to schedule the table if necessary, using the start delay, whenever the
   * table configuration changes. The previous configuration is not inspected.
   */
  @Override
  public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
    scheduleIfNecessary(tableRuntime, getStartDelay());
  }

  /**
   * Loads the table and runs the dangling delete files cleanup through its maintainer.
   *
   * <p>Any {@link Throwable} raised during the run is logged and not rethrown.
   */
  @Override
  protected void execute(TableRuntime tableRuntime) {
    try {
      LOG.info("{} start cleaning dangling delete files", tableRuntime.getTableIdentifier());
      AmoroTable<?> amoroTable = loadTable(tableRuntime);
      TableMaintainer tableMaintainer = ofTable(amoroTable);
      tableMaintainer.cleanDanglingDeleteFiles(tableRuntime);
    } catch (Throwable t) {
      LOG.error("{} failed to clean dangling delete file", tableRuntime.getTableIdentifier(), t);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void cleanDanglingDeleteFiles() throws IOException {
.commit();
assertDanglingDeleteFiles(testTable, 1);

MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testTable);
IcebergTableMaintainer tableMaintainer = new IcebergTableMaintainer(testTable);
tableMaintainer.cleanDanglingDeleteFiles();

assertDanglingDeleteFiles(testTable, 0);
Expand Down

0 comments on commit cf9c193

Please sign in to comment.