diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml index 18bbd906a0b..5956e92476a 100644 --- a/hadoop-ozone/s3gateway/pom.xml +++ b/hadoop-ozone/s3gateway/pom.xml @@ -165,6 +165,11 @@ hdds-test-utils test + + org.mockito + mockito-inline + test + diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 81e49d64f7c..f163d3eac33 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -217,13 +217,14 @@ public Response put( @HeaderParam("Content-Length") long length, @QueryParam("partNumber") int partNumber, @QueryParam("uploadId") @DefaultValue("") String uploadID, - InputStream body) throws IOException, OS3Exception { + final InputStream body) throws IOException, OS3Exception { long startNanos = Time.monotonicNowNanos(); S3GAction s3GAction = S3GAction.CREATE_KEY; boolean auditSuccess = true; PerformanceStringBuilder perf = new PerformanceStringBuilder(); String copyHeader = null, storageType = null; + DigestInputStream digestInputStream = null; try { OzoneVolume volume = getVolume(); if (uploadID != null && !uploadID.equals("")) { @@ -297,11 +298,11 @@ public Response put( if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD" .equals(headers.getHeaderString("x-amz-content-sha256"))) { - body = new DigestInputStream(new SignedChunksInputStream(body), - E_TAG_PROVIDER.get()); + digestInputStream = new DigestInputStream(new SignedChunksInputStream(body), + getMessageDigestInstance()); length = Long.parseLong(amzDecodedLength); } else { - body = new DigestInputStream(body, E_TAG_PROVIDER.get()); + digestInputStream = new DigestInputStream(body, getMessageDigestInstance()); } long putLength; @@ -310,7 +311,7 @@ public Response put( perf.appendStreamMode(); Pair keyWriteResult = ObjectEndpointStreaming .put(bucket, keyPath, length, replicationConfig, chunkSize, - customMetadata, (DigestInputStream) body, perf); + customMetadata, digestInputStream, perf); eTag = keyWriteResult.getKey(); putLength = keyWriteResult.getValue(); } else { @@ -320,9 +321,9 @@ public Response put( long metadataLatencyNs = getMetrics().updatePutKeyMetadataStats(startNanos); perf.appendMetaLatencyNanos(metadataLatencyNs); - putLength = IOUtils.copyLarge(body, output); + putLength = IOUtils.copyLarge(digestInputStream, output); eTag = DatatypeConverter.printHexBinary( - ((DigestInputStream) body).getMessageDigest().digest()) + digestInputStream.getMessageDigest().digest()) .toLowerCase(); output.getMetadata().put(ETAG, eTag); } @@ -367,6 +368,11 @@ public Response put( } throw ex; } finally { + // Reset the thread-local message digest instance in case of exception + // and MessageDigest#digest is never called + if (digestInputStream != null) { + digestInputStream.getMessageDigest().reset(); + } if (auditSuccess) { long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos); perf.appendOpLatencyNanos(opLatencyNs); @@ -879,20 +885,21 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket, @SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"}) private Response createMultipartKey(OzoneVolume volume, String bucket, String key, long length, int partNumber, String uploadID, - InputStream body, PerformanceStringBuilder perf) + final InputStream body, PerformanceStringBuilder perf) throws IOException, OS3Exception { long startNanos = Time.monotonicNowNanos(); String copyHeader = null; + DigestInputStream digestInputStream = null; try { if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD" .equals(headers.getHeaderString("x-amz-content-sha256"))) { - body = new DigestInputStream(new SignedChunksInputStream(body), - E_TAG_PROVIDER.get()); + digestInputStream = new DigestInputStream(new SignedChunksInputStream(body), + getMessageDigestInstance()); length = Long.parseLong( headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)); } else { - body = new DigestInputStream(body, E_TAG_PROVIDER.get()); + digestInputStream = new DigestInputStream(body, getMessageDigestInstance()); } copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER); @@ -912,7 +919,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, perf.appendStreamMode(); return ObjectEndpointStreaming .createMultipartKey(ozoneBucket, key, length, partNumber, - uploadID, chunkSize, (DigestInputStream) body, perf); + uploadID, chunkSize, digestInputStream, perf); } // OmMultipartCommitUploadPartInfo can only be gotten after the // OzoneOutputStream is closed, so we need to save the KeyOutputStream @@ -993,10 +1000,10 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, partNumber, uploadID)) { metadataLatencyNs = getMetrics().updatePutKeyMetadataStats(startNanos); - putLength = IOUtils.copyLarge(body, ozoneOutputStream); + putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream); ((KeyMetadataAware)ozoneOutputStream.getOutputStream()) .getMetadata().put(ETAG, DatatypeConverter.printHexBinary( - ((DigestInputStream) body).getMessageDigest().digest()) + digestInputStream.getMessageDigest().digest()) .toLowerCase()); keyOutputStream = ozoneOutputStream.getKeyOutputStream(); @@ -1042,6 +1049,12 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, throw os3Exception; } throw ex; + } finally { + // Reset the thread-local message digest instance in case of exception + // and MessageDigest#digest is never called + if (digestInputStream != null) { + digestInputStream.getMessageDigest().reset(); + } } } @@ -1122,21 +1135,20 @@ public void setContext(ContainerRequestContext context) { } @SuppressWarnings("checkstyle:ParameterNumber") - void copy(OzoneVolume volume, InputStream src, long srcKeyLen, + void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen, String destKey, String destBucket, ReplicationConfig replication, Map metadata, PerformanceStringBuilder perf, long startNanos) throws IOException { long copyLength; - src = new DigestInputStream(src, E_TAG_PROVIDER.get()); if (datastreamEnabled && !(replication != null && replication.getReplicationType() == EC) && srcKeyLen > datastreamMinLength) { perf.appendStreamMode(); copyLength = ObjectEndpointStreaming .copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen, - chunkSize, replication, metadata, (DigestInputStream) src, perf, startNanos); + chunkSize, replication, metadata, src, perf, startNanos); } else { try (OzoneOutputStream dest = getClientProtocol() .createKey(volume.getName(), destBucket, destKey, srcKeyLen, @@ -1145,9 +1157,7 @@ void copy(OzoneVolume volume, InputStream src, long srcKeyLen, getMetrics().updateCopyKeyMetadataStats(startNanos); perf.appendMetaLatencyNanos(metadataLatencyNs); copyLength = IOUtils.copyLarge(src, dest); - String eTag = DatatypeConverter.printHexBinary( - ((DigestInputStream) src).getMessageDigest().digest()) - .toLowerCase(); + String eTag = DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase(); dest.getMetadata().put(ETAG, eTag); } } @@ -1166,6 +1176,7 @@ private CopyObjectResponse copyObject(OzoneVolume volume, String sourceBucket = result.getLeft(); String sourceKey = result.getRight(); + DigestInputStream sourceDigestInputStream = null; try { OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails( volume.getName(), sourceBucket, sourceKey); @@ -1195,11 +1206,11 @@ private CopyObjectResponse copyObject(OzoneVolume volume, } } long sourceKeyLen = sourceKeyDetails.getDataSize(); - try (OzoneInputStream src = getClientProtocol().getKey(volume.getName(), sourceBucket, sourceKey)) { getMetrics().updateCopyKeyMetadataStats(startNanos); - copy(volume, src, sourceKeyLen, destkey, destBucket, replicationConfig, + sourceDigestInputStream = new DigestInputStream(src, getMessageDigestInstance()); + copy(volume, sourceDigestInputStream, sourceKeyLen, destkey, destBucket, replicationConfig, sourceKeyDetails.getMetadata(), perf, startNanos); } @@ -1221,6 +1232,12 @@ private CopyObjectResponse copyObject(OzoneVolume volume, destBucket + "/" + destkey, ex); } throw ex; + } finally { + // Reset the thread-local message digest instance in case of exception + // and MessageDigest#digest is never called + if (sourceDigestInputStream != null) { + sourceDigestInputStream.getMessageDigest().reset(); + } } } @@ -1321,4 +1338,9 @@ private String wrapInQuotes(String value) { return "\"" + value + "\""; } + @VisibleForTesting + public MessageDigest getMessageDigestInstance() { + return E_TAG_PROVIDER.get(); + } + } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 0daa666ae4c..07c7bbd5d71 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -23,6 +23,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.security.MessageDigest; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.commons.io.IOUtils; @@ -46,6 +48,7 @@ import org.apache.http.HttpStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER; @@ -57,10 +60,13 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -88,7 +94,7 @@ public void setup() throws IOException { clientStub.getObjectStore().createS3Bucket(destBucket); // Create PutObject and setClient to OzoneClientStub - objectEndpoint = new ObjectEndpoint(); + objectEndpoint = spy(new ObjectEndpoint()); objectEndpoint.setClient(clientStub); objectEndpoint.setOzoneConfiguration(new OzoneConfiguration()); } @@ -226,6 +232,31 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception { assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG))); } + @Test + public void testPutObjectMessageDigestResetDuringException() throws OS3Exception { + MessageDigest messageDigest = mock(MessageDigest.class); + try (MockedStatic mocked = mockStatic(IOUtils.class)) { + // For example, EOFException during put-object due to client cancelling the operation before it completes + mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class))) + .thenThrow(IOException.class); + when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest); + + HttpHeaders headers = mock(HttpHeaders.class); + ByteArrayInputStream body = + new ByteArrayInputStream(CONTENT.getBytes(UTF_8)); + objectEndpoint.setHeaders(headers); + try { + objectEndpoint.put(bucketName, keyName, CONTENT + .length(), 1, null, body); + fail("Should throw IOException"); + } catch (IOException ignored) { + // Verify that the message digest is reset so that the instance can be reused for the + // next request in the same thread + verify(messageDigest, times(1)).reset(); + } + } + } + @Test public void testCopyObject() throws IOException, OS3Exception { // Put object in to source bucket @@ -314,6 +345,53 @@ public void testCopyObject() throws IOException, OS3Exception { assertThat(e.getCode()).contains("NoSuchBucket"); } + @Test + public void testCopyObjectMessageDigestResetDuringException() throws IOException, OS3Exception { + // Put object in to source bucket + HttpHeaders headers = mock(HttpHeaders.class); + ByteArrayInputStream body = + new ByteArrayInputStream(CONTENT.getBytes(UTF_8)); + objectEndpoint.setHeaders(headers); + keyName = "sourceKey"; + + Response response = objectEndpoint.put(bucketName, keyName, + CONTENT.length(), 1, null, body); + + OzoneInputStream ozoneInputStream = clientStub.getObjectStore() + .getS3Bucket(bucketName) + .readKey(keyName); + + String keyContent = IOUtils.toString(ozoneInputStream, UTF_8); + OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(bucketName).getKey(keyName); + + assertEquals(200, response.getStatus()); + assertEquals(CONTENT, keyContent); + assertNotNull(keyDetails.getMetadata()); + assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG))); + + MessageDigest messageDigest = mock(MessageDigest.class); + try (MockedStatic mocked = mockStatic(IOUtils.class)) { + // Add the mocked methods only during the copy request + when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest); + mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class))) + .thenThrow(IOException.class); + + // Add copy header, and then call put + when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( + bucketName + "/" + urlEncode(keyName)); + + try { + objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1, + null, body); + fail("Should throw IOException"); + } catch (IOException ignored) { + // Verify that the message digest is reset so that the instance can be reused for the + // next request in the same thread + verify(messageDigest, times(1)).reset(); + } + } + } + @Test public void testInvalidStorageType() throws IOException { HttpHeaders headers = mock(HttpHeaders.class); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index bb1b7037bd9..aecc56fe172 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.s3.endpoint; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.OzoneClient; @@ -28,12 +29,16 @@ import org.apache.hadoop.ozone.s3.exception.OS3Exception; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.MessageDigest; import java.util.UUID; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; @@ -44,7 +49,13 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -194,6 +205,53 @@ public void testPartUploadContentLength() throws IOException, OS3Exception { assertContentLength(uploadID, keyName, content.length()); } + @Test + public void testPartUploadMessageDigestResetDuringException() throws IOException, OS3Exception { + OzoneClient clientStub = new OzoneClientStub(); + clientStub.getObjectStore().createS3Bucket(OzoneConsts.S3_BUCKET); + + + HttpHeaders headers = mock(HttpHeaders.class); + when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn( + "STANDARD"); + + ObjectEndpoint objectEndpoint = spy(new ObjectEndpoint()); + + objectEndpoint.setHeaders(headers); + objectEndpoint.setClient(clientStub); + objectEndpoint.setOzoneConfiguration(new OzoneConfiguration()); + + Response response = objectEndpoint.initializeMultipartUpload(OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + + assertEquals(200, response.getStatus()); + + MessageDigest messageDigest = mock(MessageDigest.class); + try (MockedStatic mocked = mockStatic(IOUtils.class)) { + // Add the mocked methods only during the copy request + when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest); + mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class))) + .thenThrow(IOException.class); + + String content = "Multipart Upload"; + ByteArrayInputStream body = + new ByteArrayInputStream(content.getBytes(UTF_8)); + try { + objectEndpoint.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + content.length(), 1, uploadID, body); + fail("Should throw IOException"); + } catch (IOException ignored) { + // Verify that the message digest is reset so that the instance can be reused for the + // next request in the same thread + verify(messageDigest, times(1)).reset(); + } + } + } + private void assertContentLength(String uploadID, String key, long contentLength) throws IOException { OzoneMultipartUploadPartListParts parts =