Skip to content

Commit

Permalink
HDDS-11483. Make s3g object get and put operation buffer configurable (
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi authored Oct 17, 2024
1 parent 515977a commit 85eb89b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
4 changes: 2 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3476,9 +3476,9 @@
<property>
<name>ozone.s3g.client.buffer.size</name>
<tag>OZONE, S3GATEWAY</tag>
<value>4KB</value>
<value>4MB</value>
<description>
The size of the buffer which is for read block. (4KB by default).
The size of the buffer which is for read block. (4MB by default).
</description>
</property>
<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class S3GatewayConfigKeys {
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
"ozone.s3g.client.buffer.size";
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
"4KB";
"4MB";

// S3G kerberos, principal config
public static final String OZONE_S3G_KERBEROS_KEYTAB_FILE_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Response put(
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(digestInputStream, output);
putLength = IOUtils.copy(digestInputStream, output, getIOBufferSize(length));
eTag = DatatypeConverter.printHexBinary(
digestInputStream.getMessageDigest().digest())
.toLowerCase();
Expand Down Expand Up @@ -443,7 +443,7 @@ public Response get(
if (rangeHeaderVal == null || rangeHeader.isReadFull()) {
StreamingOutput output = dest -> {
try (OzoneInputStream key = keyDetails.getContent()) {
long readLength = IOUtils.copyLarge(key, dest);
long readLength = IOUtils.copy(key, dest, getIOBufferSize(keyDetails.getDataSize()));
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
Expand All @@ -467,7 +467,7 @@ public Response get(
try (OzoneInputStream ozoneInputStream = keyDetails.getContent()) {
ozoneInputStream.seek(startOffset);
long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0,
copyLength, new byte[bufferSize]);
copyLength, new byte[getIOBufferSize(copyLength)]);
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
Expand Down Expand Up @@ -997,7 +997,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(
sourceObject, ozoneOutputStream, 0, length);
sourceObject, ozoneOutputStream, 0, length, new byte[getIOBufferSize(length)]);
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
Expand All @@ -1008,7 +1008,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
copyLength = IOUtils.copy(sourceObject, ozoneOutputStream, getIOBufferSize(length));
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
Expand All @@ -1024,7 +1024,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
putLength = IOUtils.copy(digestInputStream, ozoneOutputStream, getIOBufferSize(length));
byte[] digest = digestInputStream.getMessageDigest().digest();
ozoneOutputStream.getMetadata()
.put(ETAG, DatatypeConverter.printHexBinary(digest).toLowerCase());
Expand Down Expand Up @@ -1178,7 +1178,7 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
long metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
copyLength = IOUtils.copyLarge(src, dest);
copyLength = IOUtils.copy(src, dest, getIOBufferSize(srcKeyLen));
String eTag = DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(ETAG, eTag);
}
Expand Down Expand Up @@ -1408,4 +1408,18 @@ private String extractPartsCount(String eTag) {
}
return null;
}

private int getIOBufferSize(long fileLength) {
if (bufferSize == 0) {
// this is mainly for unit tests as init() will not be called in the unit tests
LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
}
if (fileLength == 0) {
// for empty file
return bufferSize;
} else {
return fileLength < bufferSize ? (int) fileLength : bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -79,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
Expand Down Expand Up @@ -368,7 +370,7 @@ public void testPutObjectMessageDigestResetDuringException() throws OS3Exception
MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);

Expand Down Expand Up @@ -553,7 +555,7 @@ public void testCopyObjectMessageDigestResetDuringException() throws IOException
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);

// Add copy header, and then call put
Expand Down Expand Up @@ -731,4 +733,17 @@ void testDirectoryCreationOverFile() throws IOException, OS3Exception {
assertEquals(S3ErrorTable.NO_OVERWRITE.getCode(), exception.getCode());
assertEquals(S3ErrorTable.NO_OVERWRITE.getHttpCode(), exception.getHttpCode());
}

@Test
public void testPutEmptyObject() throws IOException, OS3Exception {
HttpHeaders headersWithTags = Mockito.mock(HttpHeaders.class);
String emptyString = "";
ByteArrayInputStream body = new ByteArrayInputStream(emptyString.getBytes(UTF_8));
objectEndpoint.setHeaders(headersWithTags);

Response putResponse = objectEndpoint.put(BUCKET_NAME, KEY_NAME, emptyString.length(), 1, null, body);
assertEquals(200, putResponse.getStatus());
OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(KEY_NAME);
assertEquals(0, keyDetails.getDataSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -234,7 +235,7 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);

String content = "Multipart Upload";
Expand Down

0 comments on commit 85eb89b

Please sign in to comment.