diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index dff656383991..222823c454a7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.ClientTrustManager; +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -1834,11 +1835,16 @@ public OzoneOutputStream createMultipartKey( long size, int partNumber, String uploadID) throws IOException { final OpenKeySession openKey = newMultipartOpenKey( volumeName, bucketName, keyName, size, partNumber, uploadID, false); + return createMultipartOutputStream(openKey, uploadID, partNumber); + } + + private OzoneOutputStream createMultipartOutputStream( + OpenKeySession openKey, String uploadID, int partNumber + ) throws IOException { KeyOutputStream keyOutputStream = createKeyOutputStream(openKey) .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) - .setAtomicKeyCreation(isS3GRequest.get()) .build(); return createOutputStream(openKey, keyOutputStream); } @@ -1854,29 +1860,25 @@ public OzoneDataStreamOutput createMultipartStreamKey( throws IOException { final OpenKeySession openKey = newMultipartOpenKey( volumeName, bucketName, keyName, size, partNumber, uploadID, true); - // Amazon S3 never adds partial objects, So for S3 requests we need to - // set atomicKeyCreation to true - // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html - KeyDataStreamOutput keyOutputStream = - new KeyDataStreamOutput.Builder() - .setHandler(openKey) - .setXceiverClientManager(xceiverClientManager) - .setOmClient(ozoneManagerClient) - .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig()) - .setMultipartNumber(partNumber) - .setMultipartUploadID(uploadID) - .setIsMultipartKey(true) - .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()) - .build(); - keyOutputStream - .addPreallocateBlocks( - openKey.getKeyInfo().getLatestVersionLocations(), - openKey.getOpenVersion()); - final OzoneOutputStream out = createSecureOutputStream( - openKey, keyOutputStream, null); - return new OzoneDataStreamOutput(out != null ? out : keyOutputStream); + final ByteBufferStreamOutput out; + ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig(); + if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) { + KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder() + .setHandler(openKey) + .setReplicationConfig(replicationConfig) + .setMultipartNumber(partNumber) + .setMultipartUploadID(uploadID) + .setIsMultipartKey(true) + .build(); + keyOutputStream.addPreallocateBlocks( + openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); + final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null); + out = secureOut != null ? secureOut : keyOutputStream; + } else { + out = createMultipartOutputStream(openKey, uploadID, partNumber); + } + return new OzoneDataStreamOutput(out); } @Override @@ -2270,25 +2272,33 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) throws IOException { final ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig(); + final ByteBufferStreamOutput out; + if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) { + KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder() + .setHandler(openKey) + .setReplicationConfig(replicationConfig) + .build(); + keyOutputStream.addPreallocateBlocks( + openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); + final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null); + out = secureOut != null ? secureOut : keyOutputStream; + } else { + out = createOutputStream(openKey); + } + return new OzoneDataStreamOutput(out); + } + + private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() { // Amazon S3 never adds partial objects, So for S3 requests we need to // set atomicKeyCreation to true // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html - KeyDataStreamOutput keyOutputStream = - new KeyDataStreamOutput.Builder() - .setHandler(openKey) - .setXceiverClientManager(xceiverClientManager) - .setOmClient(ozoneManagerClient) - .setReplicationConfig(replicationConfig) - .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()) - .build(); - keyOutputStream - .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), - openKey.getOpenVersion()); - final OzoneOutputStream out = createSecureOutputStream( - openKey, keyOutputStream, null); - return new OzoneDataStreamOutput(out != null ? out : keyOutputStream); + return new KeyDataStreamOutput.Builder() + .setXceiverClientManager(xceiverClientManager) + .setOmClient(ozoneManagerClient) + .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) + .setConfig(clientConfig) + .setAtomicKeyCreation(isS3GRequest.get()); } private OzoneOutputStream createOutputStream(OpenKeySession openKey) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index c3bbd793dc1c..3a8d117bc6d5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -28,6 +28,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.HashMap; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; @@ -65,6 +66,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; @@ -183,12 +186,13 @@ public void testGetKeyAndFileWithNetworkTopology() throws IOException { } } - @Test - public void testMultiPartUploadWithStream() + @ParameterizedTest + @MethodSource("replicationConfigs") + void testMultiPartUploadWithStream(ReplicationConfig replicationConfig) throws IOException, NoSuchAlgorithmException { String volumeName = UUID.randomUUID().toString(); - String bucketName = UUID.randomUUID().toString(); - String keyName = UUID.randomUUID().toString(); + String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket"; + String keyName = replicationConfig.getReplication(); byte[] sampleData = new byte[1024 * 8]; @@ -199,11 +203,6 @@ public void testMultiPartUploadWithStream() volume.createBucket(bucketName); OzoneBucket bucket = volume.getBucket(bucketName); - ReplicationConfig replicationConfig = - ReplicationConfig.fromTypeAndFactor( - ReplicationType.RATIS, - THREE); - OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, replicationConfig); @@ -226,7 +225,7 @@ public void testMultiPartUploadWithStream() OzoneMultipartUploadPartListParts parts = bucket.listParts(keyName, uploadID, 0, 1); - Assert.assertEquals(parts.getPartInfoList().size(), 1); + Assert.assertEquals(1, parts.getPartInfoList().size()); OzoneMultipartUploadPartListParts.PartInfo partInfo = parts.getPartInfoList().get(0);