Skip to content

Commit 0d1f291

Browse files
committed
Adds in integration for AWS analytics accelerator library
1 parent eadf0dd commit 0d1f291

28 files changed

+590
-27
lines changed

hadoop-project/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@
206206
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
207207
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
208208
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
209+
<amazon-s3-analyticsaccelerator-s3.version>0.0.3</amazon-s3-analyticsaccelerator-s3.version>
209210
<aws.eventstream.version>1.0.1</aws.eventstream.version>
210211
<hsqldb.version>2.7.1</hsqldb.version>
211212
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
@@ -1192,6 +1193,11 @@
11921193
</exclusion>
11931194
</exclusions>
11941195
</dependency>
1196+
<dependency>
1197+
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
1198+
<artifactId>analyticsaccelerator-s3</artifactId>
1199+
<version>${amazon-s3-analyticsaccelerator-s3.version}</version>
1200+
</dependency>
11951201
<dependency>
11961202
<groupId>org.apache.mina</groupId>
11971203
<artifactId>mina-core</artifactId>

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,11 @@
472472
<artifactId>amazon-s3-encryption-client-java</artifactId>
473473
<scope>provided</scope>
474474
</dependency>
475+
<dependency>
476+
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
477+
<artifactId>analyticsaccelerator-s3</artifactId>
478+
<scope>compile</scope>
479+
</dependency>
475480
<dependency>
476481
<groupId>org.assertj</groupId>
477482
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1827,4 +1827,12 @@ private Constants() {
18271827
* Value: {@value}.
18281828
*/
18291829
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
1830+
1831+
1832+
/**
1833+
* Prefix to configure Analytics Accelerator Library.
1834+
*/
1835+
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
1836+
"fs.s3a.analytics.accelerator";
1837+
18301838
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ public File createTemporaryFileForWriting(String pathStr,
948948
* All stream factory initialization required after {@code Service.init()},
949949
* after all other services have themselves been initialized.
950950
*/
951-
private void finishStreamFactoryInit() {
951+
private void finishStreamFactoryInit() throws IOException {
952952
// must be on be invoked during service initialization
953953
Preconditions.checkState(isInState(STATE.INITED),
954954
"Store is in wrong state: %s", getServiceState());

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hadoop.fs.s3a.impl.streams;
2020

21+
import java.io.IOException;
22+
2123
import org.apache.hadoop.fs.StreamCapabilities;
2224
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
2325
import org.apache.hadoop.service.AbstractService;
@@ -54,7 +56,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
5456
* @param factoryBindingParameters parameters for the factory binding
5557
*/
5658
@Override
57-
public void bind(final FactoryBindingParameters factoryBindingParameters) {
59+
public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
5860
// must be on be invoked during service initialization
5961
Preconditions.checkState(isInState(STATE.INITED),
6062
"Input Stream factory %s is in wrong state: %s",
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.s3a.impl.streams;
21+
22+
import java.io.EOFException;
23+
import java.io.IOException;
24+
25+
import org.apache.hadoop.fs.FSExceptionMessages;
26+
import org.apache.hadoop.fs.StreamCapabilities;
27+
import org.apache.hadoop.fs.s3a.Retries;
28+
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
29+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
34+
import software.amazon.s3.analyticsaccelerator.util.S3URI;
35+
36+
/**
37+
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
38+
* parquet specific optimisations such as parquet-aware prefetching. For more details, see
39+
* https://github.com/awslabs/analytics-accelerator-s3.
40+
*/
41+
public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
42+
43+
private S3SeekableInputStream inputStream;
44+
private long lastReadCurrentPos = 0;
45+
private volatile boolean closed;
46+
47+
public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
48+
49+
public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
50+
super(InputStreamType.Analytics, parameters);
51+
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
52+
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
53+
}
54+
55+
/**
56+
* Indicates whether the given {@code capability} is supported by this stream.
57+
*
58+
* @param capability the capability to check.
59+
* @return true if the given {@code capability} is supported by this stream, false otherwise.
60+
*/
61+
@Override
62+
public boolean hasCapability(String capability) {
63+
return false;
64+
}
65+
66+
@Override
67+
public int read() throws IOException {
68+
throwIfClosed();
69+
int bytesRead;
70+
try {
71+
bytesRead = inputStream.read();
72+
} catch (IOException ioe) {
73+
onReadFailure(ioe);
74+
throw ioe;
75+
}
76+
return bytesRead;
77+
}
78+
79+
@Override
80+
public void seek(long pos) throws IOException {
81+
throwIfClosed();
82+
if (pos < 0) {
83+
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
84+
+ " " + pos);
85+
}
86+
inputStream.seek(pos);
87+
}
88+
89+
90+
@Override
91+
public synchronized long getPos() {
92+
if (!closed) {
93+
lastReadCurrentPos = inputStream.getPos();
94+
}
95+
return lastReadCurrentPos;
96+
}
97+
98+
99+
/**
100+
* Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
101+
* reached. Leaves the position of the stream unaltered.
102+
*
103+
* @param buf buffer to read data into
104+
* @param off start position in buffer at which data is written
105+
* @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
106+
* @return the total number of bytes read into the buffer
107+
* @throws IOException if an I/O error occurs
108+
*/
109+
public int readTail(byte[] buf, int off, int len) throws IOException {
110+
throwIfClosed();
111+
int bytesRead;
112+
try {
113+
bytesRead = inputStream.readTail(buf, off, len);
114+
} catch (IOException ioe) {
115+
onReadFailure(ioe);
116+
throw ioe;
117+
}
118+
return bytesRead;
119+
}
120+
121+
@Override
122+
public int read(byte[] buf, int off, int len) throws IOException {
123+
throwIfClosed();
124+
int bytesRead;
125+
try {
126+
bytesRead = inputStream.read(buf, off, len);
127+
} catch (IOException ioe) {
128+
onReadFailure(ioe);
129+
throw ioe;
130+
}
131+
return bytesRead;
132+
}
133+
134+
135+
@Override
136+
public boolean seekToNewSource(long l) throws IOException {
137+
return false;
138+
}
139+
140+
@Override
141+
public int available() throws IOException {
142+
throwIfClosed();
143+
return super.available();
144+
}
145+
146+
@Override
147+
protected boolean isStreamOpen() {
148+
return !isClosed();
149+
}
150+
151+
protected boolean isClosed() {
152+
return inputStream == null;
153+
}
154+
155+
@Override
156+
protected void abortInFinalizer() {
157+
try {
158+
close();
159+
} catch (IOException ignored) {
160+
161+
}
162+
}
163+
164+
@Override
165+
public synchronized void close() throws IOException {
166+
if(!closed) {
167+
closed = true;
168+
try {
169+
inputStream.close();
170+
inputStream = null;
171+
super.close();
172+
} catch (IOException ioe) {
173+
LOG.debug("Failure closing stream {}: ", getKey());
174+
throw ioe;
175+
}
176+
}
177+
}
178+
179+
/**
180+
* Close the stream on read failure.
181+
* No attempt to recover from failure
182+
*
183+
* @param ioe exception caught.
184+
*/
185+
@Retries.OnceTranslated
186+
private void onReadFailure(IOException ioe) throws IOException {
187+
if (LOG.isDebugEnabled()) {
188+
LOG.debug("Got exception while trying to read from stream {}, " +
189+
"not trying to recover:",
190+
getKey(), ioe);
191+
} else {
192+
LOG.info("Got exception while trying to read from stream {}, " +
193+
"not trying to recover:",
194+
getKey(), ioe);
195+
}
196+
this.close();
197+
}
198+
199+
200+
protected void throwIfClosed() throws IOException {
201+
if (closed) {
202+
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
203+
}
204+
}
205+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.s3a.impl.streams;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.s3a.VectoredIOContext;
24+
25+
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
26+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
27+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
28+
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
29+
30+
import java.io.IOException;
31+
32+
import static org.apache.hadoop.fs.s3a.Constants.*;
33+
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
34+
35+
/**
36+
* A factory for {@link AnalyticsStream}. This class is instantiated during initialization of
37+
* {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
38+
*/
39+
public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
40+
41+
private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
42+
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
43+
private boolean requireCrt;
44+
45+
public AnalyticsStreamFactory() {
46+
super("AnalyticsStreamFactory");
47+
}
48+
49+
@Override
50+
protected void serviceInit(final Configuration conf) throws Exception {
51+
super.serviceInit(conf);
52+
ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
53+
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
54+
this.seekableInputStreamConfiguration =
55+
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
56+
this.requireCrt = false;
57+
}
58+
59+
@Override
60+
public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
61+
super.bind(factoryBindingParameters);
62+
this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
63+
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
64+
seekableInputStreamConfiguration);
65+
}
66+
67+
@Override
68+
public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
69+
return new AnalyticsStream(
70+
parameters,
71+
s3SeekableInputStreamFactory);
72+
}
73+
74+
75+
76+
@Override
77+
public InputStreamType streamType() {
78+
return InputStreamType.Analytics;
79+
}
80+
81+
/**
82+
* Calculate Return StreamFactoryRequirements
83+
* @return a positive thread count.
84+
*/
85+
@Override
86+
public StreamFactoryRequirements factoryRequirements() {
87+
// fill in the vector context
88+
final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig());
89+
// and then disable range merging.
90+
// this ensures that no reads are made for data which is then discarded...
91+
// so the prefetch and block read code doesn't ever do wasteful fetches.
92+
vectorContext.setMinSeekForVectoredReads(0);
93+
94+
return new StreamFactoryRequirements(0,
95+
0, false, false,
96+
vectorContext);
97+
}
98+
99+
100+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,11 @@ public enum InputStreamType {
4545
*/
4646
Prefetch(StreamIntegration.PREFETCH, 2, c ->
4747
new PrefetchingInputStreamFactory()),
48-
4948
/**
5049
* The analytics input stream.
5150
*/
52-
Analytics(StreamIntegration.ANALYTICS, 3, c -> {
53-
throw new IllegalArgumentException("not yet supported");
54-
}),
51+
Analytics(StreamIntegration.ANALYTICS, 3, c ->
52+
new AnalyticsStreamFactory()),
5553

5654
/**
5755
* The a custom input stream.

0 commit comments

Comments
 (0)