Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24396 : RetryCounter#sleepUntilNextRetry and ThrottledInputStre… #1765

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
* The ThrottleInputStream provides bandwidth throttling on a specified
Expand Down Expand Up @@ -143,14 +144,10 @@ static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
}
}

private void throttle() throws InterruptedIOException {
private void throttle() {
long sleepTime = calSleepTimeMs();
totalSleepTime += sleepTime;
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
throw new InterruptedIOException("Thread aborted");
}
Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to this jira? When during the throttle sleep, if we interrupt this thread, it would have come out of sleep by throwing an IOE as per the current code. But a call to sleepUninterruptibly will make sure the thread is in sleep state for that much time. The interrupt might be for a genuine case to stop the running thread. Now this change will make it such that even if been interrupted, the thread will still continue to be executed!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, throttle()'s purpose should ideally be to throttle without any interruption.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
* Operation retry accounting.
Expand Down Expand Up @@ -177,11 +178,11 @@ public int getMaxAttempts() {
/**
* Sleep for a back off time as supplied by the backoff policy, and increases the attempts
*/
public void sleepUntilNextRetry() throws InterruptedException {
public void sleepUntilNextRetry() {
int attempts = getAttemptTimes();
long sleepTime = getBackoffTime();
LOG.trace("Sleeping {} ms before retry #{}...", sleepTime, attempts);
retryConfig.getTimeUnit().sleep(sleepTime);
Uninterruptibles.sleepUninterruptibly(sleepTime, retryConfig.getTimeUnit());
useRetry();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class TestRetryCounter {
private static final Logger LOG = LoggerFactory.getLogger(TestRetryCounter.class);

@Test
public void testBasics() throws InterruptedException {
public void testBasics() {
int maxAttempts = 10;
RetryCounterFactory factory =
new RetryCounterFactory(maxAttempts, 10, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,7 @@ private Pair<Integer, String> execWithRetries(String hostname, ServiceType servi
} catch (IOException e) {
retryOrThrow(retryCounter, e, hostname, cmd);
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ex) {
// ignore
LOG.warn("Sleep Interrupted:", ex);
}
retryCounter.sleepUntilNextRetry();
}
}

Expand Down Expand Up @@ -407,12 +402,7 @@ public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout,
} catch (IOException e) {
retryOrThrow(retryCounter, e, hostname, cmd);
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ex) {
// ignore
LOG.warn("Sleep Interrupted:", ex);
}
retryCounter.sleepUntilNextRetry();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,7 @@ private <T> T executeWithRetries(final Callable<T> callable) {
throw new RuntimeException("retries exhausted", e);
}
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
retryCounter.sleepUntilNextRetry();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also we are changing the behave. Previously throw RTE when interrupted. But now this is been changed Main thing is even if interrupted, the RetryCounter will make sure the thread been slept for the specified time (Which might not be really wanted some times)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the purpose of retryCounter.sleepUntilNextRetry() should be uninterrupted sleep because RetryCounter is mainly being used by retries with sleeps and retries with different backoff policies. In such scenario, RetryCounter being a library should not ideally throw InterruptedException even if sleep is interrupted because it is being retried by clients to achieve certain tasks.

}
}

Expand All @@ -535,11 +531,7 @@ private void waitFor(final Callable<Boolean> predicate) {
throw new RuntimeException("retries exhausted", e);
}
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
retryCounter.sleepUntilNextRetry();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opTy
watcher.abort("Error populating meta locations", ke);
return;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
LOG.error("Interrupted while loading meta locations from ZK", ie);
Thread.currentThread().interrupt();
return;
}
retryCounter.sleepUntilNextRetry();
}
}
if (znodes == null || znodes.isEmpty()) {
Expand Down Expand Up @@ -182,12 +176,7 @@ private void updateMetaLocation(String path, ZNodeOpType opType) {
LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
break;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
retryCounter.sleepUntilNextRetry();
}
}
if (location == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString(),
region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You did not get what I was saying I believe.
This is triggerFlushInPrimaryRegion() method and u can see a while loop within which the call happening. The while loop is to be terminated once this RS is set to be stopped/abort. When such happens, there might be many non daemon threads running within the server. Our logic in different places will interrupt these threads. so if the thread is sleeping or waiting it will get InterruptedException and allow the thread NOT to continue in running/waiting state. The logic should be checking the server status and allow to come out of loops etc.
But your change will make it such that even if the main thread interrupt this thread, it will continue to sleep for the specified time. That is totally against our intent.

Copy link
Contributor Author

@virajjasani virajjasani May 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it. Yes this example makes sense. Are you saying that similar to this one, all the other places also need to handle Interruptions and there is no need to have uninterrupted sleep during client side retries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not check all places.. Most of the places where the RetryCounter is being used is in test I can see. But within RetryCounter we should not change. Tomorrow some other code path might use it too. IMO we can just keep the code as is. Let the calling part handle the InterruptedException the way they want. we can not generalise it. So just close this Jira Viraj

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okk just saw that many places are in asynchronously getting executed in separate threads from main thread and if this is how interruptions were planned to be handled, it's fine. No need to make change.

counter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
throw new InterruptedIOException(e1.getMessage());
}
counter.sleepUntilNextRetry();
continue;
}

Expand Down Expand Up @@ -188,11 +184,7 @@ void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
break;
}
}
try {
counter.sleepUntilNextRetry();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
counter.sleepUntilNextRetry();
}
region.setReadsEnabled(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,7 @@ public boolean replicate(ReplicateContext replicateContext) {
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
retryCounter.sleepUntilNextRetry();
continue outer;
}
if (!requiresReplication(tableDesc, entry)) {
Expand Down Expand Up @@ -372,13 +366,7 @@ public boolean replicate(ReplicateContext replicateContext) {
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
retryCounter.sleepUntilNextRetry();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,8 @@ private FSDataOutputStream createFileWithRetries(final FileSystem fs,
+ retryCounter.getMaxAttempts());
LOG.debug("Failed to create lock file " + hbckLockFilePath.getName(),
ioe);
try {
exception = ioe;
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
throw (InterruptedIOException) new InterruptedIOException(
"Can't create lock file " + hbckLockFilePath.getName())
.initCause(ie);
}
exception = ioe;
retryCounter.sleepUntilNextRetry();
}
} while (retryCounter.shouldRetry());

Expand Down Expand Up @@ -518,14 +512,7 @@ private void unlockHbck() {
+ (retryCounter.getAttemptTimes() + 1) + " of "
+ retryCounter.getMaxAttempts());
LOG.debug("Failed to delete " + HBCK_LOCK_PATH, ioe);
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while deleting lock file" +
HBCK_LOCK_PATH);
return;
}
retryCounter.sleepUntilNextRetry();
}
} while (retryCounter.shouldRetry());
}
Expand Down Expand Up @@ -771,12 +758,7 @@ private boolean setMasterInMaintenanceMode() throws IOException {
LOG.warn("Fail to create znode " + hbckEphemeralNodePath + ", try=" +
(retryCounter.getAttemptTimes() + 1) + " of " + retryCounter.getMaxAttempts());

try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
throw (InterruptedIOException) new InterruptedIOException(
"Can't create znode " + hbckEphemeralNodePath).initCause(ie);
}
retryCounter.sleepUntilNextRetry();
} while (retryCounter.shouldRetry());
return hbckZodeCreated;
}
Expand Down