Skip to content

Commit 4fdfc08

Browse files
committed
HADOOP-16948. Add unit test for acquire lease failure
1 parent b6803cf commit 4fdfc08

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import java.io.IOException;
2222
import java.util.concurrent.TimeUnit;
2323

24+
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
2425
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
2526
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
2627
import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
28+
2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
2931

@@ -51,8 +53,10 @@
5153
public final class AbfsLease {
5254
private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);
5355

54-
static final int LEASE_ACQUIRE_RETRY_INTERVAL = 10; // Retry interval for acquiring lease in secs
55-
static final int LEASE_ACQUIRE_MAX_RETRIES = 7; // Number of retries for acquiring lease
56+
// Number of retries for acquiring lease
57+
static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
58+
// Retry interval for acquiring lease in secs
59+
static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;
5660

5761
private final AbfsClient client;
5862
private final String path;
@@ -61,6 +65,7 @@ public final class AbfsLease {
6165
private volatile boolean leaseFreed;
6266
private volatile String leaseID = null;
6367
private volatile Throwable exception = null;
68+
private volatile int acquireRetryCount = 0;
6469
private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
6570

6671
public static class LeaseException extends AzureBlobFileSystemException {
@@ -74,6 +79,12 @@ public LeaseException(String s) {
7479
}
7580

7681
public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException {
82+
this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL);
83+
}
84+
85+
@VisibleForTesting
86+
public AbfsLease(AbfsClient client, String path, int acquireMaxRetries,
87+
int acquireRetryInterval) throws AzureBlobFileSystemException {
7788
this.leaseFreed = false;
7889
this.client = client;
7990
this.path = path;
@@ -84,8 +95,8 @@ public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemExcep
8495

8596
// Try to get the lease a specified number of times, else throw an error
8697
RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
87-
LEASE_ACQUIRE_MAX_RETRIES, LEASE_ACQUIRE_RETRY_INTERVAL, TimeUnit.SECONDS);
88-
acquireLease(retryPolicy, 0, 0);
98+
acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
99+
acquireLease(retryPolicy, 0, acquireRetryInterval, 0);
89100

90101
while (leaseID == null && exception == null) {
91102
try {
@@ -103,7 +114,7 @@ public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemExcep
103114
LOG.debug("Acquired lease {} on {}", leaseID, path);
104115
}
105116

106-
private void acquireLease(RetryPolicy retryPolicy, int numRetries, long delay)
117+
private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay)
107118
throws LeaseException {
108119
LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
109120
if (future != null && !future.isDone()) {
@@ -124,7 +135,8 @@ public void onFailure(Throwable throwable) {
124135
if (RetryPolicy.RetryAction.RetryDecision.RETRY
125136
== retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
126137
LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
127-
acquireLease(retryPolicy, numRetries + 1, LEASE_ACQUIRE_RETRY_INTERVAL);
138+
acquireRetryCount++;
139+
acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval);
128140
} else {
129141
exception = throwable;
130142
}
@@ -168,4 +180,9 @@ public boolean isFreed() {
168180
public String getLeaseID() {
169181
return leaseID;
170182
}
183+
184+
@VisibleForTesting
185+
public int getAcquireRetryCount() {
186+
return acquireRetryCount;
187+
}
171188
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,18 @@
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.fs.FSDataOutputStream;
2828
import org.apache.hadoop.fs.Path;
29+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
30+
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
31+
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
2932
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
3033
import org.apache.hadoop.test.GenericTestUtils;
3134
import org.apache.hadoop.test.LambdaTestUtils;
3235

36+
import static org.mockito.ArgumentMatchers.anyInt;
37+
import static org.mockito.ArgumentMatchers.anyString;
38+
import static org.mockito.Mockito.doThrow;
39+
import static org.mockito.Mockito.spy;
40+
3341
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
3442
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
3543
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
@@ -293,4 +301,36 @@ public void testFileSystemClose() throws Exception {
293301
return "Expected exception on new append after closed FS";
294302
});
295303
}
304+
305+
@Test(timeout = TEST_EXECUTION_TIMEOUT)
306+
public void testAcquireRetry() throws Exception {
307+
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
308+
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
309+
fs.mkdirs(testFilePath.getParent());
310+
fs.createNewFile(testFilePath);
311+
312+
AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath());
313+
Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
314+
lease.free();
315+
Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount());
316+
317+
AbfsClient mockClient = spy(fs.getAbfsClient());
318+
319+
doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
320+
.doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
321+
.doCallRealMethod()
322+
.when(mockClient).acquireLease(anyString(), anyInt());
323+
324+
lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
325+
Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
326+
lease.free();
327+
Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount());
328+
329+
doThrow(new AbfsLease.LeaseException("failed to acquire"))
330+
.when(mockClient).acquireLease(anyString(), anyInt());
331+
332+
LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
333+
new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
334+
});
335+
}
296336
}

0 commit comments

Comments
 (0)