Skip to content

Commit 3d8f4a4

Browse files
authored
HADOOP-19348. Add initial support for Analytics Accelerator Library for Amazon S3 (#7192)
1 parent efb83ec commit 3d8f4a4

33 files changed

+618
-31
lines changed

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,17 @@
525525
<artifactId>amazon-s3-encryption-client-java</artifactId>
526526
<scope>provided</scope>
527527
</dependency>
528+
<dependency>
529+
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
530+
<artifactId>analyticsaccelerator-s3</artifactId>
531+
<version>0.0.2</version>
532+
<scope>compile</scope>
533+
</dependency>
534+
<dependency>
535+
<groupId>software.amazon.awssdk.crt</groupId>
536+
<artifactId>aws-crt</artifactId>
537+
<version>0.29.10</version>
538+
</dependency>
528539
<dependency>
529540
<groupId>org.assertj</groupId>
530541
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,4 +1760,38 @@ private Constants() {
17601760
* Value: {@value}.
17611761
*/
17621762
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
1763+
1764+
1765+
/**
1766+
* Prefix to configure Analytics Accelerator Library.
1767+
*/
1768+
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
1769+
"fs.s3a.analytics.accelerator";
1770+
1771+
/**
1772+
* Config to enable Analytics Accelerator Library for Amazon S3.
1773+
* https://github.com/awslabs/analytics-accelerator-s3
1774+
*/
1775+
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
1776+
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
1777+
1778+
/**
1779+
* Config to enable usage of crt client with Analytics Accelerator Library.
1780+
* It is by default true.
1781+
*/
1782+
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
1783+
"fs.s3a.analytics.accelerator.crt.client";
1784+
1785+
/**
1786+
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
1787+
* Value {@value}.
1788+
*/
1789+
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
1790+
1791+
/**
1792+
* Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
1793+
* Value {@value}.
1794+
*/
1795+
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
1796+
17631797
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@
5454

5555
import software.amazon.awssdk.core.ResponseInputStream;
5656
import software.amazon.awssdk.core.exception.SdkException;
57+
import software.amazon.awssdk.services.s3.S3AsyncClient;
5758
import software.amazon.awssdk.services.s3.S3Client;
59+
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
5860
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
5961
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
6062
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@@ -87,6 +89,11 @@
8789
import software.amazon.awssdk.transfer.s3.model.Copy;
8890
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
8991

92+
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
93+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
94+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
95+
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
96+
9097
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
9198
import org.slf4j.Logger;
9299
import org.slf4j.LoggerFactory;
@@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
317324
*/
318325
private S3Client s3Client;
319326

327+
/**
328+
 * CRT-Based S3Client created if the analytics accelerator library is enabled
329+
* and managed by the S3AStoreImpl. Analytics accelerator library can be
330+
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
331+
*/
332+
private S3AsyncClient s3AsyncClient;
333+
320334
// initial callback policy is fail-once; it's there just to assist
321335
// some mock tests and other codepaths trying to call the low level
322336
// APIs on an uninitialized filesystem.
@@ -344,6 +358,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
344358
// If true, the prefetching input stream is used for reads.
345359
private boolean prefetchEnabled;
346360

361+
// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
362+
private boolean analyticsAcceleratorEnabled;
363+
364+
private boolean analyticsAcceleratorCRTEnabled;
365+
347366
// Size in bytes of a single prefetch block.
348367
private int prefetchBlockSize;
349368

@@ -525,6 +544,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
525544
*/
526545
private boolean s3AccessGrantsEnabled;
527546

547+
/**
548+
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
549+
*/
550+
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
551+
528552
/** Add any deprecated keys. */
529553
@SuppressWarnings("deprecation")
530554
private static void addDeprecatedKeys() {
@@ -680,8 +704,21 @@ public void initialize(URI name, Configuration originalConf)
680704
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
681705
this.prefetchBlockCount =
682706
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
707+
708+
this.analyticsAcceleratorEnabled =
709+
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
710+
this.analyticsAcceleratorCRTEnabled =
711+
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
712+
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
713+
683714
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
684-
DEFAULT_MULTIPART_UPLOAD_ENABLED);
715+
DEFAULT_MULTIPART_UPLOAD_ENABLED);
716+
717+
if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
718+
// Temp change: Analytics Accelerator with S3AsyncClient does not support multipart upload.
719+
this.isMultipartUploadEnabled = false;
720+
}
721+
685722
// multipart copy and upload are the same; this just makes it explicit
686723
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
687724

