Skip to content

Commit

Permalink
HBASE-27126 Support multi-threads cleaner for MOB files
Browse files Browse the repository at this point in the history
  • Loading branch information
chandrasekhar-188k committed Jun 8, 2024
1 parent f136f0a commit b482a58
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4503,4 +4503,8 @@ protected String getDescription() {
}
});
}

public MobFileCleanerChore getMobFileCleanerChore() {
return mobFileCleanerChore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public final class MobConstants {
public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max";
public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;

public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads";
public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1;
public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT =
"hbase.master.mob.cleaner.chore.timeout";
public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 5 minutes

private MobConstants() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@
*/
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT;
import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
Expand All @@ -31,6 +42,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
* (files which have no active references to) mob files.
Expand All @@ -39,8 +53,11 @@
public class MobFileCleanerChore extends ScheduledChore {

private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);

private final HMaster master;
private ExpiredMobFileCleaner cleaner;
private final ExpiredMobFileCleaner cleaner;
private final ExecutorService threadPool;
private final int cleanerFutureTimeout;

public MobFileCleanerChore(HMaster master) {
super(master.getServerName() + "-MobFileCleanerChore", master,
Expand All @@ -52,7 +69,19 @@ public MobFileCleanerChore(HMaster master) {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
int threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);

ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();
if (threadCount == 1) {
threadPool = MoreExecutors.newDirectExecutorService();
} else {
threadPool = Executors.newFixedThreadPool(threadCount, threadFactory);
}
checkObsoleteConfigurations();
cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT,
DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT);
}

private void checkObsoleteConfigurations() {
Expand Down Expand Up @@ -83,29 +112,43 @@ protected void chore() {
LOG.error("MobFileCleanerChore failed", e);
return;
}
List<Future> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
}
}
Future<?> future = threadPool.submit(() -> handleOneTable(htd));
futureList.add(future);
}

for (Future future : futureList) {
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Exception during the execution of MobFileCleanerChore", e);
}
}
}

private void handleOneTable(TableDescriptor htd) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}

}
Loading

0 comments on commit b482a58

Please sign in to comment.