Skip to content

Commit

Permalink
apacheGH-38330: [C++][Azure] Use properties for input stream metadata (
Browse files Browse the repository at this point in the history
…apache#38524)

### Rationale for this change

We use user defined metadata for input stream metadata for now. But we should use properties returned from Azure like other remove filesystem implementations such as S3 and GCS.

### What changes are included in this PR?

Convert `Azure::Storage::Blobs::Models::BlobProperties` to `KeyValueMetadata`. The following values aren't supported yet:

* `BlobProperties::ObjectReplicationSourceProperties`
* `BlobProperties::Metadata`

If they need, we will add support for them as a follow-up task.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#38330

Lead-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
2 people authored and loicalleyne committed Nov 13, 2023
1 parent 650f822 commit de08076
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 16 deletions.
150 changes: 143 additions & 7 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -150,13 +151,148 @@ Status ErrorToStatus(const std::string& prefix,
return Status::IOError(prefix, " Azure Error: ", exception.what());
}

template <typename ObjectResult>
std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
auto md = std::make_shared<KeyValueMetadata>();
for (auto prop : result) {
md->Append(prop.first, prop.second);
template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
std::string string;
Status operator()(std::string_view view) {
string.append(view.data(), view.size());
return Status::OK();
}
} appender;
arrow::internal::StringFormatter<ArrowType> formatter;
ARROW_UNUSED(formatter(value, appender));
return appender.string;
}

std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
const Azure::Storage::Blobs::Models::BlobProperties& properties) {
auto metadata = std::make_shared<KeyValueMetadata>();
// Not supported yet:
// * properties.ObjectReplicationSourceProperties
// * properties.Metadata
//
// They may have the same key defined in the following
// metadata->Append() list. If we have duplicated key in metadata,
// the first value may be only used by users because
// KeyValueMetadata::Get() returns the first found value. Note that
// users can use all values by using KeyValueMetadata::keys() and
// KeyValueMetadata::values().
if (properties.ImmutabilityPolicy.HasValue()) {
metadata->Append("Immutability-Policy-Expires-On",
properties.ImmutabilityPolicy.Value().ExpiresOn.ToString());
metadata->Append("Immutability-Policy-Mode",
properties.ImmutabilityPolicy.Value().PolicyMode.ToString());
}
metadata->Append("Content-Type", properties.HttpHeaders.ContentType);
metadata->Append("Content-Encoding", properties.HttpHeaders.ContentEncoding);
metadata->Append("Content-Language", properties.HttpHeaders.ContentLanguage);
const auto& content_hash = properties.HttpHeaders.ContentHash.Value;
metadata->Append("Content-Hash", HexEncode(content_hash.data(), content_hash.size()));
metadata->Append("Content-Disposition", properties.HttpHeaders.ContentDisposition);
metadata->Append("Cache-Control", properties.HttpHeaders.CacheControl);
metadata->Append("Last-Modified", properties.LastModified.ToString());
metadata->Append("Created-On", properties.CreatedOn.ToString());
if (properties.ObjectReplicationDestinationPolicyId.HasValue()) {
metadata->Append("Object-Replication-Destination-Policy-Id",
properties.ObjectReplicationDestinationPolicyId.Value());
}
metadata->Append("Blob-Type", properties.BlobType.ToString());
if (properties.CopyCompletedOn.HasValue()) {
metadata->Append("Copy-Completed-On", properties.CopyCompletedOn.Value().ToString());
}
if (properties.CopyStatusDescription.HasValue()) {
metadata->Append("Copy-Status-Description", properties.CopyStatusDescription.Value());
}
if (properties.CopyId.HasValue()) {
metadata->Append("Copy-Id", properties.CopyId.Value());
}
if (properties.CopyProgress.HasValue()) {
metadata->Append("Copy-Progress", properties.CopyProgress.Value());
}
if (properties.CopySource.HasValue()) {
metadata->Append("Copy-Source", properties.CopySource.Value());
}
if (properties.CopyStatus.HasValue()) {
metadata->Append("Copy-Status", properties.CopyStatus.Value().ToString());
}
if (properties.IsIncrementalCopy.HasValue()) {
metadata->Append("Is-Incremental-Copy",
FormatValue<BooleanType>(properties.IsIncrementalCopy.Value()));
}
if (properties.IncrementalCopyDestinationSnapshot.HasValue()) {
metadata->Append("Incremental-Copy-Destination-Snapshot",
properties.IncrementalCopyDestinationSnapshot.Value());
}
if (properties.LeaseDuration.HasValue()) {
metadata->Append("Lease-Duration", properties.LeaseDuration.Value().ToString());
}
if (properties.LeaseState.HasValue()) {
metadata->Append("Lease-State", properties.LeaseState.Value().ToString());
}
if (properties.LeaseStatus.HasValue()) {
metadata->Append("Lease-Status", properties.LeaseStatus.Value().ToString());
}
metadata->Append("Content-Length", FormatValue<Int64Type>(properties.BlobSize));
if (properties.ETag.HasValue()) {
metadata->Append("ETag", properties.ETag.ToString());
}
if (properties.SequenceNumber.HasValue()) {
metadata->Append("Sequence-Number",
FormatValue<Int64Type>(properties.SequenceNumber.Value()));
}
if (properties.CommittedBlockCount.HasValue()) {
metadata->Append("Committed-Block-Count",
FormatValue<Int32Type>(properties.CommittedBlockCount.Value()));
}
metadata->Append("IsServerEncrypted",
FormatValue<BooleanType>(properties.IsServerEncrypted));
if (properties.EncryptionKeySha256.HasValue()) {
const auto& sha256 = properties.EncryptionKeySha256.Value();
metadata->Append("Encryption-Key-Sha-256", HexEncode(sha256.data(), sha256.size()));
}
if (properties.EncryptionScope.HasValue()) {
metadata->Append("Encryption-Scope", properties.EncryptionScope.Value());
}
if (properties.AccessTier.HasValue()) {
metadata->Append("Access-Tier", properties.AccessTier.Value().ToString());
}
if (properties.IsAccessTierInferred.HasValue()) {
metadata->Append("Is-Access-Tier-Inferred",
FormatValue<BooleanType>(properties.IsAccessTierInferred.Value()));
}
if (properties.ArchiveStatus.HasValue()) {
metadata->Append("Archive-Status", properties.ArchiveStatus.Value().ToString());
}
if (properties.AccessTierChangedOn.HasValue()) {
metadata->Append("Access-Tier-Changed-On",
properties.AccessTierChangedOn.Value().ToString());
}
if (properties.VersionId.HasValue()) {
metadata->Append("Version-Id", properties.VersionId.Value());
}
if (properties.IsCurrentVersion.HasValue()) {
metadata->Append("Is-Current-Version",
FormatValue<BooleanType>(properties.IsCurrentVersion.Value()));
}
if (properties.TagCount.HasValue()) {
metadata->Append("Tag-Count", FormatValue<Int32Type>(properties.TagCount.Value()));
}
if (properties.ExpiresOn.HasValue()) {
metadata->Append("Expires-On", properties.ExpiresOn.Value().ToString());
}
if (properties.IsSealed.HasValue()) {
metadata->Append("Is-Sealed", FormatValue<BooleanType>(properties.IsSealed.Value()));
}
if (properties.RehydratePriority.HasValue()) {
metadata->Append("Rehydrate-Priority",
properties.RehydratePriority.Value().ToString());
}
if (properties.LastAccessedOn.HasValue()) {
metadata->Append("Last-Accessed-On", properties.LastAccessedOn.Value().ToString());
}
return md;
metadata->Append("Has-Legal-Hold", FormatValue<BooleanType>(properties.HasLegalHold));
return metadata;
}

