Skip to content

Commit

Permalink
apacheGH-40670: [C++][FS][Azure] Add support for reading user defined…
Browse files Browse the repository at this point in the history
… metadata
  • Loading branch information
kou committed Mar 19, 2024
1 parent 7d3f7b3 commit fbba12e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 33 deletions.
42 changes: 28 additions & 14 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
auto metadata = std::make_shared<KeyValueMetadata>();
// Not supported yet:
// * properties.ObjectReplicationSourceProperties
// * properties.Metadata
//
// They may have the same key defined in the following
// metadata->Append() list. If we have duplicated key in metadata,
Expand Down Expand Up @@ -669,16 +668,33 @@ std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
metadata->Append("Last-Accessed-On", properties.LastAccessedOn.Value().ToString());
}
metadata->Append("Has-Legal-Hold", FormatValue<BooleanType>(properties.HasLegalHold));
for (const auto& [key, value] : properties.Metadata) {
metadata->Append(key, value);
}
return metadata;
}

Storage::Metadata ArrowMetadataToAzureMetadata(
const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
Storage::Metadata azure_metadata;
for (auto key_value : arrow_metadata->sorted_pairs()) {
azure_metadata[key_value.first] = key_value.second;
void ArrowMetadataToCommitBlockListOptions(
const std::shared_ptr<const KeyValueMetadata>& arrow_metadata,
Blobs::CommitBlockListOptions& options) {
using ::arrow::internal::AsciiEqualsCaseInsensitive;
for (auto& [key, value] : arrow_metadata->sorted_pairs()) {
if (AsciiEqualsCaseInsensitive(key, "Content-Type")) {
options.HttpHeaders.ContentType = value;
} else if (AsciiEqualsCaseInsensitive(key, "Content-Encoding")) {
options.HttpHeaders.ContentEncoding = value;
} else if (AsciiEqualsCaseInsensitive(key, "Content-Language")) {
options.HttpHeaders.ContentLanguage = value;
} else if (AsciiEqualsCaseInsensitive(key, "Content-Hash")) {
// Ignore: auto-generated value
} else if (AsciiEqualsCaseInsensitive(key, "Content-Disposition")) {
options.HttpHeaders.ContentDisposition = value;
} else if (AsciiEqualsCaseInsensitive(key, "Cache-Control")) {
options.HttpHeaders.CacheControl = value;
} else {
options.Metadata[key] = value;
}
}
return azure_metadata;
}

class ObjectInputFile final : public io::RandomAccessFile {
Expand Down Expand Up @@ -864,9 +880,7 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(

Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_blob_client,
const std::vector<std::string>& block_ids,
const Storage::Metadata& metadata) {
Blobs::CommitBlockListOptions options;
options.Metadata = metadata;
const Blobs::CommitBlockListOptions& options) {
try {
// CommitBlockList puts all block_ids in the latest element. That means in the case of
// overlapping block_ids the newly staged block ids will always replace the
Expand All @@ -891,9 +905,9 @@ class ObjectAppendStream final : public io::OutputStream {
io_context_(io_context),
location_(location) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
ArrowMetadataToCommitBlockListOptions(options.default_metadata, commit_block_list_options_);
}
}

Expand Down Expand Up @@ -996,7 +1010,7 @@ class ObjectAppendStream final : public io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
return CommitBlockList(block_blob_client_, block_ids_, metadata_);
return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
}

private:
Expand Down Expand Up @@ -1053,7 +1067,7 @@ class ObjectAppendStream final : public io::OutputStream {
bool initialised_ = false;
int64_t pos_ = 0;
std::vector<std::string> block_ids_;
Storage::Metadata metadata_;
Blobs::CommitBlockListOptions commit_block_list_options_;
};

bool IsDfsEmulator(const AzureOptions& options) {
Expand Down
48 changes: 29 additions & 19 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2458,38 +2458,48 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
ASSERT_OK(output->Close());

// Verify the metadata has been set.
// TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
auto blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("foo", "bar")}, blob_metadata);
ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(full_path));
ASSERT_OK_AND_ASSIGN(auto metadata, input->ReadMetadata());
ASSERT_OK_AND_ASSIGN(auto foo_value, metadata->Get("foo"));
ASSERT_EQ("bar", foo_value);

// Check that explicit metadata overrides the defaults.
ASSERT_OK_AND_ASSIGN(
output, fs_with_defaults->OpenOutputStream(
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}})));
full_path, arrow::key_value_metadata({{"bar", "foo"}})));
ASSERT_OK(output->Write(expected));
ASSERT_OK(output->Close());
// TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
ASSERT_OK_AND_ASSIGN(input, fs()->OpenInputStream(full_path));
ASSERT_OK_AND_ASSIGN(metadata, input->ReadMetadata());
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);
ASSERT_NOT_OK(metadata->Get("foo"));
ASSERT_OK_AND_ASSIGN(auto bar_value, metadata->Get("bar"));
ASSERT_EQ("foo", bar_value);

// Metadata can be written without writing any data.
ASSERT_OK_AND_ASSIGN(
output, fs_with_defaults->OpenAppendStream(
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
full_path, arrow::key_value_metadata({{"bar", "baz"}})));
ASSERT_OK(output->Close());
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
ASSERT_OK_AND_ASSIGN(input, fs()->OpenInputStream(full_path));
ASSERT_OK_AND_ASSIGN(metadata, input->ReadMetadata());
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
ASSERT_NOT_OK(metadata->Get("foo"));
ASSERT_OK_AND_ASSIGN(bar_value, metadata->Get("bar"));
ASSERT_EQ("baz", bar_value);
}

TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) {
auto data = SetUpPreexistingData();
ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(data.ObjectPath(),
arrow::key_value_metadata({{"Content-Type", "text/plain"}})));
ASSERT_OK(output->Write(PreexistingData::kLoremIpsum));
ASSERT_OK(output->Close());

ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(data.ObjectPath()));
ASSERT_OK_AND_ASSIGN(auto metadata, input->ReadMetadata());
ASSERT_OK_AND_ASSIGN(auto content_type, metadata->Get("Content-Type"));
ASSERT_EQ("text/plain", content_type);
}

TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
Expand Down

0 comments on commit fbba12e

Please sign in to comment.