Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11483. Make s3g object get and put operation buffer configurable #7233

Merged
merged 5 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3469,9 +3469,9 @@
<property>
<name>ozone.s3g.client.buffer.size</name>
<tag>OZONE, S3GATEWAY</tag>
<value>4KB</value>
<value>4MB</value>
<description>
The size of the buffer which is for read block. (4KB by default).
The size of the buffer which is for read block. (4MB by default).
</description>
</property>
<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class S3GatewayConfigKeys {
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
"ozone.s3g.client.buffer.size";
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
"4KB";
"4MB";

// S3G kerberos, principal config
public static final String OZONE_S3G_KERBEROS_KEYTAB_FILE_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Response put(
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(digestInputStream, output);
putLength = IOUtils.copy(digestInputStream, output, getIOBufferSize(length));
eTag = DatatypeConverter.printHexBinary(
digestInputStream.getMessageDigest().digest())
.toLowerCase();
Expand Down Expand Up @@ -443,7 +443,7 @@ public Response get(
if (rangeHeaderVal == null || rangeHeader.isReadFull()) {
StreamingOutput output = dest -> {
try (OzoneInputStream key = keyDetails.getContent()) {
long readLength = IOUtils.copyLarge(key, dest);
long readLength = IOUtils.copy(key, dest, getIOBufferSize(keyDetails.getDataSize()));
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
Expand All @@ -467,7 +467,7 @@ public Response get(
try (OzoneInputStream ozoneInputStream = keyDetails.getContent()) {
ozoneInputStream.seek(startOffset);
long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0,
copyLength, new byte[bufferSize]);
copyLength, new byte[getIOBufferSize(copyLength)]);
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
Expand Down Expand Up @@ -997,7 +997,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(
sourceObject, ozoneOutputStream, 0, length);
sourceObject, ozoneOutputStream, 0, length, new byte[getIOBufferSize(length)]);
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
Expand All @@ -1008,7 +1008,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
copyLength = IOUtils.copy(sourceObject, ozoneOutputStream, getIOBufferSize(length));
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
Expand All @@ -1024,7 +1024,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
putLength = IOUtils.copy(digestInputStream, ozoneOutputStream, getIOBufferSize(length));
byte[] digest = digestInputStream.getMessageDigest().digest();
ozoneOutputStream.getMetadata()
.put(ETAG, DatatypeConverter.printHexBinary(digest).toLowerCase());
Expand Down Expand Up @@ -1178,7 +1178,7 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
long metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
copyLength = IOUtils.copyLarge(src, dest);
copyLength = IOUtils.copy(src, dest, getIOBufferSize(srcKeyLen));
String eTag = DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(ETAG, eTag);
}
Expand Down Expand Up @@ -1408,4 +1408,18 @@ private String extractPartsCount(String eTag) {
}
return null;
}

private int getIOBufferSize(long fileLength) {
if (bufferSize == 0) {
// this is mainly for unit tests as init() will not be called in the unit tests
LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
kerneltime marked this conversation as resolved.
Show resolved Hide resolved
bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
}
if (fileLength == 0) {
// for empty file
return bufferSize;
} else {
return fileLength < bufferSize ? (int) fileLength : bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -79,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
Expand Down Expand Up @@ -368,7 +370,7 @@ public void testPutObjectMessageDigestResetDuringException() throws OS3Exception
MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);

Expand Down Expand Up @@ -553,7 +555,7 @@ public void testCopyObjectMessageDigestResetDuringException() throws IOException
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);

// Add copy header, and then call put
Expand Down Expand Up @@ -731,4 +733,17 @@ void testDirectoryCreationOverFile() throws IOException, OS3Exception {
assertEquals(S3ErrorTable.NO_OVERWRITE.getCode(), exception.getCode());
assertEquals(S3ErrorTable.NO_OVERWRITE.getHttpCode(), exception.getHttpCode());
}

@Test
public void testPutEmptyObject() throws IOException, OS3Exception {
HttpHeaders headersWithTags = Mockito.mock(HttpHeaders.class);
String emptyString = "";
ByteArrayInputStream body = new ByteArrayInputStream(emptyString.getBytes(UTF_8));
objectEndpoint.setHeaders(headersWithTags);

Response putResponse = objectEndpoint.put(BUCKET_NAME, KEY_NAME, emptyString.length(), 1, null, body);
assertEquals(200, putResponse.getStatus());
OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(KEY_NAME);
assertEquals(0, keyDetails.getDataSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -234,7 +235,7 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);

String content = "Multipart Upload";
Expand Down