Skip to content

Commit 9aba17e

Browse files
authored
HADOOP-19542. S3A: Close AAL factory on service stop. (#7616) (#7658)
Contributed by: Ahmar Suhail.
1 parent 64bc773 commit 9aba17e

File tree

7 files changed

+37
-0
lines changed

7 files changed

+37
-0
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ public final class StreamStatisticNames {
104104
*/
105105
public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
106106

107+
/**
108+
* Total count of times object stream factory was closed.
109+
*
110+
* Value: {@value}.
111+
*/
112+
public static final String ANALYTICS_STREAM_FACTORY_CLOSED = "analytics_stream_factory_closed";
113+
107114
/**
108115
* Count of exceptions raised during input stream reads.
109116
* Value: {@value}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,10 @@ public enum Statistic {
352352
StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
353353
"Total count of times an attempt to close an input stream was made",
354354
TYPE_COUNTER),
355+
ANALYTICS_STREAM_FACTORY_CLOSED(
356+
"analytics_stream_factory_closed",
357+
"Count of times the analytics stream factory was closed",
358+
TYPE_COUNTER),
355359
STREAM_READ_EXCEPTIONS(
356360
StreamStatisticNames.STREAM_READ_EXCEPTIONS,
357361
"Count of exceptions raised during input stream reads",

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,12 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE
996996
LOG.debug("Stream factory requested async client");
997997
return clientManager().getOrCreateAsyncClient();
998998
}
999+
1000+
@Override
1001+
public void incrementFactoryStatistic(Statistic statistic) {
1002+
incrementStatistic(statistic);
1003+
}
1004+
9991005
}
10001006

10011007
/*

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
3333

3434
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
35+
import static org.apache.hadoop.fs.s3a.Statistic.ANALYTICS_STREAM_FACTORY_CLOSED;
3536
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
3637

3738
/**
@@ -95,6 +96,13 @@ public StreamFactoryRequirements factoryRequirements() {
9596
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
9697
}
9798

99+
@Override
100+
protected void serviceStop() throws Exception {
101+
this.s3SeekableInputStreamFactory.close();
102+
callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED);
103+
super.serviceStop();
104+
}
105+
98106
private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
99107
throws IOException {
100108
return s3SeekableInputStreamFactory.eval();

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import software.amazon.awssdk.services.s3.S3AsyncClient;
2424

25+
import org.apache.hadoop.fs.s3a.Statistic;
2526
import org.apache.hadoop.fs.StreamCapabilities;
2627
import org.apache.hadoop.service.Service;
2728

@@ -85,6 +86,8 @@ interface StreamFactoryCallbacks {
8586
* @throws IOException failure to create the client.
8687
*/
8788
S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
89+
90+
void incrementFactoryStatistic(Statistic statistic);
8891
}
8992
}
9093

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
4949
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
5050
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
51+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
5152
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
5253

5354
/**
@@ -106,6 +107,8 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
106107
}
107108

108109
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
110+
fs.close();
111+
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
109112
}
110113

111114
@Test

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.io.UncheckedIOException;
2323

24+
import org.apache.hadoop.fs.s3a.Statistic;
2425
import org.assertj.core.api.Assertions;
2526
import org.junit.Test;
2627
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -334,6 +335,11 @@ private static final class Callbacks implements ObjectInputStreamFactory.StreamF
334335
public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
335336
throw new UnsupportedOperationException("not implemented");
336337
}
338+
339+
@Override
340+
public void incrementFactoryStatistic(Statistic statistic) {
341+
throw new UnsupportedOperationException("not implemented");
342+
}
337343
}
338344

339345
}

0 commit comments

Comments
 (0)