Skip to content

Commit 593c939

Browse files
authored
Merge branch 'apache:trunk' into TestBuild
2 parents e99aca0 + d1daf26 commit 593c939

29 files changed

+964
-208
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.HashSet;
3737
import java.util.List;
3838
import java.util.Map;
39+
import java.util.Set;
3940
import java.util.concurrent.ExecutionException;
4041

4142
import javax.servlet.ServletContext;
@@ -278,6 +279,9 @@ protected void queueExternalCall(ExternalCall call)
278279
namenode.queueExternalCall(call);
279280
}
280281

282+
/**
283+
* Chooses a Datanode to redirect a request to.
284+
*/
281285
@VisibleForTesting
282286
static DatanodeInfo chooseDatanode(final NameNode namenode,
283287
final String path, final HttpOpParam.Op op, final long openOffset,
@@ -288,18 +292,18 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
288292
throw new IOException("Namesystem has not been initialized yet.");
289293
}
290294
final BlockManager bm = fsn.getBlockManager();
291-
292-
HashSet<Node> excludes = new HashSet<Node>();
295+
296+
Set<Node> excludes = new HashSet<>();
293297
if (excludeDatanodes != null) {
294298
for (String host : StringUtils
295299
.getTrimmedStringCollection(excludeDatanodes)) {
296-
int idx = host.indexOf(":");
300+
int idx = host.indexOf(':');
297301
Node excludeNode = null;
298-
if (idx != -1) {
299-
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
300-
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
301-
} else {
302+
if (idx == -1) {
302303
excludeNode = bm.getDatanodeManager().getDatanodeByHost(host);
304+
} else {
305+
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
306+
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
303307
}
304308

305309
if (excludeNode != null) {
@@ -311,25 +315,15 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
311315
}
312316
}
313317

314-
if (op == PutOpParam.Op.CREATE) {
315-
//choose a datanode near to client
316-
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
317-
).getDatanodeByHost(remoteAddr);
318-
if (clientNode != null) {
319-
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
320-
path, clientNode, excludes, blocksize);
321-
if (storages.length > 0) {
322-
return storages[0].getDatanodeDescriptor();
323-
}
324-
}
325-
} else if (op == GetOpParam.Op.OPEN
318+
// For these operations choose a datanode containing a replica
319+
if (op == GetOpParam.Op.OPEN
326320
|| op == GetOpParam.Op.GETFILECHECKSUM
327321
|| op == PostOpParam.Op.APPEND) {
328-
//choose a datanode containing a replica
329322
final NamenodeProtocols np = getRPCServer(namenode);
330323
if (status == null) {
331324
throw new FileNotFoundException("File " + path + " not found.");
332325
}
326+
333327
final long len = status.getLen();
334328
if (op == GetOpParam.Op.OPEN) {
335329
if (openOffset < 0L || (openOffset >= len && len > 0)) {
@@ -344,10 +338,22 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
344338
final int count = locations.locatedBlockCount();
345339
if (count > 0) {
346340
return bestNode(locations.get(0).getLocations(), excludes);
341+
} else {
342+
throw new IOException("Block could not be located. Path=" + path + ", offset=" + offset);
347343
}
348344
}
349345
}
350346

347+
// All other operations don't affect a specific node so let the BlockManager pick a target
348+
DatanodeDescriptor clientNode = bm.getDatanodeManager(
349+
).getDatanodeByHost(remoteAddr);
350+
351+
DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
352+
path, clientNode, excludes, blocksize);
353+
if (storages.length > 0) {
354+
return storages[0].getDatanodeDescriptor();
355+
}
356+
351357
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
352358
).chooseRandom(NodeBase.ROOT, excludes);
353359
}
@@ -358,13 +364,13 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
358364
* to return the first element of the node here.
359365
*/
360366
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
361-
HashSet<Node> excludes) throws IOException {
367+
Set<Node> excludes) throws IOException {
362368
for (DatanodeInfo dn: nodes) {
363-
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
369+
if (!dn.isDecommissioned() && !excludes.contains(dn)) {
364370
return dn;
365371
}
366372
}
367-
throw new IOException("No active nodes contain this block");
373+
throw new IOException("No active and not excluded nodes contain this block");
368374
}
369375