class ObjectInputFile final : public io::RandomAccessFile {
Expand All @@ -176,7 +312,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
try {
auto properties = blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
metadata_ = GetObjectMetadata(properties.Value.Metadata);
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
Expand Down
67 changes: 58 additions & 9 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"

namespace arrow {
using internal::TemporaryDir;
Expand Down Expand Up @@ -296,21 +298,68 @@ TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) {
ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/'));
}

TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";

service_client_->GetBlobContainerClient(PreexistingContainerName())
.GetBlobClient(PreexistingObjectName())
.SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
namespace {
std::shared_ptr<const KeyValueMetadata> NormalizerKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> metadata) {
auto normalized = std::make_shared<KeyValueMetadata>();
for (int64_t i = 0; i < metadata->size(); ++i) {
auto key = metadata->key(i);
auto value = metadata->value(i);
if (key == "Content-Hash") {
std::vector<uint8_t> output;
output.reserve(value.size() / 2);
if (ParseHexValues(value, output.data()).ok()) {
// Valid value
value = std::string(value.size(), 'F');
}
} else if (key == "Last-Modified" || key == "Created-On" ||
key == "Access-Tier-Changed-On") {
auto parser = TimestampParser::MakeISO8601();
int64_t output;
if ((*parser)(value.data(), value.size(), TimeUnit::NANO, &output)) {
// Valid value
value = "2023-10-31T08:15:20Z";
}
} else if (key == "ETag") {
if (internal::StartsWith(value, "\"") && internal::EndsWith(value, "\"")) {
// Valid value
value = "\"ETagValue\"";
}
}
normalized->Append(key, value);
}
return normalized;
}
}; // namespace

TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

std::shared_ptr<const KeyValueMetadata> actual;
ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
// TODO(GH-38330): This is asserting that the user defined metadata is returned but this
// is probably not the correct behaviour.
ASSERT_OK_AND_EQ("value0", actual->Get("key0"));
ASSERT_EQ(
"\n"
"-- metadata --\n"
"Content-Type: application/octet-stream\n"
"Content-Encoding: \n"
"Content-Language: \n"
"Content-Hash: FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\n"
"Content-Disposition: \n"
"Cache-Control: \n"
"Last-Modified: 2023-10-31T08:15:20Z\n"
"Created-On: 2023-10-31T08:15:20Z\n"
"Blob-Type: BlockBlob\n"
"Lease-State: available\n"
"Lease-Status: unlocked\n"
"Content-Length: 447\n"
"ETag: \"ETagValue\"\n"
"IsServerEncrypted: true\n"
"Access-Tier: Hot\n"
"Is-Access-Tier-Inferred: true\n"
"Access-Tier-Changed-On: 2023-10-31T08:15:20Z\n"
"Has-Legal-Hold: false",
NormalizerKeyValueMetadata(actual)->ToString());
}

TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
Expand Down

0 comments on commit de08076

Please sign in to comment.