Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26076 Support favoredNodes when do compaction offload #3468

Merged
merged 5 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -42,7 +41,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices
Expand Down Expand Up @@ -96,12 +94,11 @@ public CompactResponse requestCompaction(RpcController controller,
ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
List<HBaseProtos.ServerName> favoredNodes = Collections.singletonList(request.getServer());
LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
CompactionTask compactionTask =
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
CompactionTask compactionTask = CompactionTask.newBuilder().setRsServerName(rsServerName)
.setRegionInfo(regionInfo).setColumnFamilyDescriptor(cfd).setRequestMajor(major)
.setPriority(priority).setFavoredNodes(request.getFavoredNodesList())
.setSubmitTime(System.currentTimeMillis()).build();
try {
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
return CompactionProtos.CompactResponse.newBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ public void requestCompaction(CompactionTask compactionTask) throws IOException
}
}

/**
* Open store, and clean stale compacted file in cache
*/
private HStore openStore(RegionInfo regionInfo, ColumnFamilyDescriptor cfd, boolean major,
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
MonitoredTask status) throws IOException {
status.setStatus("Open store");
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
// handle TTL case
store.removeUnneededFiles(false);
// CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
// opportunity to clean compacted file at that time, we clean compacted files here
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
if (major) {
status.setStatus("Trigger major compaction");
store.triggerMajorCompaction();
}
return store;
}

private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
ServerName rsServerName = compactionTask.getRsServerName();
RegionInfo regionInfo = compactionTask.getRegionInfo();
Expand All @@ -169,10 +190,10 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
// the three has consistent state, we need this condition to guarantee correct selection
synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
store = openStore(regionInfo, cfd, compactionTask.isRequestMajor(), status);
store.assignFavoredNodesForCompactionOffload(compactionTask.getFavoredNodes());
Optional<CompactionContext> compaction = selectCompaction(store, regionInfo, cfd,
compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
store = pair.getFirst();
Optional<CompactionContext> compaction = pair.getSecond();
if (!compaction.isPresent()) {
store.close();
LOG.info("Compaction context is empty: {}", compactionTask);
Expand Down Expand Up @@ -204,26 +225,12 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
}

/**
* Open store, and select compaction context
* @return Store and CompactionContext
* select compaction context
* @return CompactionContext
*/
Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
Optional<CompactionContext> selectCompaction(HStore store, RegionInfo regionInfo,
ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
throws IOException {
status.setStatus("Open store");
tableDescriptors.get(regionInfo.getTable());
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
// handle TTL case
store.removeUnneededFiles(false);
// CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
// opportunity to clean compacted file at that time, we clean compacted files here
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
if (major) {
status.setStatus("Trigger major compaction");
store.triggerMajorCompaction();
}
// get current compacting and compacted files, NOTE: these files are file names only, don't
// include paths.
status.setStatus("Get current compacting and compacted files from compactionFilesCache");
Expand All @@ -243,7 +250,7 @@ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo
CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
LOG.info("After select store: {}, if compaction context is present: {}", logStr,
compaction.isPresent());
return new Pair<>(store, compaction);
return compaction;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3758,16 +3758,18 @@ public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescripto
}
CompactionService.BlockingInterface cms = cmsStub;
InetSocketAddress[] favoredNodesForRegion =
getFavoredNodesForRegion(regionInfo.getEncodedName());
CompactRequest.Builder builder =
CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
getFavoredNodesForRegion(regionInfo.getEncodedName());
CompactRequest.Builder builder = CompactRequest.newBuilder()
.setServer(ProtobufUtil.toServerName(getServerName()))
.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
.setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
if (favoredNodesForRegion != null) {
if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
for (InetSocketAddress address : favoredNodesForRegion) {
builder.addFavoredNodes(ProtobufUtil
.toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
.toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
}
} else {
builder.addFavoredNodes(ProtobufUtil.toServerName(getServerName()));
}
CompactRequest compactRequest = builder.build();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
Expand Down Expand Up @@ -344,14 +345,29 @@ private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throw
}

private InetSocketAddress[] getFavoredNodes() {
InetSocketAddress[] favoredNodes = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the code still compile after you remove this line? This is a behavior change? We have a favoredNodes class field in HStore?

if (region.getRegionServerServices() != null) {
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
region.getRegionInfo().getEncodedName());
return region.getRegionServerServices()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means, if on a region server, we will get the favored nodes from region server, otherwise, use the one set by compaction server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the logic is like this

.getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
}
return favoredNodes;
}

// Favored nodes used by compaction offload
private InetSocketAddress[] favoredNodes = null;
Apache9 marked this conversation as resolved.
Show resolved Hide resolved

// This method is not thread safe.
// We initialize a new store everytime for a compaction request when compaction offload.
// So the method is only called once after initializeStoreContext and before real do compaction.
public void assignFavoredNodesForCompactionOffload(List<HBaseProtos.ServerName> favoredNodes) {
if (CollectionUtils.isNotEmpty(favoredNodes)) {
this.favoredNodes = new InetSocketAddress[favoredNodes.size()];
for (int i = 0; i < favoredNodes.size(); i++) {
this.favoredNodes[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
favoredNodes.get(i).getPort());
}
}
}

/**
* @return MemStore Instance to use in this store.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.compactionserver;

import java.net.InetSocketAddress;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.TestRegionFavoredNodes;
import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ CompactionServerTests.class, MediumTests.class })
public class TestRegionFavoredNodesWhenCompactOffload extends TestRegionFavoredNodes {
private static HCompactionServer COMPACTION_SERVER;
private static final int FLUSHES = 10;

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionFavoredNodesWhenCompactOffload.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
checkFileSystemWithFavoredNode();
} catch (NoSuchMethodException nm) {
return;
}
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TABLE_NAME).setCompactionOffloadEnabled(true).build();
TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1)
.numDataNodes(REGION_SERVERS).numRegionServers(REGION_SERVERS).build());
TEST_UTIL.getAdmin().switchCompactionOffload(true);
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
table = TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(COLUMN_FAMILY),
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, TEST_UTIL.getConfiguration());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
.getCompactionServer();
}

@Test
public void testFavoredNodes() throws Exception {
Assume.assumeTrue(createWithFavoredNode != null);
InetSocketAddress[] nodes = getDataNodes();
String[] nodeNames = new String[REGION_SERVERS];
for (int i = 0; i < REGION_SERVERS; i++) {
nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + nodes[i].getPort();
}
updateFavoredNodes(nodes);
// Write some data to each region and flush. Repeat some number of times to
// get multiple files for each region.
for (int i = 0; i < FLUSHES; i++) {
TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
TEST_UTIL.flush();
}
TEST_UTIL.compact(TABLE_NAME, true);
TEST_UTIL.waitFor(60000, () -> {
int hFileCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
hFileCount += region.getStore(COLUMN_FAMILY).getStorefilesCount();

}
return hFileCount == HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
});
checkFavoredNodes(nodeNames);
// To ensure do compaction on compaction server
TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}

}
Loading