HDDS-10587. Reset ETag's thread-local MessageDigest instance on exception (apache#6435)
ivandika3 authored Mar 26, 2024
1 parent 80bafd0 commit c6c611f
Showing 4 changed files with 186 additions and 23 deletions.
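
The change below ensures that the thread-local MessageDigest used for ETag computation is reset in a finally block whenever an exception prevents MessageDigest#digest from being called, so leftover state cannot corrupt the ETag of the next request served by the same thread. The following is a minimal sketch of that pattern, assuming a thread-local MD5 digest similar to the gateway's E_TAG_PROVIDER; the class and method names (EtagDigestSketch, writeAndComputeEtag) are illustrative only, not the actual Ozone code.

// Illustrative sketch only, not the actual Ozone implementation.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;

final class EtagDigestSketch {
  // One MessageDigest per thread, reused across requests (mirrors E_TAG_PROVIDER).
  private static final ThreadLocal<MessageDigest> ETAG_DIGEST =
      ThreadLocal.withInitial(() -> {
        try {
          return MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
          throw new IllegalStateException(e);
        }
      });

  static String writeAndComputeEtag(InputStream body, OutputStream out) throws IOException {
    DigestInputStream digestStream = new DigestInputStream(body, ETAG_DIGEST.get());
    try {
      IOUtils.copyLarge(digestStream, out);
      // digest() returns the hash and resets the instance on the success path.
      return DatatypeConverter.printHexBinary(
          digestStream.getMessageDigest().digest()).toLowerCase();
    } finally {
      // If copyLarge threw before digest() ran, drop the partially accumulated
      // state so the reused thread-local instance starts clean next time.
      digestStream.getMessageDigest().reset();
    }
  }
}

Calling reset() after a successful digest() is harmless, since digest() already resets the instance, so the finally block is safe on both paths.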
5 changes: 5 additions & 0 deletions hadoop-ozone/s3gateway/pom.xml
@@ -165,6 +165,11 @@
<artifactId>hdds-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -217,13 +217,14 @@ public Response put(
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
InputStream body) throws IOException, OS3Exception {
final InputStream body) throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.CREATE_KEY;
boolean auditSuccess = true;
PerformanceStringBuilder perf = new PerformanceStringBuilder();

String copyHeader = null, storageType = null;
DigestInputStream digestInputStream = null;
try {
OzoneVolume volume = getVolume();
if (uploadID != null && !uploadID.equals("")) {
@@ -297,11 +298,11 @@ public Response put(

if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
getMessageDigestInstance());
length = Long.parseLong(amzDecodedLength);
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
}

long putLength;
@@ -310,7 +311,7 @@
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
customMetadata, (DigestInputStream) body, perf);
customMetadata, digestInputStream, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
@@ -320,9 +321,9 @@
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(body, output);
putLength = IOUtils.copyLarge(digestInputStream, output);
eTag = DatatypeConverter.printHexBinary(
((DigestInputStream) body).getMessageDigest().digest())
digestInputStream.getMessageDigest().digest())
.toLowerCase();
output.getMetadata().put(ETAG, eTag);
}
@@ -367,6 +368,11 @@ public Response put(
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (digestInputStream != null) {
digestInputStream.getMessageDigest().reset();
}
if (auditSuccess) {
long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
perf.appendOpLatencyNanos(opLatencyNs);
@@ -879,20 +885,21 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket,
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
private Response createMultipartKey(OzoneVolume volume, String bucket,
String key, long length, int partNumber, String uploadID,
InputStream body, PerformanceStringBuilder perf)
final InputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
String copyHeader = null;
DigestInputStream digestInputStream = null;
try {

if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
getMessageDigestInstance());
length = Long.parseLong(
headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
}

copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
@@ -912,7 +919,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
perf.appendStreamMode();
return ObjectEndpointStreaming
.createMultipartKey(ozoneBucket, key, length, partNumber,
uploadID, chunkSize, (DigestInputStream) body, perf);
uploadID, chunkSize, digestInputStream, perf);
}
// OmMultipartCommitUploadPartInfo can only be gotten after the
// OzoneOutputStream is closed, so we need to save the KeyOutputStream
@@ -993,10 +1000,10 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(body, ozoneOutputStream);
putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
((KeyMetadataAware)ozoneOutputStream.getOutputStream())
.getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
((DigestInputStream) body).getMessageDigest().digest())
digestInputStream.getMessageDigest().digest())
.toLowerCase());
keyOutputStream
= ozoneOutputStream.getKeyOutputStream();
@@ -1042,6 +1049,12 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
throw os3Exception;
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (digestInputStream != null) {
digestInputStream.getMessageDigest().reset();
}
}
}

@@ -1122,21 +1135,20 @@ public void setContext(ContainerRequestContext context) {
}

@SuppressWarnings("checkstyle:ParameterNumber")
void copy(OzoneVolume volume, InputStream src, long srcKeyLen,
void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
String destKey, String destBucket,
ReplicationConfig replication,
Map<String, String> metadata,
PerformanceStringBuilder perf, long startNanos)
throws IOException {
long copyLength;
src = new DigestInputStream(src, E_TAG_PROVIDER.get());
if (datastreamEnabled && !(replication != null &&
replication.getReplicationType() == EC) &&
srcKeyLen > datastreamMinLength) {
perf.appendStreamMode();
copyLength = ObjectEndpointStreaming
.copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
chunkSize, replication, metadata, (DigestInputStream) src, perf, startNanos);
chunkSize, replication, metadata, src, perf, startNanos);
} else {
try (OzoneOutputStream dest = getClientProtocol()
.createKey(volume.getName(), destBucket, destKey, srcKeyLen,
@@ -1145,9 +1157,7 @@ void copy(OzoneVolume volume, InputStream src, long srcKeyLen,
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
copyLength = IOUtils.copyLarge(src, dest);
String eTag = DatatypeConverter.printHexBinary(
((DigestInputStream) src).getMessageDigest().digest())
.toLowerCase();
String eTag = DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(ETAG, eTag);
}
}
@@ -1166,6 +1176,7 @@ private CopyObjectResponse copyObject(OzoneVolume volume,

String sourceBucket = result.getLeft();
String sourceKey = result.getRight();
DigestInputStream sourceDigestInputStream = null;
try {
OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
volume.getName(), sourceBucket, sourceKey);
@@ -1195,11 +1206,11 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
}
}
long sourceKeyLen = sourceKeyDetails.getDataSize();

try (OzoneInputStream src = getClientProtocol().getKey(volume.getName(),
sourceBucket, sourceKey)) {
getMetrics().updateCopyKeyMetadataStats(startNanos);
copy(volume, src, sourceKeyLen, destkey, destBucket, replicationConfig,
sourceDigestInputStream = new DigestInputStream(src, getMessageDigestInstance());
copy(volume, sourceDigestInputStream, sourceKeyLen, destkey, destBucket, replicationConfig,
sourceKeyDetails.getMetadata(), perf, startNanos);
}

@@ -1221,6 +1232,12 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
destBucket + "/" + destkey, ex);
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (sourceDigestInputStream != null) {
sourceDigestInputStream.getMessageDigest().reset();
}
}
}

@@ -1321,4 +1338,9 @@ private String wrapInQuotes(String value) {
return "\"" + value + "\"";
}

@VisibleForTesting
public MessageDigest getMessageDigestInstance() {
return E_TAG_PROVIDER.get();
}

}
hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -23,6 +23,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.io.IOUtils;
@@ -46,6 +48,7 @@
import org.apache.http.HttpStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
@@ -57,10 +60,13 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -88,7 +94,7 @@ public void setup() throws IOException {
clientStub.getObjectStore().createS3Bucket(destBucket);

// Create PutObject and setClient to OzoneClientStub
objectEndpoint = new ObjectEndpoint();
objectEndpoint = spy(new ObjectEndpoint());
objectEndpoint.setClient(clientStub);
objectEndpoint.setOzoneConfiguration(new OzoneConfiguration());
}
@@ -226,6 +232,31 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG)));
}

@Test
public void testPutObjectMessageDigestResetDuringException() throws OS3Exception {
MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
.thenThrow(IOException.class);
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);

HttpHeaders headers = mock(HttpHeaders.class);
ByteArrayInputStream body =
new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
objectEndpoint.setHeaders(headers);
try {
objectEndpoint.put(bucketName, keyName, CONTENT
.length(), 1, null, body);
fail("Should throw IOException");
} catch (IOException ignored) {
// Verify that the message digest is reset so that the instance can be reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
}
}
}

@Test
public void testCopyObject() throws IOException, OS3Exception {
// Put object in to source bucket
@@ -314,6 +345,53 @@ public void testCopyObject() throws IOException, OS3Exception {
assertThat(e.getCode()).contains("NoSuchBucket");
}

@Test
public void testCopyObjectMessageDigestResetDuringException() throws IOException, OS3Exception {
// Put object in to source bucket
HttpHeaders headers = mock(HttpHeaders.class);
ByteArrayInputStream body =
new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
objectEndpoint.setHeaders(headers);
keyName = "sourceKey";

Response response = objectEndpoint.put(bucketName, keyName,
CONTENT.length(), 1, null, body);

OzoneInputStream ozoneInputStream = clientStub.getObjectStore()
.getS3Bucket(bucketName)
.readKey(keyName);

String keyContent = IOUtils.toString(ozoneInputStream, UTF_8);
OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(bucketName).getKey(keyName);

assertEquals(200, response.getStatus());
assertEquals(CONTENT, keyContent);
assertNotNull(keyDetails.getMetadata());
assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG)));

MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
.thenThrow(IOException.class);

// Add copy header, and then call put
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
bucketName + "/" + urlEncode(keyName));

try {
objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1,
null, body);
fail("Should throw IOException");
} catch (IOException ignored) {
// Verify that the message digest is reset so that the instance can be reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
}
}
}

@Test
public void testInvalidStorageType() throws IOException {
HttpHeaders headers = mock(HttpHeaders.class);
