5151import java .net .InetSocketAddress ;
5252import java .net .SocketTimeoutException ;
5353import java .nio .charset .StandardCharsets ;
54+ import java .util .Arrays ;
5455import java .util .Locale ;
5556import java .util .Objects ;
5657import java .util .concurrent .atomic .AtomicBoolean ;
@@ -192,21 +193,31 @@ public void testWriteBlobWithRetries() throws Exception {
192193 final int maxRetries = randomInt (5 );
193194 final CountDown countDown = new CountDown (maxRetries + 1 );
194195
195- final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , 512 ));
196+ final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , frequently () ? 512 : 1 << 20 )); // rarely up to 1mb
196197 httpServer .createContext ("/bucket/write_blob_max_retries" , exchange -> {
197- final BytesReference body = Streams .readFully (exchange .getRequestBody ());
198- if (countDown .countDown ()) {
199- if (Objects .deepEquals (bytes , BytesReference .toBytes (body ))) {
200- exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
201- } else {
202- exchange .sendResponseHeaders (HttpStatus .SC_BAD_REQUEST , -1 );
198+ if ("PUT" .equals (exchange .getRequestMethod ()) && exchange .getRequestURI ().getQuery () == null ) {
199+ if (countDown .countDown ()) {
200+ final BytesReference body = Streams .readFully (exchange .getRequestBody ());
201+ if (Objects .deepEquals (bytes , BytesReference .toBytes (body ))) {
202+ exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
203+ } else {
204+ exchange .sendResponseHeaders (HttpStatus .SC_BAD_REQUEST , -1 );
205+ }
206+ exchange .close ();
207+ return ;
208+ }
209+
210+ if (randomBoolean ()) {
211+ if (randomBoolean ()) {
212+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , bytes .length - 1 )]);
213+ } else {
214+ Streams .readFully (exchange .getRequestBody ());
215+ exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
216+ HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
217+ }
203218 }
204219 exchange .close ();
205- return ;
206220 }
207- exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
208- HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
209- exchange .close ();
210221 });
211222
212223 final BlobContainer blobContainer = createBlobContainer (maxRetries , null , true , null );
@@ -217,17 +228,21 @@ public void testWriteBlobWithRetries() throws Exception {
217228 }
218229
219230 public void testWriteBlobWithReadTimeouts () {
231+ final byte [] bytes = randomByteArrayOfLength (randomIntBetween (10 , 128 ));
220232 final TimeValue readTimeout = TimeValue .timeValueMillis (randomIntBetween (100 , 500 ));
221233 final BlobContainer blobContainer = createBlobContainer (1 , readTimeout , true , null );
222234
223235 // HTTP server does not send a response
224236 httpServer .createContext ("/bucket/write_blob_timeout" , exchange -> {
225237 if (randomBoolean ()) {
226- Streams .readFully (exchange .getRequestBody ());
238+ if (randomBoolean ()) {
239+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , bytes .length - 1 )]);
240+ } else {
241+ Streams .readFully (exchange .getRequestBody ());
242+ }
227243 }
228244 });
229245
230- final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , 128 ));
231246 Exception exception = expectThrows (IOException .class , () -> {
232247 try (InputStream stream = new InputStreamIndexInput (new ByteArrayIndexInput ("desc" , bytes ), bytes .length )) {
233248 blobContainer .writeBlob ("write_blob_timeout" , stream , bytes .length , false );
@@ -249,16 +264,18 @@ public void testWriteLargeBlob() throws Exception {
249264 final ByteSizeValue bufferSize = new ByteSizeValue (5 , ByteSizeUnit .MB );
250265 final BlobContainer blobContainer = createBlobContainer (null , readTimeout , true , bufferSize );
251266
252- final int parts = randomIntBetween (1 , 2 );
267+ final int parts = randomIntBetween (1 , 5 );
253268 final long lastPartSize = randomLongBetween (10 , 512 );
254269 final long blobSize = (parts * bufferSize .getBytes ()) + lastPartSize ;
255270
256- final int maxRetries = 2 ; // we want all requests to fail at least once
257- final CountDown countDownInitiate = new CountDown (maxRetries );
258- final AtomicInteger countDownUploads = new AtomicInteger (maxRetries * (parts + 1 ));
259- final CountDown countDownComplete = new CountDown (maxRetries );
271+ final int nbErrors = 2 ; // we want all requests to fail at least once
272+ final CountDown countDownInitiate = new CountDown (nbErrors );
273+ final AtomicInteger countDownUploads = new AtomicInteger (nbErrors * (parts + 1 ));
274+ final CountDown countDownComplete = new CountDown (nbErrors );
260275
261276 httpServer .createContext ("/bucket/write_large_blob" , exchange -> {
277+ final long contentLength = Long .parseLong (exchange .getRequestHeaders ().getFirst ("Content-Length" ));
278+
262279 if ("POST" .equals (exchange .getRequestMethod ())
263280 && exchange .getRequestURI ().getQuery ().equals ("uploads" )) {
264281 // initiate multipart upload request
@@ -275,11 +292,14 @@ public void testWriteLargeBlob() throws Exception {
275292 exchange .close ();
276293 return ;
277294 }
278- } else if ("PUT" .equals (exchange .getRequestMethod ())) {
295+ } else if ("PUT" .equals (exchange .getRequestMethod ())
296+ && exchange .getRequestURI ().getQuery ().contains ("uploadId=TEST" )
297+ && exchange .getRequestURI ().getQuery ().contains ("partNumber=" )) {
279298 // upload part request
280299 MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
281300 BytesReference bytes = Streams .readFully (md5 );
282301 assertThat ((long ) bytes .length (), anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
302+ assertThat (contentLength , anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
283303
284304 if (countDownUploads .decrementAndGet () % 2 == 0 ) {
285305 exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
@@ -289,10 +309,10 @@ public void testWriteLargeBlob() throws Exception {
289309 }
290310
291311 } else if ("POST" .equals (exchange .getRequestMethod ())
292- && exchange .getRequestURI ().getQuery ().equals ("uploadId=TEST" )) {
312+ && exchange .getRequestURI ().getQuery ().equals ("uploadId=TEST" )) {
293313 // complete multipart upload request
294- Streams .readFully (exchange .getRequestBody ());
295314 if (countDownComplete .countDown ()) {
315+ Streams .readFully (exchange .getRequestBody ());
296316 byte [] response = ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>\n " +
297317 "<CompleteMultipartUploadResult>\n " +
298318 " <Bucket>bucket</Bucket>\n " +
@@ -308,8 +328,13 @@ public void testWriteLargeBlob() throws Exception {
308328
309329 // sends an error back or let the request time out
310330 if (useTimeout == false ) {
311- exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
312- HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
331+ if (randomBoolean () && contentLength > 0 ) {
332+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , Math .toIntExact (contentLength - 1 ))]);
333+ } else {
334+ Streams .readFully (exchange .getRequestBody ());
335+ exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
336+ HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
337+ }
313338 exchange .close ();
314339 }
315340 });
@@ -323,9 +348,6 @@ public void testWriteLargeBlob() throws Exception {
323348
324349 /**
325350 * A resettable InputStream that only serves zeros.
326- *
327- * Ideally it should be wrapped into a BufferedInputStream but it seems that the AWS SDK is calling InputStream{@link #reset()}
328- * before calling InputStream{@link #mark(int)}, which is not permitted by the {@link #reset()} method contract.
329351 **/
330352 private static class ZeroInputStream extends InputStream {
331353
@@ -336,17 +358,32 @@ private static class ZeroInputStream extends InputStream {
336358
337359 private ZeroInputStream (final long length ) {
338360 this .length = length ;
339- this .reads = new AtomicLong (length );
361+ this .reads = new AtomicLong (0 );
340362 this .mark = -1 ;
341363 }
342364
343365 @ Override
344366 public int read () throws IOException {
345367 ensureOpen ();
346- if (reads .decrementAndGet () < 0 ) {
368+ return (reads .incrementAndGet () <= length ) ? 0 : -1 ;
369+ }
370+
371+ @ Override
372+ public int read (byte [] b , int off , int len ) throws IOException {
373+ ensureOpen ();
374+ if (len == 0 ) {
375+ return 0 ;
376+ }
377+
378+ final int available = available ();
379+ if (available == 0 ) {
347380 return -1 ;
348381 }
349- return 0 ;
382+
383+ final int toCopy = Math .min (len , available );
384+ Arrays .fill (b , off , off + toCopy , (byte ) 0 );
385+ reads .addAndGet (toCopy );
386+ return toCopy ;
350387 }
351388
352389 @ Override
@@ -368,7 +405,14 @@ public synchronized void reset() throws IOException {
368405 @ Override
369406 public int available () throws IOException {
370407 ensureOpen ();
371- return Math .toIntExact (length - reads .get ());
408+ if (reads .get () >= length ) {
409+ return 0 ;
410+ }
411+ try {
412+ return Math .toIntExact (length - reads .get ());
413+ } catch (ArithmeticException e ) {
414+ return Integer .MAX_VALUE ;
415+ }
372416 }
373417
374418 @ Override
0 commit comments