HBASE-26076 Support favoredNodes when do compaction offload (#3468)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
nyl3532016 authored Aug 16, 2021
1 parent 85f0291 commit 68800b5
Showing 6 changed files with 214 additions and 85 deletions.
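
The change, end to end: the region server now attaches the region's favored nodes to the CompactRequest it sends to the compaction server (falling back to its own server name when none are registered), and the compaction server hands that list to the store before compacting, so the compacted HFile can be written near the region's favored DataNodes. Below is a minimal, self-contained sketch of the request-side selection only; ServerName and the class name are simplified stand-ins for illustration, not the protobuf or HBase types used in the patch.

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Illustration only: ServerName stands in for HBaseProtos.ServerName.
public class FavoredNodesRequestSketch {

  record ServerName(String host, int port) { }

  // Mirrors the fallback added to requestCompactRegion: use the region's favored nodes when
  // present, otherwise fall back to the requesting region server itself.
  static List<ServerName> favoredNodesForRequest(InetSocketAddress[] favoredNodesForRegion,
      ServerName regionServer) {
    List<ServerName> nodes = new ArrayList<>();
    if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
      for (InetSocketAddress address : favoredNodesForRegion) {
        nodes.add(new ServerName(address.getHostName(), address.getPort()));
      }
    } else {
      nodes.add(regionServer);
    }
    return nodes;
  }

  public static void main(String[] args) {
    ServerName rs = new ServerName("rs1.example.com", 16020);
    // No favored nodes registered for the region: the request carries the region server itself.
    System.out.println(favoredNodesForRequest(null, rs));
    // Favored nodes registered: the request carries them as-is.
    InetSocketAddress[] favored = {
        InetSocketAddress.createUnresolved("dn1.example.com", 16020),
        InetSocketAddress.createUnresolved("dn2.example.com", 16020) };
    System.out.println(favoredNodesForRequest(favored, rs));
  }
}
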
CSRpcServices.java
@@ -19,7 +19,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices
@@ -96,12 +94,11 @@ public CompactResponse requestCompaction(RpcController controller,
ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
List<HBaseProtos.ServerName> favoredNodes = Collections.singletonList(request.getServer());
LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
CompactionTask compactionTask =
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
CompactionTask compactionTask = CompactionTask.newBuilder().setRsServerName(rsServerName)
.setRegionInfo(regionInfo).setColumnFamilyDescriptor(cfd).setRequestMajor(major)
.setPriority(priority).setFavoredNodes(request.getFavoredNodesList())
.setSubmitTime(System.currentTimeMillis()).build();
try {
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
return CompactionProtos.CompactResponse.newBuilder().build();
CompactionThreadManager.java
@@ -150,6 +150,27 @@ public void requestCompaction(CompactionTask compactionTask) throws IOException
}
}

/**
* Open the store and clean stale compacted files from the cache
*/
private HStore openStore(RegionInfo regionInfo, ColumnFamilyDescriptor cfd, boolean major,
MonitoredTask status) throws IOException {
status.setStatus("Open store");
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
// handle TTL case
store.removeUnneededFiles(false);
// CompactedHFilesDischarger only runs on the region server, so the compaction server gets no
// chance to clean up compacted files there; clean them up here instead
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
if (major) {
status.setStatus("Trigger major compaction");
store.triggerMajorCompaction();
}
return store;
}

private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
ServerName rsServerName = compactionTask.getRsServerName();
RegionInfo regionInfo = compactionTask.getRegionInfo();
@@ -169,10 +190,10 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
// the three have consistent state; we need this condition to guarantee correct selection
synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
store = openStore(regionInfo, cfd, compactionTask.isRequestMajor(), status);
store.assignFavoredNodesForCompactionOffload(compactionTask.getFavoredNodes());
Optional<CompactionContext> compaction = selectCompaction(store, regionInfo, cfd,
compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
store = pair.getFirst();
Optional<CompactionContext> compaction = pair.getSecond();
if (!compaction.isPresent()) {
store.close();
LOG.info("Compaction context is empty: {}", compactionTask);
@@ -204,26 +225,12 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
}

/**
* Open store, and select compaction context
* @return Store and CompactionContext
* Select compaction context
* @return CompactionContext
*/
Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
Optional<CompactionContext> selectCompaction(HStore store, RegionInfo regionInfo,
ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
throws IOException {
status.setStatus("Open store");
tableDescriptors.get(regionInfo.getTable());
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
// handle TTL case
store.removeUnneededFiles(false);
// CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
// opportunity to clean compacted file at that time, we clean compacted files here
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
if (major) {
status.setStatus("Trigger major compaction");
store.triggerMajorCompaction();
}
// get current compacting and compacted files, NOTE: these files are file names only, don't
// include paths.
status.setStatus("Get current compacting and compacted files from compactionFilesCache");
@@ -243,7 +250,7 @@ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo
CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
LOG.info("After select store: {}, if compaction context is present: {}", logStr,
compaction.isPresent());
return new Pair<>(store, compaction);
return compaction;
}

/**
HRegionServer.java
@@ -3758,16 +3758,18 @@ public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescripto
}
CompactionService.BlockingInterface cms = cmsStub;
InetSocketAddress[] favoredNodesForRegion =
getFavoredNodesForRegion(regionInfo.getEncodedName());
CompactRequest.Builder builder =
CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
getFavoredNodesForRegion(regionInfo.getEncodedName());
CompactRequest.Builder builder = CompactRequest.newBuilder()
.setServer(ProtobufUtil.toServerName(getServerName()))
.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
.setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
if (favoredNodesForRegion != null) {
if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
for (InetSocketAddress address : favoredNodesForRegion) {
builder.addFavoredNodes(ProtobufUtil
.toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
.toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
}
} else {
builder.addFavoredNodes(ProtobufUtil.toServerName(getServerName()));
}
CompactRequest compactRequest = builder.build();
try {
HStore.java
@@ -118,6 +118,7 @@
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
@@ -344,14 +345,29 @@ private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throw
}

private InetSocketAddress[] getFavoredNodes() {
InetSocketAddress[] favoredNodes = null;
if (region.getRegionServerServices() != null) {
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
region.getRegionInfo().getEncodedName());
return region.getRegionServerServices()
.getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
}
return favoredNodes;
}

// Favored nodes used by compaction offload
private InetSocketAddress[] favoredNodes = null;

// This method is not thread safe.
// A new store is initialized for every compaction request when compaction offload is used,
// so this method is only called once, after initializeStoreContext and before the actual compaction.
public void assignFavoredNodesForCompactionOffload(List<HBaseProtos.ServerName> favoredNodes) {
if (CollectionUtils.isNotEmpty(favoredNodes)) {
this.favoredNodes = new InetSocketAddress[favoredNodes.size()];
for (int i = 0; i < favoredNodes.size(); i++) {
this.favoredNodes[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
favoredNodes.get(i).getPort());
}
}
}

/**
* @return MemStore Instance to use in this store.
*/
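
For context on the HStore change above: assignFavoredNodesForCompactionOffload turns the host/port pairs carried on the request into unresolved InetSocketAddress values (createUnresolved performs no DNS lookup) and leaves the field alone when the list is empty. A standalone sketch of just that conversion follows; ServerName is again a stand-in for the protobuf message, and the hostnames in the usage example are made up.

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;

// Illustration only, not HBase code.
public class AssignFavoredNodesSketch {

  record ServerName(String hostName, int port) { }

  private InetSocketAddress[] favoredNodes;

  // Sketch of the conversion: (host, port) pairs -> unresolved socket addresses.
  // Called once per compaction request, before the compaction runs, so no synchronization here.
  void assignFavoredNodes(List<ServerName> nodes) {
    if (nodes == null || nodes.isEmpty()) {
      return; // nothing to assign, keep the current value
    }
    InetSocketAddress[] addresses = new InetSocketAddress[nodes.size()];
    for (int i = 0; i < nodes.size(); i++) {
      // createUnresolved defers hostname resolution instead of doing a DNS lookup now.
      addresses[i] =
          InetSocketAddress.createUnresolved(nodes.get(i).hostName(), nodes.get(i).port());
    }
    this.favoredNodes = addresses;
  }

  public static void main(String[] args) {
    AssignFavoredNodesSketch store = new AssignFavoredNodesSketch();
    store.assignFavoredNodes(List.of(
        new ServerName("dn1.example.com", 16020), new ServerName("dn2.example.com", 16020)));
    System.out.println(Arrays.toString(store.favoredNodes));
  }
}

The resulting addresses serve as placement hints when the store writes the compacted file, which is what the new test below verifies by checking where the HFile blocks land.
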
TestRegionFavoredNodesWhenCompactOffload.java
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.compactionserver;

import java.net.InetSocketAddress;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.TestRegionFavoredNodes;
import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ CompactionServerTests.class, MediumTests.class })
public class TestRegionFavoredNodesWhenCompactOffload extends TestRegionFavoredNodes {
private static HCompactionServer COMPACTION_SERVER;
private static final int FLUSHES = 10;

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionFavoredNodesWhenCompactOffload.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
checkFileSystemWithFavoredNode();
} catch (NoSuchMethodException nm) {
return;
}
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TABLE_NAME).setCompactionOffloadEnabled(true).build();
TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1)
.numDataNodes(REGION_SERVERS).numRegionServers(REGION_SERVERS).build());
TEST_UTIL.getAdmin().switchCompactionOffload(true);
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
table = TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(COLUMN_FAMILY),
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, TEST_UTIL.getConfiguration());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
.getCompactionServer();
}

@Test
public void testFavoredNodes() throws Exception {
Assume.assumeTrue(createWithFavoredNode != null);
InetSocketAddress[] nodes = getDataNodes();
String[] nodeNames = new String[REGION_SERVERS];
for (int i = 0; i < REGION_SERVERS; i++) {
nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + nodes[i].getPort();
}
updateFavoredNodes(nodes);
// Write some data to each region and flush. Repeat some number of times to
// get multiple files for each region.
for (int i = 0; i < FLUSHES; i++) {
TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
TEST_UTIL.flush();
}
TEST_UTIL.compact(TABLE_NAME, true);
// After the major compaction each region should hold exactly one store file, and the table
// has KEYS_FOR_HBA_CREATE_TABLE.length + 1 regions, so wait until the total file count matches
TEST_UTIL.waitFor(60000, () -> {
int hFileCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
hFileCount += region.getStore(COLUMN_FAMILY).getStorefilesCount();
}
return hFileCount == HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
});
checkFavoredNodes(nodeNames);
// Ensure the compaction actually ran on the compaction server
TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}

}