Skip to content

Commit

Permalink
HDDS-11691. Support object tags in ObjectEndpointStreaming#put (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandika3 authored Dec 9, 2024
1 parent 9854591 commit f7fe30a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.s3.awssdk.v1;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;

/**
* Tests the AWS S3 SDK basic operations with OM Ratis enabled and Streaming Write Pipeline.
*/
@Timeout(300)
public class TestS3SDKV1WithRatisStreaming extends AbstractS3SDKV1Tests {

@BeforeAll
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
false);
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
true);
conf.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true);
// Ensure that all writes use datastream
conf.set(OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "0MB");
startCluster(conf);
}

@AfterAll
public static void shutdown() throws IOException {
shutdownCluster();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public Response put(
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
customMetadata, digestInputStream, perf);
customMetadata, tags, digestInputStream, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ public static Pair<String, Long> put(
OzoneBucket bucket, String keyPath,
long length, ReplicationConfig replicationConfig,
int chunkSize, Map<String, String> keyMetadata,
Map<String, String> tags,
DigestInputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {

try {
return putKeyWithStream(bucket, keyPath,
length, chunkSize, replicationConfig, keyMetadata, body, perf);
length, chunkSize, replicationConfig, keyMetadata, tags, body, perf);
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
if (ex instanceof OMException) {
Expand Down Expand Up @@ -97,13 +98,14 @@ public static Pair<String, Long> putKeyWithStream(
int bufferSize,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
Map<String, String> tags,
DigestInputStream body, PerformanceStringBuilder perf)
throws IOException {
long startNanos = Time.monotonicNowNanos();
long writeLen;
String eTag;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata)) {
length, replicationConfig, keyMetadata, tags)) {
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
Expand Down

0 comments on commit f7fe30a

Please sign in to comment.