Skip to content

Commit 4344537

Browse files
steveloughran authored and slfan1989 committed
HADOOP-19045. S3A: Validate CreateSession Timeout Propagation (#6470)
New test ITestCreateSessionTimeout verifies that the duration set in fs.s3a.connection.request.timeout is passed all the way down. It does this by adding a sleep() in a custom signer and verifying that the sleep is interrupted and that an AWSApiCallTimeoutException is raised. Also fixes testRequestTimeout(), which now: (1) doesn't skip if considered cross-region; (2) sets a minimum duration of 0 before invocation; (3) resets the minimum afterwards. Contributed by Steve Loughran
1 parent 92e5974 commit 4344537

File tree

6 files changed

+262
-16
lines changed

6 files changed

+262
-16
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,16 +337,33 @@ private Constants() {
337337
public static final int DEFAULT_SOCKET_TIMEOUT = (int)DEFAULT_SOCKET_TIMEOUT_DURATION.toMillis();
338338

339339
/**
340-
* Time until a request is timed-out: {@value}.
341-
* If zero, there is no timeout.
340+
* How long should the SDK retry/wait on a response from an S3 store: {@value}
341+
* <i>including the time needed to sign the request</i>.
342+
* <p>
343+
* This is time to response, so for a GET request it is "time to 200 response"
344+
* not the time limit to download the requested data.
345+
* This makes it different from {@link #REQUEST_TIMEOUT}, which is for total
346+
* HTTP request.
347+
* <p>
348+
* Default unit is milliseconds.
349+
* <p>
350+
* There is a minimum duration set in {@link #MINIMUM_NETWORK_OPERATION_DURATION};
351+
* it is impossible to set a delay less than this, even for testing.
352+
* Why so? Too many deployments where the configuration assumed the timeout was in seconds
353+
* and that "120" was a reasonable value rather than "too short to work reliably".
354+
* <p>
355+
* Note for anyone writing tests which need to set a low value for this:
356+
* to avoid the minimum duration overrides, call
357+
* {@code AWSClientConfig.setMinimumOperationDuration()} and set a low value
358+
* before creating the filesystem.
342359
*/
343360
public static final String REQUEST_TIMEOUT =
344361
"fs.s3a.connection.request.timeout";
345362

346363
/**
347-
* Default duration of a request before it is timed out: Zero.
364+
* Default duration of a request before it is timed out: 60s.
348365
*/
349-
public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ZERO;
366+
public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ofSeconds(60);
350367

351368
/**
352369
* Default duration of a request before it is timed out: Zero.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomHttpSigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* fs.s3a.http.signer.class = org.apache.hadoop.fs.s3a.auth.CustomHttpSigner
4141
* </pre>
4242
*/
43-
public final class CustomHttpSigner implements HttpSigner<AwsCredentialsIdentity> {
43+
public class CustomHttpSigner implements HttpSigner<AwsCredentialsIdentity> {
4444
private static final Logger LOG = LoggerFactory
4545
.getLogger(CustomHttpSigner.class);
4646

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ static ClientSettings createApiConnectionSettings(Configuration conf) {
577577

578578
/**
579579
* Build the HTTP connection settings object from the configuration.
580-
* All settings are calculated, including the api call timeout.
580+
* All settings are calculated.
581581
* @param conf configuration to evaluate
582582
* @return connection settings.
583583
*/

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.ConnectException;
2424
import java.net.URI;
2525
import java.security.PrivilegedExceptionAction;
26+
import java.time.Duration;
2627

2728
import org.assertj.core.api.Assertions;
2829
import org.junit.Rule;
@@ -49,6 +50,7 @@
4950
import org.apache.hadoop.fs.Path;
5051
import org.apache.hadoop.fs.contract.ContractTestUtils;
5152
import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
53+
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
5254
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
5355
import org.apache.hadoop.security.ProviderUtils;
5456
import org.apache.hadoop.security.UserGroupInformation;
@@ -435,16 +437,22 @@ public void testCustomUserAgent() throws Exception {
435437
@Test
436438
public void testRequestTimeout() throws Exception {
437439
conf = new Configuration();
438-
skipIfCrossRegionClient(conf);
439-
conf.set(REQUEST_TIMEOUT, "120");
440-
fs = S3ATestUtils.createTestFileSystem(conf);
441-
S3Client s3 = getS3Client("Request timeout (ms)");
442-
SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class,
443-
"clientConfiguration");
444-
assertEquals("Configured " + REQUEST_TIMEOUT +
445-
" is different than what AWS sdk configuration uses internally",
446-
120000,
447-
clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT).toMillis());
440+
// remove the safety check on minimum durations.
441+
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
442+
try {
443+
Duration timeout = Duration.ofSeconds(120);
444+
conf.set(REQUEST_TIMEOUT, timeout.getSeconds() + "s");
445+
fs = S3ATestUtils.createTestFileSystem(conf);
446+
S3Client s3 = getS3Client("Request timeout (ms)");
447+
SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class,
448+
"clientConfiguration");
449+
Assertions.assertThat(clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
450+
.describedAs("Configured " + REQUEST_TIMEOUT +
451+
" is different than what AWS sdk configuration uses internally")
452+
.isEqualTo(timeout);
453+
} finally {
454+
AWSClientConfig.resetMinimumOperationDuration();
455+
}
448456
}
449457

450458
@Test

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,16 @@ public static void skipIfS3ExpressBucket(
550550
!isS3ExpressTestBucket(configuration));
551551
}
552552

553+
/**
554+
* Skip a test if the test bucket is not an S3Express bucket.
555+
* @param configuration configuration to probe
556+
*/
557+
public static void skipIfNotS3ExpressBucket(
558+
Configuration configuration) {
559+
assume("Skipping test as bucket is not an S3Express bucket",
560+
isS3ExpressTestBucket(configuration));
561+
}
562+
553563
/**
554564
* Is the test bucket an S3Express bucket?
555565
* @param conf configuration
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.performance;
20+
21+
import java.time.Duration;
22+
import java.util.Arrays;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
import org.assertj.core.api.Assertions;
28+
import org.junit.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import software.amazon.awssdk.http.SdkHttpRequest;
32+
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
33+
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
34+
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
35+
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
36+
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
37+
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
38+
39+
import org.apache.hadoop.conf.Configuration;
40+
import org.apache.hadoop.fs.Path;
41+
import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException;
42+
import org.apache.hadoop.fs.s3a.S3AFileSystem;
43+
import org.apache.hadoop.fs.s3a.auth.CustomHttpSigner;
44+
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
45+
import org.apache.hadoop.util.DurationInfo;
46+
47+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
48+
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
49+
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
50+
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
51+
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
52+
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
53+
import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
54+
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
55+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
56+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
57+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotS3ExpressBucket;
58+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
59+
60+
/**
61+
* Test timeout of S3 Client CreateSession call, which was originally
62+
* hard coded to 10 seconds.
63+
* Only executed against an S3Express store.
64+
*/
65+
public class ITestCreateSessionTimeout extends AbstractS3ACostTest {
66+
67+
private static final Logger LOG =
68+
LoggerFactory.getLogger(ITestCreateSessionTimeout.class);
69+
70+
/**
71+
* What is the duration for the operation after which the test is considered
72+
* to have failed because timeouts didn't get passed down?
73+
*/
74+
private static final long TIMEOUT_EXCEPTION_THRESHOLD = Duration.ofSeconds(5).toMillis();
75+
76+
/**
77+
* How long to sleep in requests?
78+
*/
79+
private static final AtomicLong SLEEP_DURATION = new AtomicLong(
80+
Duration.ofSeconds(20).toMillis());
81+
82+
/**
83+
* Flag set if the sleep was interrupted during signing.
84+
*/
85+
private static final AtomicBoolean SLEEP_INTERRUPTED = new AtomicBoolean(false);
86+
87+
/**
88+
* Create a configuration with a 10 millisecond timeout on API calls
89+
* and a custom signer which sleeps much longer than that.
90+
* @return the configuration.
91+
*/
92+
@Override
93+
public Configuration createConfiguration() {
94+
final Configuration conf = super.createConfiguration();
95+
skipIfNotS3ExpressBucket(conf);
96+
disableFilesystemCaching(conf);
97+
removeBaseAndBucketOverrides(conf,
98+
CUSTOM_SIGNERS,
99+
HTTP_SIGNER_ENABLED,
100+
REQUEST_TIMEOUT,
101+
RETRY_LIMIT,
102+
S3A_BUCKET_PROBE,
103+
S3EXPRESS_CREATE_SESSION,
104+
SIGNING_ALGORITHM_S3
105+
);
106+
107+
conf.setBoolean(HTTP_SIGNER_ENABLED, true);
108+
conf.setClass(HTTP_SIGNER_CLASS_NAME, SlowSigner.class, HttpSigner.class);
109+
Duration duration = Duration.ofMillis(10);
110+
111+
conf.setLong(REQUEST_TIMEOUT, duration.toMillis());
112+
conf.setInt(RETRY_LIMIT, 1);
113+
114+
return conf;
115+
}
116+
117+
@Override
118+
public void setup() throws Exception {
119+
// remove the safety check on minimum durations.
120+
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
121+
try {
122+
super.setup();
123+
} finally {
124+
// restore the safety check on minimum durations.
125+
AWSClientConfig.resetMinimumOperationDuration();
126+
}
127+
}
128+
129+
@Override
130+
protected void deleteTestDirInTeardown() {
131+
// no-op
132+
}
133+
134+
/**
135+
* Make this a no-op to avoid IO.
136+
* @param path path path
137+
*/
138+
@Override
139+
protected void mkdirs(Path path) {
140+
141+
}
142+
143+
@Test
144+
public void testSlowSigningTriggersTimeout() throws Throwable {
145+
146+
final S3AFileSystem fs = getFileSystem();
147+
DurationInfo call = new DurationInfo(LOG, true, "Create session");
148+
final AWSApiCallTimeoutException thrown = intercept(AWSApiCallTimeoutException.class,
149+
() -> fs.getFileStatus(path("testShortTimeout")));
150+
call.finished();
151+
LOG.info("Exception raised after {}", call, thrown);
152+
// if the timeout took too long, fail with details and include the original
153+
// exception
154+
if (call.value() > TIMEOUT_EXCEPTION_THRESHOLD) {
155+
throw new AssertionError("Duration of create session " + call.getDurationString()
156+
+ " exceeds threshold " + TIMEOUT_EXCEPTION_THRESHOLD + " ms: " + thrown, thrown);
157+
}
158+
Assertions.assertThat(SLEEP_INTERRUPTED.get())
159+
.describedAs("Sleep interrupted during signing")
160+
.isTrue();
161+
162+
// now scan the inner exception stack for "createSession"
163+
Arrays.stream(thrown.getCause().getStackTrace())
164+
.filter(e -> e.getMethodName().equals("createSession"))
165+
.findFirst()
166+
.orElseThrow(() ->
167+
new AssertionError("No createSession() in inner stack trace of", thrown));
168+
}
169+
170+
/**
171+
* Sleep for as long as {@link #SLEEP_DURATION} requires.
172+
*/
173+
private static void sleep() {
174+
long sleep = SLEEP_DURATION.get();
175+
if (sleep > 0) {
176+
LOG.info("Sleeping for {} ms", sleep, new Exception());
177+
try (DurationInfo d = new DurationInfo(LOG, true, "Sleep for %d ms", sleep)) {
178+
Thread.sleep(sleep);
179+
} catch (InterruptedException e) {
180+
LOG.info("Interrupted", e);
181+
SLEEP_INTERRUPTED.set(true);
182+
Thread.currentThread().interrupt();
183+
}
184+
}
185+
}
186+
187+
/**
188+
* A signer which calls {@link #sleep()} before signing.
189+
* As this signing takes place within the CreateSession Pipeline,
190+
*/
191+
public static class SlowSigner extends CustomHttpSigner {
192+
193+
@Override
194+
public SignedRequest sign(
195+
final SignRequest<? extends AwsCredentialsIdentity> request) {
196+
197+
final SdkHttpRequest httpRequest = request.request();
198+
LOG.info("Signing request {}", httpRequest);
199+
sleep();
200+
return super.sign(request);
201+
}
202+
203+
@Override
204+
public CompletableFuture<AsyncSignedRequest> signAsync(
205+
final AsyncSignRequest<? extends AwsCredentialsIdentity> request) {
206+
sleep();
207+
return super.signAsync(request);
208+
}
209+
210+
}
211+
}

0 commit comments

Comments
 (0)