Skip to content

Commit

Permalink
HBASE-28215: region reopen procedure batching/throttling (#5534)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
  • Loading branch information
rmdmattingly authored Dec 4, 2023
1 parent bc0f7a4 commit 25e9228
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT;
import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY;
import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_DISABLED;
import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_KEY;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -26,6 +31,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConcurrentTableModificationException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
Expand Down Expand Up @@ -219,7 +225,13 @@ protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableS
break;
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
if (isTableEnabled(env)) {
addChildProcedure(new ReopenTableRegionsProcedure(getTableName()));
Configuration conf = env.getMasterConfiguration();
long backoffMillis = conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT);
int batchSizeMax =
conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
addChildProcedure(
new ReopenTableRegionsProcedure(getTableName(), backoffMillis, batchSizeMax));
}
setNextState(ModifyTableState.MODIFY_TABLE_ASSIGN_NEW_REPLICAS);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.procedure;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -53,6 +54,17 @@ public class ReopenTableRegionsProcedure

private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);

public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
"hbase.reopen.table.regions.progressive.batch.backoff.ms";
public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
"hbase.reopen.table.regions.progressive.batch.size.max";
public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE = Integer.MAX_VALUE;

// this minimum prevents a max which would break this procedure
private static final int MINIMUM_BATCH_SIZE_MAX = 1;

private TableName tableName;

// Specify specific regions of a table to reopen.
Expand All @@ -61,20 +73,46 @@ public class ReopenTableRegionsProcedure

private List<HRegionLocation> regions = Collections.emptyList();

private List<HRegionLocation> currentRegionBatch = Collections.emptyList();

private RetryCounter retryCounter;

private long reopenBatchBackoffMillis;
private int reopenBatchSize;
private int reopenBatchSizeMax;
private long regionsReopened = 0;
private long batchesProcessed = 0;

public ReopenTableRegionsProcedure() {
regionNames = Collections.emptyList();
this(null);
}

public ReopenTableRegionsProcedure(TableName tableName) {
this.tableName = tableName;
this.regionNames = Collections.emptyList();
this(tableName, Collections.emptyList());
}

public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
}

public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
int reopenBatchSizeMax) {
this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
}

public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
this.tableName = tableName;
this.regionNames = regionNames;
this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
this.reopenBatchSize = Integer.MAX_VALUE;
this.reopenBatchSizeMax = Integer.MAX_VALUE;
} else {
this.reopenBatchSize = 1;
this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
}
}

@Override
Expand All @@ -87,6 +125,30 @@ public TableOperationType getTableOperationType() {
return TableOperationType.REGION_EDIT;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public long getRegionsReopened() {
return regionsReopened;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public long getBatchesProcessed() {
return batchesProcessed;
}

@RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
protected int progressBatchSize() {
int previousBatchSize = reopenBatchSize;
reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
if (reopenBatchSize < previousBatchSize) {
// the batch size should never decrease. this must be overflow, so just use max
reopenBatchSize = reopenBatchSizeMax;
}
return reopenBatchSize;
}

private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
if (loc.getSeqNum() < 0) {
return false;
Expand Down Expand Up @@ -114,7 +176,13 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
for (HRegionLocation loc : regions) {
// if we didn't finish reopening the last batch yet, let's keep trying until we do.
// at that point, the batch will be empty and we can generate a new batch
if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
batchesProcessed++;
}
for (HRegionLocation loc : currentRegionBatch) {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
// this possible, maybe the region has already been merged or split, see HBASE-20921
Expand All @@ -133,39 +201,72 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
regionNode.unlock();
}
addChildProcedure(proc);
regionsReopened++;
}
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
regions = regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
.filter(l -> l != null).collect(Collectors.toList());
// update region lists based on what's been reopened
regions = filterReopened(env, regions);
currentRegionBatch = filterReopened(env, currentRegionBatch);

// existing batch didn't fully reopen, so try to resolve that first.
// since this is a retry, don't do the batch backoff
if (!currentRegionBatch.isEmpty()) {
return reopenIfSchedulable(env, currentRegionBatch, false);
}

if (regions.isEmpty()) {
return Flow.NO_MORE_STATE;
}
if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
}
// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again.
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"There are still {} region(s) which need to be reopened for table {} are in "
+ "OPENING state, suspend {}secs and try again later",
regions.size(), tableName, backoff / 1000);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();

// current batch is finished, schedule more regions
return reopenIfSchedulable(env, regions, true);
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}

private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
List<HRegionLocation> regionsToCheck) {
return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
.filter(l -> l != null).collect(Collectors.toList());
}

private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
boolean shouldBatchBackoff) throws ProcedureSuspendedException {
if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
progressBatchSize();
setBackoffState(reopenBatchBackoffMillis);
throw new ProcedureSuspendedException();
} else {
return Flow.HAS_MORE_STATE;
}
}

// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again.
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"There are still {} region(s) which need to be reopened for table {}. {} are in "
+ "OPENING state, suspend {}secs and try again later",
regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
setBackoffState(backoffMillis);
throw new ProcedureSuspendedException();
}

private void setBackoffState(long millis) {
setTimeout(Math.toIntExact(millis));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
}

private List<HRegionLocation>
getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Confirm that we will rate limit reopen batches when reopening all table regions. This can avoid
* the pain associated with reopening too many regions at once.
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestReopenTableRegionsProcedureBatchBackoff {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatchBackoff.class);

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

private static TableName TABLE_NAME = TableName.valueOf("BatchBackoff");
private static final int BACKOFF_MILLIS_PER_RS = 3_000;
private static final int REOPEN_BATCH_SIZE = 1;

private static byte[] CF = Bytes.toBytes("cf");

@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.startMiniCluster(1);
UTIL.createMultiRegionTable(TABLE_NAME, CF, 10);
}

@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}

@Test
public void testRegionBatchBackoff() throws IOException {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
assertTrue(10 <= regions.size());
ReopenTableRegionsProcedure proc =
new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE);
procExec.submitProcedure(proc);
Instant startedAt = Instant.now();
ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
Instant stoppedAt = Instant.now();
assertTrue(Duration.between(startedAt, stoppedAt).toMillis()
> (long) regions.size() * BACKOFF_MILLIS_PER_RS);
}

@Test
public void testRegionBatchNoBackoff() throws IOException {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
assertTrue(10 <= regions.size());
int noBackoffMillis = 0;
ReopenTableRegionsProcedure proc =
new ReopenTableRegionsProcedure(TABLE_NAME, noBackoffMillis, REOPEN_BATCH_SIZE);
procExec.submitProcedure(proc);
ProcedureSyncWait.waitForProcedureToComplete(procExec, proc,
(long) regions.size() * BACKOFF_MILLIS_PER_RS);
}
}
Loading

0 comments on commit 25e9228

Please sign in to comment.