HBASE-25991 Do compaction on compaction server (#3425)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Michael Stack <stack@apache.org>
nyl3532016 authored Jul 6, 2021
1 parent da0fa30 commit f19f92d
Showing 22 changed files with 1,475 additions and 234 deletions.
@@ -25,10 +25,12 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
@@ -37,6 +39,7 @@
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
@@ -56,6 +59,8 @@
public abstract class AbstractServer extends Thread implements Server {
private static final Logger LOG = LoggerFactory.getLogger(AbstractServer.class);
protected Configuration conf;
protected volatile boolean dataFsOk;
protected HFileSystem dataFs;
// A sleeper that sleeps for msgInterval.
protected Sleeper sleeper;
protected int msgInterval;
@@ -170,6 +175,7 @@ protected AbstractServer(final Configuration conf, String processName) throws IO
super(processName); // thread name
this.startcode = System.currentTimeMillis();
this.conf = conf;
this.dataFsOk = true;
this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
this.userProvider = UserProvider.instantiate(conf);
this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
@@ -400,6 +406,28 @@ public void waitForServerOnline(){
}
}

/**
* Checks to see if the file system is still accessible. If not, aborts the server and marks the
* data file system as no longer OK.
* @return false if the file system is not available
*/
public boolean checkFileSystem() {
if (this.dataFsOk && this.dataFs != null) {
try {
FSUtils.checkFileSystemAvailable(this.dataFs);
} catch (IOException e) {
abort("File System not available", e);
this.dataFsOk = false;
}
}
return this.dataFsOk;
}

@Override
public FileSystem getFileSystem() {
return dataFs;
}

protected abstract AbstractRpcServices getRpcService();

protected abstract String getProcessName();
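For illustration only, a minimal sketch (hypothetical, not part of this patch) of how a subclass of AbstractServer might use the new checkFileSystem() helper to guard its main loop; checkFileSystem() calls abort(...) and flips dataFsOk to false once the underlying HFileSystem is no longer reachable.

// Hypothetical main loop of an AbstractServer subclass (illustrative sketch).
@Override
public void run() {
  while (!isStopped()) {
    if (!checkFileSystem()) {
      break; // abort(...) has already been requested inside checkFileSystem()
    }
    // ... periodic server work would go here ...
    sleeper.sleep(); // sleep for msgInterval between iterations
  }
}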
@@ -19,11 +19,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractRpcServices;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,11 +36,13 @@

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices
@@ -46,8 +51,6 @@ public class CSRpcServices extends AbstractRpcServices

private final HCompactionServer compactionServer;

// Request counter.
final LongAdder requestCount = new LongAdder();
/** RPC scheduler to use for the compaction server. */
public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.compaction.server.rpc.scheduler.factory.class";
@@ -86,11 +89,25 @@ protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
*/
@Override
public CompactResponse requestCompaction(RpcController controller,
CompactionProtos.CompactRequest request) {
requestCount.increment();
CompactionProtos.CompactRequest request) throws ServiceException {
compactionServer.requestCount.increment();
ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
RegionInfo regionInfo = ProtobufUtil.toRegionInfo(request.getRegionInfo());
ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
List<HBaseProtos.ServerName> favoredNodes = Collections.singletonList(request.getServer());
LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
compactionServer.compactionThreadManager.requestCompaction();
return CompactionProtos.CompactResponse.newBuilder().build();
CompactionTask compactionTask =
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
try {
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
return CompactionProtos.CompactResponse.newBuilder().build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}

}
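For context, a hypothetical caller-side sketch (the region-server sender is not shown in this excerpt). The builder setters and ProtobufUtil converter names below are inferred from the getters used in requestCompaction above and from the existing ProtobufUtil helpers, and rsServerName, regionInfo and cfd are assumed to be already in hand.

// Illustrative only; field names are inferred from the getters used in requestCompaction.
CompactionProtos.CompactRequest request = CompactionProtos.CompactRequest.newBuilder()
    .setServer(ProtobufUtil.toServerName(rsServerName))   // requesting region server
    .setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo)) // region whose store is compacted
    .setFamily(ProtobufUtil.toColumnFamilySchema(cfd))    // column family to compact
    .setMajor(false)                                       // minor vs. major compaction
    .setPriority(1)                                        // compaction priority
    .build();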
@@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.compactionserver;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

/**
* Since we do not maintain a StoreFileManager on the compaction server (it cannot be refreshed on
* flush), we use external storage (this class) to record compacting and compacted files. This
* storage is in memory and only used by the CompactionServer (the RegionServer does not use it).
* It never moves or deletes HFiles; the RS still does the file movement and deletion.
*/
@InterfaceAudience.Private
class CompactionFilesCache {
private static Logger LOG = LoggerFactory.getLogger(CompactionFilesCache.class);
private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> selectedFiles =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> compactedFiles =
new ConcurrentHashMap<>();
/**
* Mark files as compacted; called after the CS has finished a compaction and the RS has accepted
* the results. These compacted files will be deleted by the RS once no reader references them.
*/
boolean addCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
List<String> compactedFiles) {
Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
synchronized (compactedFileSet) {
compactedFileSet.addAll(compactedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("Mark files as compacted, region: {}, cf, files: {}", regionInfo,
cfd.getNameAsString(), compactedFileSet);
}
}
return true;
}

/**
* Mark files as selected; called after the files are selected and before the compaction is
* started. Prevents a file from being selected twice for two different compactions.
* @return True if none of these files were already selected, false if any of them were.
*/
boolean addSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
List<String> selectedFiles) {
Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
synchronized (selectedFileSet) {
for (String selectedFile : selectedFiles) {
if (selectedFileSet.contains(selectedFile)) {
return false;
}
}
selectedFileSet.addAll(selectedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("Mark files are selected, region: {}, cf: {}, files: {}",
regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
}
}
return true;
}

/**
* Get files which are already compacted; called before selecting files for a compaction. Thread-safe.
* @return The files which are compacted
*/
Set<String> getCompactedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
Set<String> compactedFiles = getStoreFiles(this.compactedFiles, regionInfo, cfd);
synchronized (compactedFiles) {
return ImmutableSet.copyOf(compactedFiles);
}
}

/**
* Get files which are being compacted; called before selecting files for a compaction. Thread-safe.
* @return The files which are compacting
*/
Set<String> getSelectedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
Set<String> selectedFiles = getStoreFiles(this.selectedFiles, regionInfo, cfd);
synchronized (selectedFiles) {
return ImmutableSet.copyOf(selectedFiles);
}
}

/**
* Get files which are compacted; the returned object is used as a lock.
* @return The compacted-files set, used as a lock object
*/
Object getCompactedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
return getCompactedStoreFilesInternal(regionInfo, cfd);
}

/**
* Get files which are being compacted; the returned object is used as a lock.
* @return The selected-files set, used as a lock object
*/
Object getSelectedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
return getSelectedStoreFilesInternal(regionInfo, cfd);
}

/**
* Get files which are already compacted; called before selecting files for a compaction. Not thread-safe.
* @return The files which are compacted
*/
private Set<String> getCompactedStoreFilesInternal(RegionInfo regionInfo,
ColumnFamilyDescriptor cfd) {
return getStoreFiles(this.compactedFiles, regionInfo, cfd);
}

/**
* Get files which are being compacted; called before selecting files for a compaction. Not thread-safe.
* @return The files which are compacting
*/
private Set<String> getSelectedStoreFilesInternal(RegionInfo regionInfo,
ColumnFamilyDescriptor cfd) {
return getStoreFiles(this.selectedFiles, regionInfo, cfd);
}

private Set<String> getStoreFiles(
ConcurrentMap<String, ConcurrentMap<String, Set<String>>> fileMap, RegionInfo regionInfo,
ColumnFamilyDescriptor cfd) {
String encodedName = regionInfo.getEncodedName();
String family = cfd.getNameAsString();
Map<String, Set<String>> familyFilesMap =
fileMap.computeIfAbsent(encodedName, v -> new ConcurrentHashMap<>());
return familyFilesMap.computeIfAbsent(family, v -> new HashSet<>());
}

/**
* Remove files from the selected set. Called:
* 1. after the compaction has failed;
* 2. after the compaction has finished but reporting to the RS failed;
* 3. after the compaction has finished and reporting to the RS succeeded (the files will then
* be marked as compacted).
*/
void removeSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
List<String> selectedFiles) {
Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
synchronized (selectedFileSet) {
selectedFileSet.removeAll(selectedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("Remove files from selected, region: {}, cf: {}, files: {}",
regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
}
}
}

/**
* Remove compacted files which have already been deleted by the RS.
*/
void cleanupCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
Set<String> storeFileNames) {
Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
synchronized (compactedFileSet) {
compactedFileSet.retainAll(storeFileNames);
}
}
}
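To make the intended lifecycle concrete, an illustrative sketch (not part of the patch) of how a caller such as the compaction thread manager might drive this cache, following the class comment above; regionInfo, cfd, the file names and currentStoreFileNames are placeholders.

// Placeholders: regionInfo, cfd, and currentStoreFileNames come from the task being processed.
CompactionFilesCache cache = new CompactionFilesCache();
List<String> toCompact = Arrays.asList("hfile-1", "hfile-2");
if (cache.addSelectedFiles(regionInfo, cfd, toCompact)) {   // reserve files; false if already selected
  try {
    // ... run the compaction and report the new file to the RS ...
    cache.addCompactedFiles(regionInfo, cfd, toCompact);    // RS accepted the result
  } finally {
    cache.removeSelectedFiles(regionInfo, cfd, toCompact);  // always release the reservation
  }
}
// Later, once the RS has deleted the old files, drop them from the cache as well:
cache.cleanupCompactedFiles(regionInfo, cfd, currentStoreFileNames);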