HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
BukrosSzabolcs authored and joshelser committed Dec 22, 2021

1 parent 8bec26e commit a288365
Showing 23 changed files with 556 additions and 35 deletions.
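
For orientation before the diffs: the new BrokenStoreFileCleaner chore is disabled by default and is driven entirely by the hbase.region.broken.storefilecleaner.* properties introduced in this commit. Below is a minimal, illustrative sketch of enabling and tuning it programmatically; the property names come from the constants in BrokenStoreFileCleaner further down, and the values shown are simply the defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableBrokenStoreFileCleaner {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The chore is off unless explicitly enabled.
    conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
    // Only files older than this TTL are eligible for deletion (default 12h).
    conf.setLong("hbase.region.broken.storefilecleaner.ttl", 1000L * 60 * 60 * 12);
    // Initial delay (default 2h, jittered) and period between runs (default 6h).
    conf.setInt("hbase.region.broken.storefilecleaner.delay", 1000 * 60 * 60 * 2);
    conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
    conf.setInt("hbase.region.broken.storefilecleaner.period", 1000 * 60 * 60 * 6);
  }
}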
@@ -38,7 +38,6 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -286,7 +285,6 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
@@ -295,7 +293,7 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
long bytesWrittenProgressForLog = 0;
@@ -665,7 +663,7 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId


@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -110,7 +110,11 @@ public List<Path> abortWriters() {
return paths;
}

protected abstract Collection<StoreFileWriter> writers();
/**
* Returns all writers. This is used to prevent deleting currently written storefiles
* during cleanup.
*/
public abstract Collection<StoreFileWriter> writers();

/**
* Subclasses override this method to be called at the end of a successful sequence of append; all
@@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This chore, every time it runs, clears the unused HFiles under the data
* directory.
*/
@InterfaceAudience.Private
public class BrokenStoreFileCleaner extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
"hbase.region.broken.storefilecleaner.enabled";
public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
public static final String BROKEN_STOREFILE_CLEANER_TTL =
"hbase.region.broken.storefilecleaner.ttl";
public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
public static final String BROKEN_STOREFILE_CLEANER_DELAY =
"hbase.region.broken.storefilecleaner.delay";
public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
"hbase.region.broken.storefilecleaner.delay.jitter";
public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
"hbase.region.broken.storefilecleaner.period";
public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h

private HRegionServer regionServer;
private final AtomicBoolean enabled = new AtomicBoolean(true);
private long fileTtl;

public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
Configuration conf, HRegionServer regionServer) {
super("BrokenStoreFileCleaner", stopper, period, delay);
this.regionServer = regionServer;
setEnabled(
conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
}

public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}

public boolean getEnabled() {
return this.enabled.get();
}

@Override
public void chore() {
if (getEnabled()) {
long start = EnvironmentEdgeManager.currentTime();
AtomicLong deletedFiles = new AtomicLong(0);
AtomicLong failedDeletes = new AtomicLong(0);
for (HRegion region : regionServer.getRegions()) {
for (HStore store : region.getStores()) {
//only do cleanup in stores not using tmp directories
if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
continue;
}
Path storePath =
new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());

try {
List<FileStatus> fsStoreFiles =
Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
fsStoreFiles.forEach(
file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
} catch (IOException e) {
LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath);
continue;
}
}
}
LOG.debug(
"BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+ "to delete {}",
regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
deletedFiles.get(), failedDeletes.get());
} else {
LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
}
}

private void cleanFileIfNeeded(FileStatus file, HStore store,
AtomicLong deletedFiles, AtomicLong failedDeletes) {
if(file.isDirectory()){
LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
return;
}

if(!validate(file.getPath())){
LOG.trace("Invalid file {}, skip cleanup", file.getPath());
return;
}

if(!isOldEnough(file)){
LOG.trace("Fresh file {}, skip cleanup", file.getPath());
return;
}

if(isActiveStorefile(file, store)){
LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
return;
}

// Compacted files can still have readers and are cleaned by a separate chore, so they have to
// be skipped here
if(isCompactedFile(file, store)){
LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
return;
}

if(isCompactionResultFile(file, store)){
LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
return;
}

deleteFile(file, store, deletedFiles, failedDeletes);
}

private boolean isCompactionResultFile(FileStatus file, HStore store) {
return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
}

// Compacted files can still have readers and are cleaned by a separate chore, so they have to
// be skipped here
private boolean isCompactedFile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

private boolean isActiveStorefile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

boolean validate(Path file) {
if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
return true;
}
return StoreFileInfo.validateStoreFileName(file.getName());
}

boolean isOldEnough(FileStatus file){
return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
}

private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
AtomicLong failedDeletes) {
Path filePath = file.getPath();
LOG.debug("Removing {} from store", filePath);
try {
boolean success = store.getFileSystem().delete(filePath, false);
if (!success) {
failedDeletes.incrementAndGet();
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Attempt to delete on next pass.");
}
else{
deletedFiles.incrementAndGet();
}
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
LOG.warn("Error while deleting: " + filePath, e);
}
}

}
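
As a usage note, the chore plugs into the standard ChoreService machinery. The following condensed sketch shows the wiring; the real version, including jitter, is in HRegionServer#initializeThreads later in this commit, and the helper class here is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.regionserver.BrokenStoreFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

final class BrokenStoreFileCleanerWiring {
  // Illustrative helper, not part of this commit.
  static void schedule(HRegionServer regionServer, ChoreService choreService) {
    Configuration conf = regionServer.getConfiguration();
    int delay = conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
    int period = conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
    // The region server doubles as the Stoppable for the chore, as in the real wiring.
    BrokenStoreFileCleaner cleaner =
        new BrokenStoreFileCleaner(delay, period, regionServer, conf, regionServer);
    choreService.scheduleChore(cleaner);
  }
}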
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
}

@Override
protected Collection<StoreFileWriter> writers() {
public Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}

@@ -609,7 +609,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegi
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}
return regionDir;
}
@@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/
final ServerNonceManager nonceManager;

private BrokenStoreFileCleaner brokenStoreFileCleaner;

@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;

@@ -1835,6 +1837,9 @@ private void startServices() throws IOException {
if (this.slowLogTableOpsChore != null) {
choreService.scheduleChore(slowLogTableOpsChore);
}
if (this.brokenStoreFileCleaner != null) {
choreService.scheduleChore(brokenStoreFileCleaner);
}

// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@@ -1914,6 +1919,22 @@ private void initializeThreads() {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}

int brokenStoreFileCleanerPeriod = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
int brokenStoreFileCleanerDelay = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
double brokenStoreFileCleanerDelayJitter = conf.getDouble(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
this.brokenStoreFileCleaner =
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);

registerConfigurationObservers();
}

@@ -3488,6 +3509,11 @@ protected boolean clusterMode() {
return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
}

@InterfaceAudience.Private
public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
return brokenStoreFileCleaner;
}

@Override
protected void stopChores() {
shutdownChore(nonceManagerChore);
@@ -3498,5 +3524,6 @@ protected void stopChores() {
shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
shutdownChore(brokenStoreFileCleaner);
}
}
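
The jitter arithmetic above spreads the chore's first run across region servers so they do not all scan at once. With the defaults (2h delay, 0.25 jitter), jitterRate lands in [-0.125, 0.125), so the effective delay shifts by at most about 15 minutes either way. A self-contained sketch of the same computation, using ThreadLocalRandom for a uniform [0, 1) draw where the commit itself uses RandomUtils:

import java.util.concurrent.ThreadLocalRandom;

public class BrokenStoreFileCleanerJitterExample {
  public static void main(String[] args) {
    int delay = 1000 * 60 * 60 * 2; // default delay: 2h in ms
    double jitter = 0.25D;          // default jitter fraction
    // Same shape as the HRegionServer wiring above.
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delay * jitterRate); // within +/-900,000 ms (+/-15 min)
    System.out.println("effective delay (ms): " + (delay + jitterValue));
  }
}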
@@ -1156,6 +1156,12 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
}
}
replaceStoreFiles(filesToCompact, sfs, true);

// This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the
// cleaner know that the compaction is done, so the new files can be cleaned up if the
// compaction has failed.
storeEngine.resetCompactionWriter();

if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -42,9 +42,11 @@
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
}
}

/**
* Whether the storefile tracker implementation in use requires writing to a temp
* directory first, i.e., it does not allow broken store files under the actual data directory.
*/
public boolean requireWritingToTmpDirFirst() {
return storeFileTracker.requireWritingToTmpDirFirst();
}

/**
* Resets the compaction writer when the new file is committed and used as an active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the cleaner
* know that the compaction is done, so the file can be cleaned up if the compaction has
* failed. Currently called in
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
}

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() {
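
Taken together, the two StoreEngine additions define a small protocol for trackers that write compaction output directly into the data directory: the cleaner skips stores where requireWritingToTmpDirFirst() is true, and HStore calls resetCompactionWriter() only after the results are committed. The following outline is hypothetical (the Engine interface stands in for StoreEngine) and exists only to make that call ordering explicit; the real logic is in HStore#doCompaction above.

import java.util.List;
import org.apache.hadoop.fs.Path;

final class CompactionCommitOrdering {
  // Hypothetical stand-in for StoreEngine; illustration only.
  interface Engine {
    boolean requireWritingToTmpDirFirst();
    void resetCompactionWriter();
  }

  static void afterCommit(Engine storeEngine, List<Path> committedFiles) {
    // While a compaction runs, its output paths are visible through
    // Compactor#getCompactionTargets(), so BrokenStoreFileCleaner skips them.
    // Once the files are committed and swapped in as active storefiles
    // (replaceStoreFiles in HStore#doCompaction), drop that protection:
    storeEngine.resetCompactionWriter();
    // A failed compaction never reaches this point, so its leftover files age
    // past the TTL unprotected and get removed by the cleaner.
  }
}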
@@ -58,7 +58,7 @@ public void setNoStripeMetadata() {
}

@Override
protected Collection<StoreFileWriter> writers() {
public Collection<StoreFileWriter> writers() {
return existingWriters;
}
