Skip to content

Commit 7298e92

Browse files
committed
HADOOP-19295. review feedback
* default timeout set in builder * tune logging of content provider on recovery * new tests to verify timeout propagation Change-Id: I43e2822e4dbd684d2c0469650b07369b731a2e7c
1 parent b5833bb commit 7298e92

File tree

5 files changed

+79
-8
lines changed

5 files changed

+79
-8
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
6161

6262
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
63+
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
6364
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
6465
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
6566
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -724,7 +725,7 @@ public static final class RequestFactoryBuilder {
724725
* This will be set on data put/post operations only.
725726
* A zero value means "no custom timeout"
726727
*/
727-
private Duration partUploadTimeout = Duration.ZERO;
728+
private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
728729

729730
private RequestFactoryBuilder() {
730731
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,11 @@ public final InputStream newStream() {
282282
close();
283283
checkOpen();
284284
streamCreationCount++;
285-
if (streamCreationCount > 1) {
285+
if (streamCreationCount == 2) {
286+
// the stream has been recreated for the first time.
287+
// notify only once for this stream, so as not to flood
288+
// the logs.
286289
LOG.info("Stream recreated: {}", this);
287-
if (LOG.isDebugEnabled()) {
288-
LOG.debug("Stream creation stack", new Exception("here"));
289-
}
290290
}
291291
return setCurrentStream(createNewStream());
292292
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
4242
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4343
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
44+
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
4445
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
4546
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
4647
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
100101
public void testPutObjectDirect() throws Throwable {
101102
final S3AFileSystem fs = getFileSystem();
102103
try (AuditSpan span = span()) {
103-
RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
104+
RequestFactory factory = RequestFactoryImpl.builder()
105+
.withBucket(fs.getBucket())
106+
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
107+
.build();
104108
Path path = path("putDirect");
105109
PutObjectRequest.Builder putObjectRequestBuilder =
106110
factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.hadoop.util.Progressable;
5555

5656

57+
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
5758
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
5859
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
5960
import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
99100
.withRequestPreparer(MockS3AFileSystem::prepareRequest)
100101
.withBucket(BUCKET)
101102
.withEncryptionSecrets(new EncryptionSecrets())
103+
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
102104
.build();
103105

104106
/**

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@
1919
package org.apache.hadoop.fs.s3a.impl;
2020

2121
import java.io.IOException;
22+
import java.time.Duration;
2223
import java.util.ArrayList;
2324
import java.util.concurrent.atomic.AtomicLong;
2425

2526
import software.amazon.awssdk.awscore.AwsRequest;
27+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2628
import software.amazon.awssdk.core.SdkRequest;
2729
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
2830
import org.assertj.core.api.Assertions;
2931
import org.junit.Test;
3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
34+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
35+
import software.amazon.awssdk.services.s3.model.S3Request;
36+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
3237

3338

3439
import org.apache.hadoop.fs.PathIOException;
@@ -38,6 +43,7 @@
3843
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
3944
import org.apache.hadoop.test.AbstractHadoopTestBase;
4045

46+
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
4147
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
4248
import static org.assertj.core.api.Assertions.assertThat;
4349

@@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
109115
.isEqualTo(acl);
110116
}
111117

112-
113-
114118
/**
115119
* Now add a processor and verify that it was invoked for
116120
* exactly as many requests as were analyzed.
@@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
207211
.isEqualTo(requestsAnalyzed);
208212
}
209213

214+
/**
215+
* Assertion for Request timeouts.
216+
* @param duration expected duration.
217+
* @param request request.
218+
*/
219+
private void assertApiTimeouts(Duration duration, S3Request request) {
220+
Assertions.assertThat(request.overrideConfiguration())
221+
.describedAs("request %s", request)
222+
.isNotEmpty();
223+
final AwsRequestOverrideConfiguration override =
224+
request.overrideConfiguration().get();
225+
Assertions.assertThat(override.apiCallAttemptTimeout())
226+
.describedAs("apiCallAttemptTimeout")
227+
.hasValue(duration);
228+
Assertions.assertThat(override.apiCallTimeout())
229+
.describedAs("apiCallTimeout")
230+
.hasValue(duration);
231+
}
232+
233+
/**
234+
* If not overridden timeouts are set to the default part upload timeout.
235+
*/
236+
@Test
237+
public void testDefaultUploadTimeouts() throws Throwable {
238+
239+
RequestFactory factory = RequestFactoryImpl.builder()
240+
.withBucket("bucket")
241+
.withMultipartPartCountLimit(2)
242+
.build();
243+
final UploadPartRequest upload =
244+
factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
245+
assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
246+
}
247+
248+
/**
249+
* Verify that when upload request timeouts are set,
250+
* they are passed down.
251+
*/
252+
@Test
253+
public void testUploadTimeouts() throws Throwable {
254+
Duration partDuration = Duration.ofDays(1);
255+
RequestFactory factory = RequestFactoryImpl.builder()
256+
.withBucket("bucket")
257+
.withPartUploadTimeout(partDuration)
258+
.build();
259+
260+
String path = "path";
261+
262+
// A simple PUT
263+
final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
264+
PutObjectOptions.deletingDirs(), 1024, false).build();
265+
assertApiTimeouts(partDuration, put);
266+
267+
// multipart part
268+
final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
269+
"1", 3, 128_000_000)
270+
.build();
271+
assertApiTimeouts(partDuration, upload);
272+
273+
}
210274
}

0 commit comments

Comments
 (0)