Skip to content

Commit 2c51975

Browse files
committed
Code Improvements
1 parent 6da7267 commit 2c51975

File tree

7 files changed

+99
-29
lines changed

7 files changed

+99
-29
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TailLatencyRequestTimeoutException.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
2020

21+
import java.util.concurrent.TimeoutException;
2122
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_TAIL_LATENCY_REQUEST_TIMEOUT;
2223

24+
/**
25+
* Thrown when a request takes more time than the current reported tail latency.
26+
*/
2327
public class TailLatencyRequestTimeoutException extends AzureBlobFileSystemException {
2428

25-
public TailLatencyRequestTimeoutException() {
26-
super(ERR_TAIL_LATENCY_REQUEST_TIMEOUT);
27-
}
28-
29-
public TailLatencyRequestTimeoutException(Exception innerException) {
29+
/**
30+
* Constructs a TailLatencyRequestTimeoutException with TimeoutException as the cause.
31+
*/
32+
public TailLatencyRequestTimeoutException(TimeoutException innerException) {
3033
super(ERR_TAIL_LATENCY_REQUEST_TIMEOUT, innerException);
3134
}
3235
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,19 @@ public class AbfsAHCHttpOperation extends AbfsHttpOperation {
9191
*/
9292
private final AbfsApacheHttpClient abfsApacheHttpClient;
9393

94-
private final long tailLatencyMs;
94+
/**
95+
* Timeout in milliseconds that defines maximum allowed time to execute operation.
96+
* This timeout starts when execution starts and includes E2E processing time of request.
97+
* This is based on tail latency observed in the system.
98+
*/
99+
private final long tailLatencyTimeout;
95100

96101
public AbfsAHCHttpOperation(final URL url,
97102
final String method,
98103
final List<AbfsHttpHeader> requestHeaders,
99104
final Duration connectionTimeout,
100105
final Duration readTimeout,
101-
final long tailLatencyMs,
106+
final long tailLatencyTimeout,
102107
final AbfsApacheHttpClient abfsApacheHttpClient,
103108
final AbfsClient abfsClient) throws IOException {
104109
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout,
@@ -107,7 +112,7 @@ public AbfsAHCHttpOperation(final URL url,
107112
|| HTTP_METHOD_PATCH.equals(method)
108113
|| HTTP_METHOD_POST.equals(method);
109114
this.abfsApacheHttpClient = abfsApacheHttpClient;
110-
this.tailLatencyMs = tailLatencyMs;
115+
this.tailLatencyTimeout = tailLatencyTimeout;
111116
LOG.debug("Creating AbfsAHCHttpOperation for URL: {}, method: {}",
112117
url, method);
113118

@@ -163,8 +168,8 @@ AbfsManagedHttpClientContext getHttpClientContext() {
163168
return new AbfsManagedHttpClientContext();
164169
}
165170

166-
long getTailLatencyMs() {
167-
return tailLatencyMs;
171+
long getTailLatencyTimeout() {
172+
return tailLatencyTimeout;
168173
}
169174

170175
/**{@inheritDoc}*/
@@ -281,7 +286,7 @@ HttpResponse executeRequest() throws IOException {
281286
try {
282287
LOG.debug("Executing request: {}", httpRequestBase);
283288
HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase,
284-
abfsHttpClientContext, getConnectionTimeout(), getReadTimeout(), getTailLatencyMs());
289+
abfsHttpClientContext, getConnectionTimeout(), getReadTimeout(), getTailLatencyTimeout());
285290
setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
286291
setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
287292
setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,36 @@ public HttpResponse execute(HttpRequestBase httpRequest,
141141
final AbfsManagedHttpClientContext abfsHttpClientContext,
142142
final int connectTimeout,
143143
final int readTimeout,
144-
final long tailLatency) throws IOException {
145-
if (tailLatency <= 0) {
146-
return executeWithDeadline(httpRequest, abfsHttpClientContext,
147-
connectTimeout, readTimeout, Long.MAX_VALUE);
144+
final long tailLatencyTimeout) throws IOException {
145+
if (tailLatencyTimeout <= 0) {
146+
return executeWithoutDeadline(httpRequest, abfsHttpClientContext,
147+
connectTimeout, readTimeout);
148148
}
149149
return executeWithDeadline(httpRequest, abfsHttpClientContext,
150-
connectTimeout, readTimeout, tailLatency);
150+
connectTimeout, readTimeout, tailLatencyTimeout);
151+
}
152+
153+
/**
154+
* Executes the HTTP request.
155+
*
156+
* @param httpRequest HTTP request to execute.
157+
* @param abfsHttpClientContext HttpClient context.
158+
* @param connectTimeout Connection timeout.
159+
* @param readTimeout Read timeout.
160+
*
161+
* @return HTTP response.
162+
* @throws IOException network error.
163+
*/
164+
public HttpResponse executeWithoutDeadline(HttpRequestBase httpRequest,
165+
final AbfsManagedHttpClientContext abfsHttpClientContext,
166+
final int connectTimeout,
167+
final int readTimeout) throws IOException {
168+
RequestConfig.Builder requestConfigBuilder = RequestConfig
169+
.custom()
170+
.setConnectTimeout(connectTimeout)
171+
.setSocketTimeout(readTimeout);
172+
httpRequest.setConfig(requestConfigBuilder.build());
173+
return httpClient.execute(httpRequest, abfsHttpClientContext);
151174
}
152175

153176
/**
@@ -168,22 +191,27 @@ public HttpResponse executeWithDeadline(HttpRequestBase httpRequest,
168191
final int connectTimeout,
169192
final int readTimeout,
170193
final long deadlineMillis) throws IOException {
171-
172194
RequestConfig.Builder requestConfigBuilder = RequestConfig
173195
.custom()
174196
.setConnectTimeout(connectTimeout)
175197
.setSocketTimeout(readTimeout);
176198
httpRequest.setConfig(requestConfigBuilder.build());
177-
178199
ExecutorService executor = Executors.newSingleThreadExecutor();
179200
Future<HttpResponse> future = executor.submit(() ->
180-
httpClient.execute(httpRequest, abfsHttpClientContext));
201+
httpClient.execute(httpRequest, abfsHttpClientContext)
202+
);
203+
181204
try {
182205
return future.get(deadlineMillis, TimeUnit.MILLISECONDS);
183206
} catch (TimeoutException e) {
207+
/* Deadline exceeded, abort the request.
208+
* This will also kill the underlying socket exception in the HttpClient.
209+
* Connection will be marker stale and won't be returned back to KAC for reuse.
210+
*/
184211
httpRequest.abort();
185212
throw new TailLatencyRequestTimeoutException(e);
186213
} catch (Exception e) {
214+
// Any other exception from execution should be thrown as IOException.
187215
throw new IOException("Request execution with deadline failed", e);
188216
} finally {
189217
executor.shutdownNow();

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public abstract class AbfsClient implements Closeable {
190190
private EncryptionContextProvider encryptionContextProvider = null;
191191
private EncryptionType encryptionType = EncryptionType.NONE;
192192
private final AbfsThrottlingIntercept intercept;
193-
private AbfsTailLatencyTracker latencyTracker = null;
193+
private AbfsTailLatencyTracker tailLatencyTracker = null;
194194

195195
private final ListeningScheduledExecutorService executorService;
196196

@@ -227,7 +227,7 @@ private AbfsClient(final URL baseUrl,
227227
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
228228
this.authType = abfsConfiguration.getAuthType(accountName);
229229
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
230-
this.latencyTracker = AbfsTailLatencyTrackerFactory.getInstance(accountName, abfsConfiguration);
230+
this.tailLatencyTracker = AbfsTailLatencyTrackerFactory.getInstance(accountName, abfsConfiguration);
231231
this.renameResilience = abfsConfiguration.getRenameResilience();
232232
this.abfsServiceType = abfsServiceType;
233233

@@ -434,8 +434,12 @@ public void setEncryptionType(EncryptionType encryptionType) {
434434
this.encryptionType = encryptionType;
435435
}
436436

437-
public AbfsTailLatencyTracker getLatencyTracker() {
438-
return latencyTracker;
437+
/**
438+
* Get the tail latency tracker for this account.
439+
* @return tail latency tracker if enabled, null otherwise.
440+
*/
441+
public AbfsTailLatencyTracker getTailLatencyTracker() {
442+
return tailLatencyTracker;
439443
}
440444

441445
public EncryptionType getEncryptionType() {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class AbfsRestOperation {
8080
private final AbfsClient client;
8181
// Return intercept instance
8282
private final AbfsThrottlingIntercept intercept;
83-
// Latency tracker
83+
// Tail Latency tracker
8484
private final AbfsTailLatencyTracker tailLatencyTracker;
8585
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
8686
private final String method;
@@ -236,7 +236,7 @@ String getSasToken() {
236236
}
237237
this.maxIoRetries = abfsConfiguration.getMaxIoRetries();
238238
this.intercept = client.getIntercept();
239-
this.tailLatencyTracker = client.getLatencyTracker();
239+
this.tailLatencyTracker = client.getTailLatencyTracker();
240240
this.abfsConfiguration = abfsConfiguration;
241241
this.retryPolicy = client.getExponentialRetryPolicy();
242242
}
@@ -524,7 +524,12 @@ private boolean executeHttpOperation(final int retryCount,
524524
}
525525
}
526526
if (!retryPolicy.shouldRetry(retryCount, -1)) {
527+
/*
528+
* If a request is failing with TailLatencyTimeout exception.
529+
* it should be retried without Tail Latency Timeout exception should not be returned to caller.
530+
*/
527531
if (retryPolicy instanceof TailLatencyRequestTimeoutRetryPolicy) {
532+
// Disable Tail Latency Timeout for the next retry.
528533
shouldTailLatencyTimeout = false;
529534
return false;
530535
}
@@ -540,6 +545,7 @@ private boolean executeHttpOperation(final int retryCount,
540545
intercept.updateMetrics(operationType, httpOperation);
541546
}
542547

548+
// Update Tail Latency Tracker only for successful requests.
543549
if (tailLatencyTracker != null && statusCode < HttpURLConnection.HTTP_MULT_CHOICE) {
544550
tailLatencyTracker.updateLatency(operationType,
545551
httpOperation.getSendLatency() + httpOperation.getRecvLatency());
@@ -624,14 +630,19 @@ AbfsJdkHttpOperation createAbfsHttpOperation() throws IOException {
624630

625631
@VisibleForTesting
626632
AbfsAHCHttpOperation createAbfsAHCHttpOperation() throws IOException {
627-
long tailLatency = getTailLatencyIfTimeoutEnabled();
633+
long tailLatency = getTailLatencyTimeoutIfEnabled();
628634
return new AbfsAHCHttpOperation(url, method, requestHeaders,
629635
Duration.ofMillis(client.getAbfsConfiguration().getHttpConnectionTimeout()),
630636
Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout()),
631637
tailLatency, client.getAbfsApacheHttpClient(), client);
632638
}
633639

634-
long getTailLatencyIfTimeoutEnabled() {
640+
/**
641+
* Get Tail Latency Timeout value if profiling is enabled, timeout is enabled
642+
* and retries due to tail latency request timeout is allowed.
643+
* @return tail latency timeout value else return zero.
644+
*/
645+
long getTailLatencyTimeoutIfEnabled() {
635646
if (tailLatencyTracker != null && abfsConfiguration.isTailLatencyRequestTimeoutEnabled() && shouldTailLatencyTimeout) {
636647
return (long) tailLatencyTracker.getTailLatency(this.operationType);
637648
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsTailLatencyTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,9 @@ public void updateLatency(final AbfsRestOperationType operationType,
144144
}
145145

146146
/**
147-
*
147+
* Gets the tail latency for a specific operation type.
148148
* @param operationType Only applicable for read and write operations.
149+
* @return Tail latency value.
149150
*/
150151
public double getTailLatency(final AbfsRestOperationType operationType) {
151152
SlidingWindowHdrHistogram histogram = operationLatencyMap.get(operationType);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/TailLatencyRequestTimeoutRetryReason.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package org.apache.hadoop.fs.azurebfs.services.retryReasonCategories;
220

321
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_TAIL_LATENCY_REQUEST_TIMEOUT;
@@ -18,4 +36,4 @@ Boolean canCapture(final Exception ex,
1836
final String serverErrorMessage) {
1937
return checkExceptionMessage(ex, ERR_TAIL_LATENCY_REQUEST_TIMEOUT);
2038
}
21-
}
39+
}

0 commit comments

Comments
 (0)