Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10832. Client should switch to streaming based on OpenKeySession replication #6683

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -1905,11 +1906,16 @@ public OzoneOutputStream createMultipartKey(
long size, int partNumber, String uploadID) throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, false);
return createMultipartOutputStream(openKey, uploadID, partNumber);
}

private OzoneOutputStream createMultipartOutputStream(
OpenKeySession openKey, String uploadID, int partNumber
) throws IOException {
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
return createOutputStream(openKey, keyOutputStream);
}
Expand All @@ -1925,29 +1931,25 @@ public OzoneDataStreamOutput createMultipartStreamKey(
throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, true);
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream out = createSecureOutputStream(
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
final ByteBufferStreamOutput out;
ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig();
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
.setHandler(openKey)
.setReplicationConfig(replicationConfig)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.build();
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createMultipartOutputStream(openKey, uploadID, partNumber);
}
return new OzoneDataStreamOutput(out);
}

@Override
Expand Down Expand Up @@ -2349,25 +2351,33 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
throws IOException {
final ReplicationConfig replicationConfig
= openKey.getKeyInfo().getReplicationConfig();
final ByteBufferStreamOutput out;
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
.setHandler(openKey)
.setReplicationConfig(replicationConfig)
.build();
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createOutputStream(openKey);
}
return new OzoneDataStreamOutput(out);
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.setReplicationConfig(replicationConfig)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream out = createSecureOutputStream(
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
return new KeyDataStreamOutput.Builder()
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get());
}

private OzoneOutputStream createOutputStream(OpenKeySession openKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -61,6 +62,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
Expand Down Expand Up @@ -170,12 +173,13 @@ void testGetKeyAndFileWithNetworkTopology() throws IOException {
}
}

@Test
public void testMultiPartUploadWithStream()
@ParameterizedTest
@MethodSource("replicationConfigs")
void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
throws IOException, NoSuchAlgorithmException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket";
String keyName = replicationConfig.getReplication();

byte[] sampleData = new byte[1024 * 8];

Expand All @@ -186,11 +190,6 @@ public void testMultiPartUploadWithStream()
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);

ReplicationConfig replicationConfig =
ReplicationConfig.fromTypeAndFactor(
ReplicationType.RATIS,
THREE);

OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
replicationConfig);

Expand All @@ -210,7 +209,7 @@ public void testMultiPartUploadWithStream()
OzoneMultipartUploadPartListParts parts =
bucket.listParts(keyName, uploadID, 0, 1);

assertEquals(parts.getPartInfoList().size(), 1);
assertEquals(1, parts.getPartInfoList().size());

OzoneMultipartUploadPartListParts.PartInfo partInfo =
parts.getPartInfoList().get(0);
Expand Down
Loading