diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java new file mode 100644 index 00000000000..571d4c64908 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.s3.awssdk.v1; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; + +/** + * Tests the AWS S3 SDK basic operations with OM Ratis enabled and Streaming Write Pipeline. + */ +@Timeout(300) +public class TestS3SDKV1WithRatisStreaming extends AbstractS3SDKV1Tests { + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, + false); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + true); + conf.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, true); + conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true); + // Ensure that all writes use datastream + conf.set(OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "0MB"); + startCluster(conf); + } + + @AfterAll + public static void shutdown() throws IOException { + shutdownCluster(); + } +} diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 0a928981613..9311fb7fa4b 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -323,7 +323,7 @@ public Response put( perf.appendStreamMode(); Pair keyWriteResult = ObjectEndpointStreaming .put(bucket, keyPath, length, replicationConfig, chunkSize, - customMetadata, digestInputStream, perf); + customMetadata, tags, digestInputStream, perf); eTag = keyWriteResult.getKey(); putLength = keyWriteResult.getValue(); } else { diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java index cb9499aa20d..f5d185fc76b 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -61,12 +61,13 @@ public static Pair put( OzoneBucket bucket, String keyPath, long length, ReplicationConfig replicationConfig, int chunkSize, Map keyMetadata, + Map tags, DigestInputStream body, PerformanceStringBuilder perf) throws IOException, OS3Exception { try { return putKeyWithStream(bucket, keyPath, - length, chunkSize, replicationConfig, keyMetadata, body, perf); + length, chunkSize, replicationConfig, keyMetadata, tags, body, perf); } catch (IOException ex) { LOG.error("Exception occurred in PutObject", ex); if (ex instanceof OMException) { @@ -97,13 +98,14 @@ public static Pair putKeyWithStream( int bufferSize, ReplicationConfig replicationConfig, Map keyMetadata, + Map tags, DigestInputStream body, PerformanceStringBuilder perf) throws IOException { long startNanos = Time.monotonicNowNanos(); long writeLen; String eTag; try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath, - length, replicationConfig, keyMetadata)) { + length, replicationConfig, keyMetadata, tags)) { long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos); writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length); eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())