-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-38335: [C++] Implement GetFileInfo for a single file in Azure filesystem
#38505
Changes from all commits
abf6412
80b12ee
16b6b9a
fa869ea
b3f7157
7ccafbf
fe87c17
5067a6e
1416e57
3459836
d51b4e6
4adc5e7
8de8ddf
655ac3b
49a7c69
cc6f156
b3ea915
379090a
2f59183
5906a6a
a030a99
147b9a0
d94f961
b9e8991
4c89729
218312b
22d64d3
80d0a55
d0f3311
8912dbf
3b920f1
6ab232a
89320c0
e868125
2cd51c1
6ece0c9
c562c7e
2486c86
fb03dd8
c8370b3
25a0c76
5540c70
3fd35c6
44f6aa5
0659a39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,10 @@ | |
// under the License. | ||
|
||
#include "arrow/filesystem/azurefs.h" | ||
#include "arrow/filesystem/azurefs_internal.h" | ||
|
||
#include <azure/storage/blobs.hpp> | ||
#include <azure/storage/files/datalake.hpp> | ||
|
||
#include "arrow/buffer.h" | ||
#include "arrow/filesystem/path_util.h" | ||
|
@@ -59,6 +61,7 @@ Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_n | |
credentials_kind = AzureCredentialsKind::StorageCredentials; | ||
return Status::OK(); | ||
} | ||
|
||
namespace { | ||
|
||
// An AzureFileSystem represents a single Azure storage account. AzurePath describes a | ||
|
@@ -79,18 +82,17 @@ struct AzurePath { | |
"Expected an Azure object path of the form 'container/path...', got a URI: '", | ||
s, "'"); | ||
} | ||
const auto src = internal::RemoveTrailingSlash(s); | ||
auto first_sep = src.find_first_of(internal::kSep); | ||
auto first_sep = s.find_first_of(internal::kSep); | ||
if (first_sep == 0) { | ||
return Status::Invalid("Path cannot start with a separator ('", s, "')"); | ||
} | ||
if (first_sep == std::string::npos) { | ||
return AzurePath{std::string(src), std::string(src), "", {}}; | ||
return AzurePath{std::string(s), std::string(s), "", {}}; | ||
} | ||
AzurePath path; | ||
path.full_path = std::string(src); | ||
path.container = std::string(src.substr(0, first_sep)); | ||
path.path_to_file = std::string(src.substr(first_sep + 1)); | ||
path.full_path = std::string(s); | ||
path.container = std::string(s.substr(0, first_sep)); | ||
path.path_to_file = std::string(s.substr(first_sep + 1)); | ||
path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file); | ||
RETURN_NOT_OK(Validate(path)); | ||
return path; | ||
|
@@ -146,11 +148,6 @@ Status ValidateFilePath(const AzurePath& path) { | |
return Status::OK(); | ||
} | ||
|
||
Status ErrorToStatus(const std::string& prefix, | ||
const Azure::Storage::StorageException& exception) { | ||
return Status::IOError(prefix, " Azure Error: ", exception.what()); | ||
} | ||
|
||
template <typename ArrowType> | ||
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) { | ||
struct StringAppender { | ||
|
@@ -316,11 +313,13 @@ class ObjectInputFile final : public io::RandomAccessFile { | |
return Status::OK(); | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { | ||
// Could be either container or blob not found. | ||
return PathNotFound(path_); | ||
} | ||
return ErrorToStatus( | ||
"When fetching properties for '" + blob_client_->GetUrl() + "': ", exception); | ||
return internal::ExceptionToStatus( | ||
"GetProperties failed for '" + blob_client_->GetUrl() + | ||
"' with an unexpected Azure error. Can not initialise an ObjectInputFile " | ||
"without knowing the file size.", | ||
exception); | ||
} | ||
} | ||
|
||
|
@@ -397,10 +396,12 @@ class ObjectInputFile final : public io::RandomAccessFile { | |
->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes, download_options) | ||
.Value.ContentRange.Length.Value(); | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return ErrorToStatus("When reading from '" + blob_client_->GetUrl() + | ||
"' at position " + std::to_string(position) + " for " + | ||
std::to_string(nbytes) + " bytes: ", | ||
exception); | ||
return internal::ExceptionToStatus("DownloadTo from '" + blob_client_->GetUrl() + | ||
"' at position " + std::to_string(position) + | ||
" for " + std::to_string(nbytes) + | ||
" bytes failed with an Azure error. ReadAt " | ||
"failed to read the required byte range.", | ||
exception); | ||
} | ||
} | ||
|
||
|
@@ -444,7 +445,6 @@ class ObjectInputFile final : public io::RandomAccessFile { | |
int64_t content_length_ = kNoSize; | ||
std::shared_ptr<const KeyValueMetadata> metadata_; | ||
}; | ||
|
||
} // namespace | ||
|
||
// ----------------------------------------------------------------------- | ||
|
@@ -453,27 +453,136 @@ class ObjectInputFile final : public io::RandomAccessFile { | |
class AzureFileSystem::Impl { | ||
public: | ||
io::IOContext io_context_; | ||
std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_; | ||
std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeServiceClient> | ||
datalake_service_client_; | ||
std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient> blob_service_client_; | ||
AzureOptions options_; | ||
internal::HierarchicalNamespaceDetector hierarchical_namespace_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that How about moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made it separate because I wanted to keep the cached value I think one possibility is to use a non-smart pointer in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I decided to just make my preferred change. If you think its a bad idea I'm happy to change it again to something else. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Let's use the approach. |
||
|
||
explicit Impl(AzureOptions options, io::IOContext io_context) | ||
: io_context_(io_context), options_(std::move(options)) {} | ||
|
||
Status Init() { | ||
service_client_ = std::make_shared<Azure::Storage::Blobs::BlobServiceClient>( | ||
blob_service_client_ = std::make_unique<Azure::Storage::Blobs::BlobServiceClient>( | ||
options_.account_blob_url, options_.storage_credentials_provider); | ||
datalake_service_client_ = | ||
std::make_unique<Azure::Storage::Files::DataLake::DataLakeServiceClient>( | ||
options_.account_dfs_url, options_.storage_credentials_provider); | ||
RETURN_NOT_OK(hierarchical_namespace_.Init(datalake_service_client_.get())); | ||
return Status::OK(); | ||
} | ||
|
||
const AzureOptions& options() const { return options_; } | ||
|
||
public: | ||
Result<FileInfo> GetFileInfo(const AzurePath& path) { | ||
FileInfo info; | ||
info.set_path(path.full_path); | ||
|
||
if (path.container.empty()) { | ||
DCHECK(path.path_to_file.empty()); // The path is invalid if the container is empty | ||
// but not path_to_file. | ||
// path must refer to the root of the Azure storage account. This is a directory, | ||
// and there isn't any extra metadata to fetch. | ||
info.set_type(FileType::Directory); | ||
return info; | ||
} | ||
if (path.path_to_file.empty()) { | ||
// path refers to a container. This is a directory if it exists. | ||
auto container_client = | ||
blob_service_client_->GetBlobContainerClient(path.container); | ||
try { | ||
auto properties = container_client.GetProperties(); | ||
info.set_type(FileType::Directory); | ||
info.set_mtime( | ||
std::chrono::system_clock::time_point(properties.Value.LastModified)); | ||
return info; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { | ||
info.set_type(FileType::NotFound); | ||
return info; | ||
} | ||
return internal::ExceptionToStatus( | ||
"GetProperties for '" + container_client.GetUrl() + | ||
"' failed with an unexpected Azure error. GetFileInfo is unable to " | ||
"determine whether the container exists.", | ||
exception); | ||
} | ||
} | ||
auto file_client = datalake_service_client_->GetFileSystemClient(path.container) | ||
.GetFileClient(path.path_to_file); | ||
try { | ||
auto properties = file_client.GetProperties(); | ||
if (properties.Value.IsDirectory) { | ||
info.set_type(FileType::Directory); | ||
} else if (internal::HasTrailingSlash(path.path_to_file)) { | ||
// For a path with a trailing slash a hierarchical namespace may return a blob | ||
// with that trailing slash removed. For consistency with flat namespace and | ||
// other filesystems we chose to return NotFound. | ||
info.set_type(FileType::NotFound); | ||
return info; | ||
} else { | ||
info.set_type(FileType::File); | ||
info.set_size(properties.Value.FileSize); | ||
} | ||
info.set_mtime( | ||
std::chrono::system_clock::time_point(properties.Value.LastModified)); | ||
return info; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { | ||
ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled, | ||
hierarchical_namespace_.Enabled(path.container)); | ||
if (hierarchical_namespace_enabled) { | ||
// If the hierarchical namespace is enabled, then the storage account will have | ||
// explicit directories. Neither a file nor a directory was found. | ||
info.set_type(FileType::NotFound); | ||
return info; | ||
} | ||
// On flat namespace accounts there are no real directories. Directories are only | ||
// implied by using `/` in the blob name. | ||
Azure::Storage::Blobs::ListBlobsOptions list_blob_options; | ||
|
||
// If listing the prefix `path.path_to_file` with trailing slash returns at least | ||
// one result then `path` refers to an implied directory. | ||
auto prefix = internal::EnsureTrailingSlash(path.path_to_file); | ||
list_blob_options.Prefix = prefix; | ||
// We only need to know if there is at least one result, so minimise page size | ||
// for efficiency. | ||
list_blob_options.PageSizeHint = 1; | ||
|
||
try { | ||
auto paged_list_result = | ||
blob_service_client_->GetBlobContainerClient(path.container) | ||
.ListBlobs(list_blob_options); | ||
if (paged_list_result.Blobs.size() > 0) { | ||
info.set_type(FileType::Directory); | ||
} else { | ||
info.set_type(FileType::NotFound); | ||
} | ||
return info; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return internal::ExceptionToStatus( | ||
"ListBlobs for '" + prefix + | ||
"' failed with an unexpected Azure error. GetFileInfo is unable to " | ||
"determine whether the path should be considered an implied directory.", | ||
exception); | ||
} | ||
} | ||
return internal::ExceptionToStatus( | ||
"GetProperties for '" + file_client.GetUrl() + | ||
"' failed with an unexpected " | ||
"Azure error. GetFileInfo is unable to determine whether the path exists.", | ||
exception); | ||
} | ||
} | ||
|
||
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s, | ||
AzureFileSystem* fs) { | ||
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s)); | ||
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); | ||
RETURN_NOT_OK(ValidateFilePath(path)); | ||
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>( | ||
service_client_->GetBlobContainerClient(path.container) | ||
blob_service_client_->GetBlobContainerClient(path.container) | ||
.GetBlobClient(path.path_to_file)); | ||
|
||
auto ptr = | ||
|
@@ -494,7 +603,7 @@ class AzureFileSystem::Impl { | |
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path())); | ||
RETURN_NOT_OK(ValidateFilePath(path)); | ||
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>( | ||
service_client_->GetBlobContainerClient(path.container) | ||
blob_service_client_->GetBlobContainerClient(path.container) | ||
.GetBlobClient(path.path_to_file)); | ||
|
||
auto ptr = std::make_shared<ObjectInputFile>(blob_client, fs->io_context(), | ||
|
@@ -518,7 +627,8 @@ bool AzureFileSystem::Equals(const FileSystem& other) const { | |
} | ||
|
||
Result<FileInfo> AzureFileSystem::GetFileInfo(const std::string& path) { | ||
return Status::NotImplemented("The Azure FileSystem is not fully implemented"); | ||
ARROW_ASSIGN_OR_RAISE(auto p, AzurePath::FromString(path)); | ||
return impl_->GetFileInfo(p); | ||
} | ||
|
||
Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include "arrow/filesystem/azurefs_internal.h" | ||
|
||
#include <azure/storage/files/datalake.hpp> | ||
|
||
#include "arrow/result.h" | ||
|
||
namespace arrow::fs::internal { | ||
|
||
Status ExceptionToStatus(const std::string& prefix, | ||
const Azure::Storage::StorageException& exception) { | ||
return Status::IOError(prefix, " Azure Error: ", exception.what()); | ||
} | ||
|
||
Status HierarchicalNamespaceDetector::Init( | ||
Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client) { | ||
datalake_service_client_ = datalake_service_client; | ||
return Status::OK(); | ||
} | ||
|
||
Result<bool> HierarchicalNamespaceDetector::Enabled(const std::string& container_name) { | ||
// Hierarchical namespace can't easily be changed after the storage account is created | ||
// and its common across all containers in the storage account. Do nothing until we've | ||
// checked for a cached result. | ||
if (enabled_.has_value()) { | ||
return enabled_.value(); | ||
} | ||
|
||
// This approach is inspired by hadoop-azure | ||
// https://github.com/apache/hadoop/blob/7c6af6a5f626d18d68b656d085cc23e4c1f7a1ef/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java#L356. | ||
// Unfortunately `blob_service_client->GetAccountInfo()` requires significantly | ||
// elevated permissions. | ||
// https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-service-properties?tabs=azure-ad#authorization | ||
auto filesystem_client = datalake_service_client_->GetFileSystemClient(container_name); | ||
auto directory_client = filesystem_client.GetDirectoryClient("/"); | ||
try { | ||
directory_client.GetAccessControlList(); | ||
enabled_ = true; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
// GetAccessControlList will fail on storage accounts without hierarchical | ||
// namespace enabled. | ||
|
||
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::BadRequest || | ||
exception.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) { | ||
// Flat namespace storage accounts with soft delete enabled return | ||
// Conflict - This endpoint does not support BlobStorageEvents or SoftDelete | ||
// otherwise it returns: BadRequest - This operation is only supported on a | ||
// hierarchical namespace account. | ||
enabled_ = false; | ||
} else if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { | ||
// Azurite returns NotFound. | ||
try { | ||
filesystem_client.GetProperties(); | ||
enabled_ = false; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return ExceptionToStatus("Failed to confirm '" + filesystem_client.GetUrl() + | ||
"' is an accessible container. Therefore the " | ||
"hierarchical namespace check was invalid.", | ||
exception); | ||
} | ||
} else { | ||
return ExceptionToStatus( | ||
"GetAccessControlList for '" + directory_client.GetUrl() + | ||
"' failed with an unexpected Azure error, while checking " | ||
"whether the storage account has hierarchical namespace enabled.", | ||
exception); | ||
} | ||
} | ||
return enabled_.value(); | ||
} | ||
|
||
} // namespace arrow::fs::internal |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#pragma once | ||
|
||
#include <optional> | ||
|
||
#include <azure/storage/files/datalake.hpp> | ||
|
||
#include "arrow/result.h" | ||
|
||
namespace arrow::fs::internal { | ||
|
||
Status ExceptionToStatus(const std::string& prefix, | ||
const Azure::Storage::StorageException& exception); | ||
|
||
class HierarchicalNamespaceDetector { | ||
public: | ||
Status Init( | ||
Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client); | ||
Result<bool> Enabled(const std::string& container_name); | ||
|
||
private: | ||
Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client_; | ||
std::optional<bool> enabled_; | ||
}; | ||
|
||
} // namespace arrow::fs::internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was preventing GetFileInfo working on directories. The other filesystems did not have this.