Skip to content

Commit

Permalink
HDDS-10832. Client should switch to streaming based on OpenKeySession…
Browse files Browse the repository at this point in the history
… replication (apache#6683)
  • Loading branch information
adoroszlai authored May 22, 2024
1 parent 8a23991 commit c1bcdea
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -1959,11 +1960,16 @@ public OzoneOutputStream createMultipartKey(
long size, int partNumber, String uploadID) throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, false);
return createMultipartOutputStream(openKey, uploadID, partNumber);
}

private OzoneOutputStream createMultipartOutputStream(
OpenKeySession openKey, String uploadID, int partNumber
) throws IOException {
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
return createOutputStream(openKey, keyOutputStream);
}
Expand All @@ -1979,29 +1985,25 @@ public OzoneDataStreamOutput createMultipartStreamKey(
throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, true);
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream out = createSecureOutputStream(
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
final ByteBufferStreamOutput out;
ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig();
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
.setHandler(openKey)
.setReplicationConfig(replicationConfig)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.build();
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createMultipartOutputStream(openKey, uploadID, partNumber);
}
return new OzoneDataStreamOutput(out);
}

@Override
Expand Down Expand Up @@ -2403,25 +2405,33 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
throws IOException {
final ReplicationConfig replicationConfig
= openKey.getKeyInfo().getReplicationConfig();
final ByteBufferStreamOutput out;
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
.setHandler(openKey)
.setReplicationConfig(replicationConfig)
.build();
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createOutputStream(openKey);
}
return new OzoneDataStreamOutput(out);
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.setReplicationConfig(replicationConfig)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream out = createSecureOutputStream(
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
return new KeyDataStreamOutput.Builder()
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get());
}

private OzoneOutputStream createOutputStream(OpenKeySession openKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -61,6 +62,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
Expand Down Expand Up @@ -170,12 +173,13 @@ void testGetKeyAndFileWithNetworkTopology() throws IOException {
}
}

@Test
public void testMultiPartUploadWithStream()
@ParameterizedTest
@MethodSource("replicationConfigs")
void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
throws IOException, NoSuchAlgorithmException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket";
String keyName = replicationConfig.getReplication();

byte[] sampleData = new byte[1024 * 8];

Expand All @@ -186,11 +190,6 @@ public void testMultiPartUploadWithStream()
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);

ReplicationConfig replicationConfig =
ReplicationConfig.fromTypeAndFactor(
ReplicationType.RATIS,
THREE);

OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
replicationConfig);

Expand All @@ -210,7 +209,7 @@ public void testMultiPartUploadWithStream()
OzoneMultipartUploadPartListParts parts =
bucket.listParts(keyName, uploadID, 0, 1);

assertEquals(parts.getPartInfoList().size(), 1);
assertEquals(1, parts.getPartInfoList().size());

OzoneMultipartUploadPartListParts.PartInfo partInfo =
parts.getPartInfoList().get(0);
Expand Down

0 comments on commit c1bcdea

Please sign in to comment.