7777import static org .apache .hadoop .fs .VectoredReadUtils .mergeSortedRanges ;
7878import static org .apache .hadoop .fs .VectoredReadUtils .validateAndSortRanges ;
7979import static org .apache .hadoop .fs .s3a .Invoker .onceTrackingDuration ;
80+ import static org .apache .hadoop .fs .s3a .impl .ErrorTranslation .shouldInputStreamBeAborted ;
81+ import static org .apache .hadoop .fs .s3a .impl .SDKStreamDrainer .abortSdkStream ;
8082import static org .apache .hadoop .util .StringUtils .toLowerCase ;
8183import static org .apache .hadoop .util .functional .FutureIO .awaitFuture ;
8284
@@ -571,11 +573,8 @@ public synchronized int read() throws IOException {
571573 }
572574 try {
573575 b = wrappedStream .read ();
574- } catch (HttpChannelEOFException | SocketTimeoutException e ) {
575- onReadFailure (e , true );
576- throw e ;
577576 } catch (IOException e ) {
578- onReadFailure (e , false );
577+ onReadFailure (e , shouldInputStreamBeAborted ( e ) );
579578 throw e ;
580579 }
581580 return b ;
@@ -1092,9 +1091,18 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
10921091
10931092 // Attempt to recover from the failure by reading each range individually
10941093 // within the current thread.
1095- // If a single read is unrecoverable, all subsequent range reads are skipped.
1094+ // If a single read is unrecoverable, all subsequent range reads are failed
1095+ // with the same exception.
1096+ // this is to process unrecoverable failures faster.
1097+ IOException lastIOE = null ;
10961098 for (FileRange child : unreadRanges ) {
1097- readSingleRangeWithRetries (child , bufferPool );
1099+ if (lastIOE == null ) {
1100+ // all good so far: request the next range
1101+ lastIOE = readSingleRangeWithRetries (child , bufferPool );
1102+ } else {
1103+ // a predecessor failed, do not attempt to recover.
1104+ child .getData ().completeExceptionally (lastIOE );
1105+ }
10981106 }
10991107 } finally {
11001108 IOUtils .cleanupWithLogger (LOG , rangeContent );
@@ -1116,7 +1124,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
11161124 */
11171125 @ Retries .OnceTranslated
11181126 private void populateChildBuffers (CombinedFileRange combinedFileRange ,
1119- InputStream objectContent ,
1127+ ResponseInputStream < GetObjectResponse > objectContent ,
11201128 ByteBufferPool bufferPool ) throws IOException {
11211129 // If the combined file range just contains a single child
11221130 // range, we only have to fill that one child buffer else
@@ -1146,6 +1154,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11461154 // work out how much
11471155 long drainQuantity = child .getOffset () - position ;
11481156 // and drain it.
1157+ // this will raise EOFException if a -1 was returned.
11491158 drainUnnecessaryData (objectContent , position , drainQuantity );
11501159 }
11511160 }
@@ -1155,6 +1164,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11551164 } catch (IOException e ) {
11561165 // release the buffer
11571166 bufferPool .putBuffer (buffer );
1167+ // rethrow
11581168 throw e ;
11591169 }
11601170 child .getData ().complete (buffer );
@@ -1174,7 +1184,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11741184 */
11751185 @ Retries .OnceTranslated
11761186 private void drainUnnecessaryData (
1177- final InputStream objectContent ,
1187+ final ResponseInputStream < GetObjectResponse > objectContent ,
11781188 final long position ,
11791189 long drainQuantity ) throws IOException {
11801190
@@ -1198,38 +1208,69 @@ private void drainUnnecessaryData(
11981208 "End of stream reached draining data between ranges; expected %,d bytes;"
11991209 + " only drained %,d bytes before -1 returned (position=%,d)" ,
12001210 drainQuantity , drainBytes , position + drainBytes );
1211+ LOG .debug (s );
12011212 throw new EOFException (s );
12021213 }
12031214 drainBytes += readCount ;
12041215 remaining -= readCount ;
12051216 }
1217+ } catch (IOException ex ) {
1218+ if (shouldInputStreamBeAborted (ex )) {
1219+ // abort the stream if the exception indicates this is needed.
1220+ abortSdkStream (uri , objectContent , streamStatistics , "drain failure" );
1221+ }
1222+ throw ex ;
12061223 } finally {
12071224 streamStatistics .readVectoredBytesDiscarded (drainBytes );
12081225 LOG .debug ("{} bytes drained from stream " , drainBytes );
12091226 }
12101227 }
12111228
1212-
12131229 /**
12141230 * Read data from S3 for this range and populate a buffer.
1231+ * The GET request and single range reads are retried.
1232+ * Any IOException which is propagated by the retry logic is
1233+ * attached to the range as an exceptional failure.
12151234 * @param range range of data to read.
12161235 * @param bufferPool buffer allocator.
1236+ * @return any IOE which resulted in the read being unsuccessful; null on success.
12171237 */
12181238 @ Retries .RetryTranslated
1219- private void readSingleRangeWithRetries (
1220- FileRange range , ByteBufferPool bufferPool ) {
1221- context .getReadInvoker ().retry ("vector read" , uri , true , () ->
1222- readSingleRange (range , bufferPool , false ));
1239+ private IOException readSingleRangeWithRetries (
1240+ FileRange range ,
1241+ ByteBufferPool bufferPool ) {
1242+ try {
1243+ context .getReadInvoker ().retry ("vector read" , uri , true , () ->
1244+ readSingleRange (range , bufferPool ));
1245+ return null ;
1246+ } catch (IOException ex ) {
1247+ // the retry mechanism has stopped retrying, so mark the request as a failure.
1248+ range .getData ().completeExceptionally (ex );
1249+ return ex ;
1250+ }
12231251 }
12241252
12251253 /**
12261254 * Read data from S3 for this range and populate a buffer.
1255+ * If the full read was succesful, the range's future is declared
1256+ * complete.
1257+ * <p>
1258+ * If an exception is raised,
1259+ * <ol>
1260+ * <li>The buffer is returned to the pool.</li>
1261+ * <li>The HTTP connection will be aborted if deemed to have failed.</li>
1262+ * <li>The relevant statistics will be updated.</li>
1263+ * <li>The exception is rethrown.</li>
1264+ * </ol>
1265+ * This is to allow the operation to be invoked in a retry() operation.
12271266 * @param range range of data to read.
12281267 * @param bufferPool buffer allocator.
1229- * @param shouldRetry
1268+ * @throws IOException failure to GET or read the data.
12301269 */
1231- @ Retries .RetryTranslated ("GET is retried; reads are not" )
1232- private void readSingleRange (FileRange range , ByteBufferPool bufferPool , boolean shouldRetry ) {
1270+ @ Retries .OnceTranslated
1271+ private void readSingleRange (FileRange range ,
1272+ ByteBufferPool bufferPool ) throws IOException {
1273+
12331274 LOG .debug ("Start reading {} from {} " , range , pathStr );
12341275 if (range .getLength () == 0 ) {
12351276 ByteBuffer buffer = bufferPool .getBuffer (false , range .getLength ());
@@ -1248,7 +1289,7 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool, boolean
12481289 long position = range .getOffset ();
12491290 int length = range .getLength ();
12501291 // a GET request, which has risk of failing if the file is gone, changed etc.
1251- objectRange = getS3Object ("readSingleRange" , position , length , shouldRetry );
1292+ objectRange = getS3Object ("readSingleRange" , position , length , false );
12521293
12531294 // GET has succeeded, make sure request is good to continue
12541295 checkIfVectoredIOStopped ();
@@ -1259,15 +1300,19 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool, boolean
12591300 populateBuffer (range , buffer , objectRange );
12601301 range .getData ().complete (buffer );
12611302 LOG .debug ("Finished reading range {} from path {}" , range , pathStr );
1262- } catch (Throwable ex ) {
1303+ } catch (IOException ex ) {
12631304 // any failure.
12641305 // log, the error, return the buffer to the pool, and report a failure.
12651306 LOG .debug ("Exception while reading a range {} from path {}" , range , pathStr , ex );
12661307 if (buffer != null ) {
12671308 // return any buffer to the pool
12681309 bufferPool .putBuffer (buffer );
12691310 }
1270- range .getData ().completeExceptionally (ex );
1311+ if (shouldInputStreamBeAborted (ex )) {
1312+ // abort the stream if the exception indicates this is needed.
1313+ abortSdkStream (uri , objectRange , streamStatistics , "read failure" );
1314+ }
1315+ throw ex ;
12711316 } finally {
12721317 IOUtils .cleanupWithLogger (LOG , objectRange );
12731318 }
@@ -1280,7 +1325,7 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool, boolean
12801325 * @param range vector range to populate.
12811326 * @param buffer buffer to fill.
12821327 * @param objectContent result retrieved from S3 store.
1283- * @throws IOException any IOE.
1328+ * @throws IOException any IOE raised reading the input stream .
12841329 * @throws EOFException if EOF if read() call returns -1
12851330 * @throws InterruptedIOException if vectored IO operation is stopped.
12861331 */
@@ -1334,14 +1379,12 @@ private void readByteArray(InputStream objectContent,
13341379 length - readBytes );
13351380 LOG .debug ("read {} bytes from stream" , readBytesCurr );
13361381 if (readBytesCurr < 0 ) {
1337- // TODO: abort the stream.
13381382 final String message = String .format ("HTTP stream closed before all bytes were read."
13391383 + " Expected %,d bytes but only read %,d bytes. Current position %,d"
13401384 + " (%s)" ,
13411385 length , readBytes , position , range );
13421386 LOG .warn (message );
1343- throw new EOFException (
1344- message );
1387+ throw new EOFException (message );
13451388 }
13461389 readBytes += readBytesCurr ;
13471390 position += readBytesCurr ;
@@ -1355,7 +1398,7 @@ private void readByteArray(InputStream objectContent,
13551398 * Read data from S3 with retries for the GET request, as part of a vector IO
13561399 * operation.
13571400 * <p>
1358- * This also handles if file has been changed while the
1401+ * This also handles the file being changed while the
13591402 * http call is getting executed. If the file has been
13601403 * changed RemoteFileChangedException is thrown.
13611404 * <p>
@@ -1405,20 +1448,19 @@ private ResponseInputStream<GetObjectResponse> getS3Object(String operationName,
14051448 }
14061449 changeTracker .processResponse (objectRange .response (), operationName ,
14071450 position );
1408- checkIfVectoredIOStopped ();
14091451 return objectRange ;
14101452 }
14111453
14121454 /**
1413- * Check if vectored io operation has been stooped . This happens
1414- * when the stream is closed or unbuffer is called.
1455+ * Check if vectored io operation has been stopped . This happens
1456+ * when the stream is closed or unbuffer() was called during the read .
14151457 * @throws InterruptedIOException throw InterruptedIOException such
14161458 * that all running vectored io is
14171459 * terminated thus releasing resources.
14181460 */
14191461 private void checkIfVectoredIOStopped () throws InterruptedIOException {
14201462 if (stopVectoredIOOperations .get ()) {
1421- throw new InterruptedIOException ("Stream closed or unbuffer is called" );
1463+ throw new InterruptedIOException ("Stream closed or unbuffer() was called during the read " );
14221464 }
14231465 }
14241466
0 commit comments