Skip to content

Commit 06d132a

Browse files
mukund-thakur authored and HarshitGupta11 committed
HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (apache#4636)
part of HADOOP-18103. Contributed By: Mukund Thakur
1 parent 3008014 commit 06d132a

File tree

10 files changed

+303
-35
lines changed

10 files changed

+303
-35
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class StreamStatisticNames {
4747
public static final String STREAM_READ_ABORTED = "stream_aborted";
4848

4949
/**
50-
* Bytes read from an input stream in read() calls.
50+
* Bytes read from an input stream in read()/readVectored() calls.
5151
* Does not include bytes read and then discarded in seek/close etc.
5252
* These are the bytes returned to the caller.
5353
* Value: {@value}.
@@ -110,6 +110,34 @@ public final class StreamStatisticNames {
110110
public static final String STREAM_READ_OPERATIONS =
111111
"stream_read_operations";
112112

113+
/**
114+
* Count of readVectored() operations in an input stream.
115+
* Value: {@value}.
116+
*/
117+
public static final String STREAM_READ_VECTORED_OPERATIONS =
118+
"stream_read_vectored_operations";
119+
120+
/**
121+
* Count of bytes discarded during readVectored() operation
122+
* in an input stream.
123+
* Value: {@value}.
124+
*/
125+
public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
126+
"stream_read_vectored_read_bytes_discarded";
127+
128+
/**
129+
* Count of incoming file ranges during readVectored() operation.
130+
* Value: {@value}.
131+
*/
132+
public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
133+
"stream_read_vectored_incoming_ranges";
134+
/**
135+
* Count of combined file ranges during readVectored() operation.
136+
* Value: {@value}.
137+
*/
138+
public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
139+
"stream_read_vectored_combined_ranges";
140+
113141
/**
114142
* Count of incomplete read() operations in an input stream,
115143
* that is, when the bytes returned were less than that requested.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public IntFunction<ByteBuffer> getAllocate() {
8484
return allocate;
8585
}
8686

87+
public WeakReferencedElasticByteBufferPool getPool() {
88+
return pool;
89+
}
90+
8791
@Override
8892
public void setup() throws Exception {
8993
super.setup();
@@ -382,6 +386,13 @@ protected List<FileRange> getSampleOverlappingRanges() {
382386
return fileRanges;
383387
}
384388

389+
protected List<FileRange> getConsecutiveRanges() {
390+
List<FileRange> fileRanges = new ArrayList<>();
391+
fileRanges.add(FileRange.createFileRange(100, 500));
392+
fileRanges.add(FileRange.createFileRange(600, 500));
393+
return fileRanges;
394+
}
395+
385396
/**
386397
* Validate that exceptions must be thrown during a vectored
387398
* read operation with specific input ranges.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,6 @@ public int maxReadSizeForVectorReads() {
963963
@Override
964964
public void readVectored(List<? extends FileRange> ranges,
965965
IntFunction<ByteBuffer> allocate) throws IOException {
966-
967966
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
968967
checkNotClosed();
969968
if (stopVectoredIOOperations.getAndSet(false)) {
@@ -978,6 +977,7 @@ public void readVectored(List<? extends FileRange> ranges,
978977

979978
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
980979
LOG.debug("Not merging the ranges as they are disjoint");
980+
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
981981
for (FileRange range: sortedRanges) {
982982
ByteBuffer buffer = allocate.apply(range.getLength());
983983
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
@@ -987,6 +987,7 @@ public void readVectored(List<? extends FileRange> ranges,
987987
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
988988
1, minSeekForVectorReads(),
989989
maxReadSizeForVectorReads());
990+
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
990991
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
991992
ranges.size(), combinedFileRanges.size());
992993
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
@@ -1088,6 +1089,7 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ
10881089
}
10891090
drainBytes += readCount;
10901091
}
1092+
streamStatistics.readVectoredBytesDiscarded(drainBytes);
10911093
LOG.debug("{} bytes drained from stream ", drainBytes);
10921094
}
10931095

@@ -1168,6 +1170,8 @@ private void populateBuffer(int length,
11681170
} else {
11691171
readByteArray(objectContent, buffer.array(), 0, length);
11701172
}
1173+
// update io stats.
1174+
incrementBytesRead(length);
11711175
}
11721176

11731177
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,10 @@ private final class InputStreamStatistics
803803
private final AtomicLong readOperations;
804804
private final AtomicLong readFullyOperations;
805805
private final AtomicLong seekOperations;
806+
private final AtomicLong readVectoredOperations;
807+
private final AtomicLong bytesDiscardedInVectoredIO;
808+
private final AtomicLong readVectoredIncomingRanges;
809+
private final AtomicLong readVectoredCombinedRanges;
806810

807811
/** Bytes read by the application and any when draining streams . */
808812
private final AtomicLong totalBytesRead;
@@ -836,6 +840,10 @@ private InputStreamStatistics(
836840
StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
837841
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
838842
StreamStatisticNames.STREAM_READ_UNBUFFERED,
843+
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
844+
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
845+
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
846+
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
839847
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
840848
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
841849
.withDurationTracking(ACTION_HTTP_GET_REQUEST,
@@ -872,6 +880,14 @@ private InputStreamStatistics(
872880
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE);
873881
readOperations = st.getCounterReference(
874882
StreamStatisticNames.STREAM_READ_OPERATIONS);
883+
readVectoredOperations = st.getCounterReference(
884+
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
885+
bytesDiscardedInVectoredIO = st.getCounterReference(
886+
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED);
887+
readVectoredIncomingRanges = st.getCounterReference(
888+
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES);
889+
readVectoredCombinedRanges = st.getCounterReference(
890+
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES);
875891
readFullyOperations = st.getCounterReference(
876892
StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
877893
seekOperations = st.getCounterReference(
@@ -1017,6 +1033,19 @@ public void readOperationCompleted(int requested, int actual) {
10171033
}
10181034
}
10191035

1036+
@Override
1037+
public void readVectoredOperationStarted(int numIncomingRanges,
1038+
int numCombinedRanges) {
1039+
readVectoredIncomingRanges.addAndGet(numIncomingRanges);
1040+
readVectoredCombinedRanges.addAndGet(numCombinedRanges);
1041+
readVectoredOperations.incrementAndGet();
1042+
}
1043+
1044+
@Override
1045+
public void readVectoredBytesDiscarded(int discarded) {
1046+
bytesDiscardedInVectoredIO.addAndGet(discarded);
1047+
}
1048+
10201049
/**
10211050
* {@code close()} merges the stream statistics into the filesystem's
10221051
* instrumentation instance.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,20 @@ void streamClose(boolean abortedConnection,
9696
*/
9797
void readOperationCompleted(int requested, int actual);
9898

99+
/**
100+
* A vectored read operation has started.
101+
* @param numIncomingRanges number of input ranges.
102+
* @param numCombinedRanges number of combined ranges.
103+
*/
104+
void readVectoredOperationStarted(int numIncomingRanges,
105+
int numCombinedRanges);
106+
107+
/**
108+
* Number of bytes discarded during vectored read.
109+
* @param discarded discarded bytes during vectored read.
110+
*/
111+
void readVectoredBytesDiscarded(int discarded);
112+
99113
@Override
100114
void close();
101115

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,17 @@ public void readOperationCompleted(final int requested, final int actual) {
195195

196196
}
197197

198+
@Override
199+
public void readVectoredOperationStarted(int numIncomingRanges,
200+
int numCombinedRanges) {
201+
202+
}
203+
204+
@Override
205+
public void readVectoredBytesDiscarded(int discarded) {
206+
207+
}
208+
198209
@Override
199210
public void close() {
200211

0 commit comments

Comments
 (0)