370376
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ public class AbfsConfiguration{
152152
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
153153
private int maxBackoffInterval;
154154

155+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED,
156+
DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED)
157+
private boolean staticRetryForConnectionTimeoutEnabled;
158+
159+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_INTERVAL,
160+
DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL)
161+
private int staticRetryInterval;
162+
155163
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
156164
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
157165
private int backoffInterval;
@@ -166,6 +174,14 @@ public class AbfsConfiguration{
166174
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
167175
private int customTokenFetchRetryCount;
168176

177+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT,
178+
DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
179+
private int httpConnectionTimeout;
180+
181+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT,
182+
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
183+
private int httpReadTimeout;
184+
169185
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
170186
MinValue = 0,
171187
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -669,6 +685,14 @@ public int getMaxBackoffIntervalMilliseconds() {
669685
return this.maxBackoffInterval;
670686
}
671687

688+
public boolean getStaticRetryForConnectionTimeoutEnabled() {
689+
return staticRetryForConnectionTimeoutEnabled;
690+
}
691+
692+
public int getStaticRetryInterval() {
693+
return staticRetryInterval;
694+
}
695+
672696
public int getBackoffIntervalMilliseconds() {
673697
return this.backoffInterval;
674698
}
@@ -681,6 +705,14 @@ public int getCustomTokenFetchRetryCount() {
681705
return this.customTokenFetchRetryCount;
682706
}
683707

708+
public int getHttpConnectionTimeout() {
709+
return this.httpConnectionTimeout;
710+
}
711+
712+
public int getHttpReadTimeout() {
713+
return this.httpReadTimeout;
714+
}
715+
684716
public long getAzureBlockSize() {
685717
return this.azureBlockSize;
686718
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
119119
import org.apache.hadoop.fs.azurebfs.services.AuthType;
120120
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
121+
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
121122
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
122123
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
123124
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
@@ -1781,6 +1782,8 @@ private AbfsClientContext populateAbfsClientContext() {
17811782
return new AbfsClientContextBuilder()
17821783
.withExponentialRetryPolicy(
17831784
new ExponentialRetryPolicy(abfsConfiguration))
1785+
.withStaticRetryPolicy(
1786+
new StaticRetryPolicy(abfsConfiguration))
17841787
.withAbfsCounters(abfsCounters)
17851788
.withAbfsPerfTracker(abfsPerfTracker)
17861789
.build();

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,23 @@ public final class ConfigurationKeys {
4848
// Retry strategy defined by the user
4949
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
5050
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
51+
public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = "fs.azure.static.retry.for.connection.timeout.enabled";
52+
public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval";
5153
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
5254
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
5355
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";
5456

57+
/**
58+
* Config to set HTTP Connection Timeout Value for Rest Operations.
59+
* Value: {@value}.
60+
*/
61+
public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout";
62+
/**
63+
* Config to set HTTP Read Timeout Value for Rest Operations.
64+
* Value: {@value}.
65+
*/
66+
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";
67+
5568
// Retry strategy for getToken calls
5669
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
5770
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,28 @@ public final class FileSystemConfigurations {
3535
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
3636
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
3737

38-
private static final int SIXTY_SECONDS = 60 * 1000;
38+
private static final int SIXTY_SECONDS = 60_000;
3939

4040
// Retry parameter defaults.
41-
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
42-
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
43-
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
41+
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
42+
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000; // 30s
43+
public static final boolean DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true;
44+
public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
45+
public static final int DEFAULT_BACKOFF_INTERVAL = 3_000; // 3s
4446
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
4547
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
4648

49+
/**
50+
* Default value of connection timeout to be used while setting up HTTP Connection.
51+
* Value: {@value}.
52+
*/
53+
public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s
54+
/**
55+
* Default value of read timeout to be used while setting up HTTP Connection.
56+
* Value: {@value}.
57+
*/
58+
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs
59+
4760
// Retry parameter defaults.
4861
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
4962
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
8383
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
8484
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
85+
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
8586

8687
/**
8788
* AbfsClient.
@@ -93,7 +94,8 @@ public class AbfsClient implements Closeable {
9394
private final URL baseUrl;
9495
private final SharedKeyCredentials sharedKeyCredentials;
9596
private String xMsVersion = DECEMBER_2019_API_VERSION;
96-
private final ExponentialRetryPolicy retryPolicy;
97+
private final ExponentialRetryPolicy exponentialRetryPolicy;
98+
private final StaticRetryPolicy staticRetryPolicy;
9799
private final String filesystem;
98100
private final AbfsConfiguration abfsConfiguration;
99101
private final String userAgent;
@@ -131,7 +133,8 @@ private AbfsClient(final URL baseUrl,
131133
String baseUrlString = baseUrl.toString();
132134
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
133135
this.abfsConfiguration = abfsConfiguration;
134-
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
136+
this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy();
137+
this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
135138
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
136139
this.authType = abfsConfiguration.getAuthType(accountName);
137140
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
@@ -213,8 +216,24 @@ protected AbfsPerfTracker getAbfsPerfTracker() {
213216
return abfsPerfTracker;
214217
}
215218

216-
ExponentialRetryPolicy getRetryPolicy() {
217-
return retryPolicy;
219+
ExponentialRetryPolicy getExponentialRetryPolicy() {
220+
return exponentialRetryPolicy;
221+
}
222+
223+
StaticRetryPolicy getStaticRetryPolicy() {
224+
return staticRetryPolicy;
225+
}
226+
227+
/**
228+
* Returns the retry policy to be used for Abfs Rest Operation Failure.
229+
* @param failureReason helps to decide which type of retryPolicy to be used.
230+
* @return retry policy to be used.
231+
*/
232+
public AbfsRetryPolicy getRetryPolicy(final String failureReason) {
233+
return CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason)
234+
&& getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()
235+
? getStaticRetryPolicy()
236+
: getExponentialRetryPolicy();
218237
}
219238

220239
SharedKeyCredentials getSharedKeyCredentials() {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,18 @@
2525
public class AbfsClientContext {
2626

2727
private final ExponentialRetryPolicy exponentialRetryPolicy;
28+
private final StaticRetryPolicy staticRetryPolicy;
2829
private final AbfsPerfTracker abfsPerfTracker;
2930
private final AbfsCounters abfsCounters;
3031

3132
AbfsClientContext(
3233
ExponentialRetryPolicy exponentialRetryPolicy,
34+
StaticRetryPolicy staticRetryPolicy,
3335
AbfsPerfTracker abfsPerfTracker,
3436
AbfsCounters abfsCounters) {
3537
this.exponentialRetryPolicy = exponentialRetryPolicy;
38+
39+
this.staticRetryPolicy = staticRetryPolicy;
3640
this.abfsPerfTracker = abfsPerfTracker;
3741
this.abfsCounters = abfsCounters;
3842
}
@@ -41,6 +45,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() {
4145
return exponentialRetryPolicy;
4246
}
4347

48+
public StaticRetryPolicy getStaticRetryPolicy() {
49+
return staticRetryPolicy;
50+
}
51+
4452
public AbfsPerfTracker getAbfsPerfTracker() {
4553
return abfsPerfTracker;
4654
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
public class AbfsClientContextBuilder {
2626

2727
private ExponentialRetryPolicy exponentialRetryPolicy;
28+
private StaticRetryPolicy staticRetryPolicy;
2829
private AbfsPerfTracker abfsPerfTracker;
2930
private AbfsCounters abfsCounters;
3031

@@ -34,6 +35,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy(
3435
return this;
3536
}
3637

38+
public AbfsClientContextBuilder withStaticRetryPolicy(
39+
final StaticRetryPolicy staticRetryPolicy) {
40+
this.staticRetryPolicy = staticRetryPolicy;
41+
return this;
42+
}
43+
3744
public AbfsClientContextBuilder withAbfsPerfTracker(
3845
final AbfsPerfTracker abfsPerfTracker) {
3946
this.abfsPerfTracker = abfsPerfTracker;
@@ -52,7 +59,10 @@ public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters
5259
*/
5360
public AbfsClientContext build() {
5461
//validate the values
55-
return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
62+
return new AbfsClientContext(
63+
exponentialRetryPolicy,
64+
staticRetryPolicy,
65+
abfsPerfTracker,
5666
abfsCounters);
5767
}
5868
}

0 commit comments

Comments (0)