@@ -809,6 +846,27 @@ public void initialize(URI name, Configuration originalConf)
809846
// directly through the client manager.
810847
// this is to aid mocking.
811848
s3Client = store.getOrCreateS3Client();
849+
850+
if (this.analyticsAcceleratorEnabled) {
851+
LOG.info("Using S3SeekableInputStream");
852+
if(this.analyticsAcceleratorCRTEnabled) {
853+
LOG.info("Using S3 CRT client for analytics accelerator S3");
854+
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
855+
} else {
856+
LOG.info("Using S3 async client for analytics accelerator S3");
857+
this.s3AsyncClient = store.getOrCreateAsyncClient();
858+
}
859+
860+
ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
861+
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
862+
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
863+
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
864+
this.s3SeekableInputStreamFactory =
865+
new S3SeekableInputStreamFactory(
866+
new S3SdkObjectClient(this.s3AsyncClient),
867+
seekableInputStreamConfiguration);
868+
}
869+
812870
// The filesystem is now ready to perform operations against
813871
// S3
814872
// This initiates a probe against S3 for the bucket existing.
@@ -1876,6 +1934,8 @@ private FSDataInputStream executeOpen(
18761934
final Path path,
18771935
final OpenFileSupport.OpenFileInformation fileInformation)
18781936
throws IOException {
1937+
1938+
18791939
// create the input stream statistics before opening
18801940
// the file so that the time to prepare to open the file is included.
18811941
S3AInputStreamStatistics inputStreamStats =
@@ -1892,6 +1952,14 @@ private FSDataInputStream executeOpen(
18921952
fileInformation.applyOptions(readContext);
18931953
LOG.debug("Opening '{}'", readContext);
18941954

1955+
if (this.analyticsAcceleratorEnabled) {
1956+
return new FSDataInputStream(
1957+
new S3ASeekableStream(
1958+
this.bucket,
1959+
pathToKey(path),
1960+
s3SeekableInputStreamFactory));
1961+
}
1962+
18951963
if (this.prefetchEnabled) {
18961964
Configuration configuration = getConf();
18971965
initLocalDirAllocatorIfNotInitialized(configuration);
@@ -4421,9 +4489,11 @@ public void close() throws IOException {
44214489
protected synchronized void stopAllServices() {
44224490
try {
44234491
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
4424-
closeAutocloseables(LOG, store);
4492+
closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
44254493
store = null;
44264494
s3Client = null;
4495+
s3AsyncClient = null;
4496+
s3SeekableInputStreamFactory = null;
44274497

44284498
// At this point the S3A client is shut down,
44294499
// now the executor pools are closed
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.s3a;
21+
22+
import java.io.EOFException;
23+
import java.io.IOException;
24+
25+
import org.apache.hadoop.fs.FSExceptionMessages;
26+
import org.apache.hadoop.fs.StreamCapabilities;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import org.apache.hadoop.fs.FSInputStream;
31+
32+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
33+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
34+
import software.amazon.s3.analyticsaccelerator.util.S3URI;
35+
36+
public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
37+
38+
private S3SeekableInputStream inputStream;
39+
private long lastReadCurrentPos = 0;
40+
private final String key;
41+
private volatile boolean closed;
42+
43+
public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
44+
45+
public S3ASeekableStream(String bucket, String key,
46+
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
47+
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
48+
this.key = key;
49+
}
50+
51+
/**
52+
* Indicates whether the given {@code capability} is supported by this stream.
53+
*
54+
* @param capability the capability to check.
55+
* @return true if the given {@code capability} is supported by this stream, false otherwise.
56+
*/
57+
@Override
58+
public boolean hasCapability(String capability) {
59+
return false;
60+
}
61+
62+
@Override
63+
public int read() throws IOException {
64+
throwIfClosed();
65+
int bytesRead;
66+
try {
67+
bytesRead = inputStream.read();
68+
} catch (IOException ioe) {
69+
onReadFailure(ioe);
70+
throw ioe;
71+
}
72+
return bytesRead;
73+
}
74+
75+
@Override
76+
public void seek(long pos) throws IOException {
77+
throwIfClosed();
78+
if (pos < 0) {
79+
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
80+
+ " " + pos);
81+
}
82+
inputStream.seek(pos);
83+
}
84+
85+
86+
@Override
87+
public synchronized long getPos() {
88+
if (!closed) {
89+
lastReadCurrentPos = inputStream.getPos();
90+
}
91+
return lastReadCurrentPos;
92+
}
93+
94+
95+
/**
96+
* Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
97+
* reached. Leaves the position of the stream unaltered.
98+
*
99+
* @param buf buffer to read data into
100+
* @param off start position in buffer at which data is written
101+
* @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
102+
* @return the total number of bytes read into the buffer
103+
* @throws IOException if an I/O error occurs
104+
*/
105+
public int readTail(byte[] buf, int off, int len) throws IOException {
106+
throwIfClosed();
107+
int bytesRead;
108+
try {
109+
bytesRead = inputStream.readTail(buf, off, len);
110+
} catch (IOException ioe) {
111+
onReadFailure(ioe);
112+
throw ioe;
113+
}
114+
return bytesRead;
115+
}
116+
117+
@Override
118+
public int read(byte[] buf, int off, int len) throws IOException {
119+
throwIfClosed();
120+
int bytesRead;
121+
try {
122+
bytesRead = inputStream.read(buf, off, len);
123+
} catch (IOException ioe) {
124+
onReadFailure(ioe);
125+
throw ioe;
126+
}
127+
return bytesRead;
128+
}
129+
130+
131+
@Override
132+
public boolean seekToNewSource(long l) throws IOException {
133+
return false;
134+
}
135+
136+
@Override
137+
public int available() throws IOException {
138+
throwIfClosed();
139+
return super.available();
140+
}
141+
142+
@Override
143+
public synchronized void close() throws IOException {
144+
if(!closed) {
145+
closed = true;
146+
try {
147+
inputStream.close();
148+
inputStream = null;
149+
super.close();
150+
} catch (IOException ioe) {
151+
LOG.debug("Failure closing stream {}: ", key);
152+
throw ioe;
153+
}
154+
}
155+
}
156+
157+
/**
158+
* Close the stream on read failure.
159+
* No attempt to recover from failure
160+
*
161+
* @param ioe exception caught.
162+
*/
163+
@Retries.OnceTranslated
164+
private void onReadFailure(IOException ioe) throws IOException {
165+
if (LOG.isDebugEnabled()) {
166+
LOG.debug("Got exception while trying to read from stream {}, " +
167+
"not trying to recover:",
168+
key, ioe);
169+
} else {
170+
LOG.info("Got exception while trying to read from stream {}, " +
171+
"not trying to recover:",
172+
key, ioe);
173+
}
174+
this.close();
175+
}
176+
177+
178+
protected void throwIfClosed() throws IOException {
179+
if (closed) {
180+
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)