Skip to content

Commit

Permalink
Add GetFileInfoWithSelectorFromFileSystem()
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 10, 2024
1 parent 3ac1198 commit ff8d266
Showing 1 changed file with 70 additions and 27 deletions.
97 changes: 70 additions & 27 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,15 @@ Status CreateContainerIfNotExists(const std::string& container_name,
}
}

FileInfo FileInfoFromPath(std::string_view container,
const DataLake::Models::PathItem& path) {
FileInfo info{internal::ConcatAbstractPath(container, path.Name),
path.IsDirectory ? FileType::Directory : FileType::File};
info.set_size(path.FileSize);
info.set_mtime(std::chrono::system_clock::time_point{path.LastModified});
return info;
}

FileInfo DirectoryFileInfoFromPath(std::string_view path) {
return FileInfo{std::string{internal::RemoveTrailingSlash(path)}, FileType::Directory};
}
Expand Down Expand Up @@ -1576,7 +1585,7 @@ class AzureFileSystem::Impl {
}

private:
/// \pref location.container is not empty.
/// \pre location.container is not empty.
template <typename ContainerClient>
Status CheckDirExists(const ContainerClient& container_client,
const AzureLocation& location) {
Expand Down Expand Up @@ -1623,6 +1632,50 @@ class AzureFileSystem::Impl {
return result;
}

/// \brief List the paths at the root of a filesystem or some dir in a filesystem.
///
/// \pre adlfs_client is the client for the filesystem named like the first
/// segment of select.base_dir.
Status GetFileInfoWithSelectorFromFileSystem(
const DataLake::DataLakeFileSystemClient& adlfs_client,
const Core::Context& context, Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select, FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

auto directory_client = adlfs_client.GetDirectoryClient(base_location.path);
bool found = false;
DataLake::ListPathsOptions options;
options.PageSizeHint = page_size_hint;

try {
auto list_response = directory_client.ListPaths(select.recursive, options, context);
for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
if (list_response.Paths.empty()) {
continue;
}
found = true;
for (const auto& path : list_response.Paths) {
if (path.Name == base_location.path && !path.IsDirectory) {
return NotADir(base_location);
}
acc_results->push_back(FileInfoFromPath(base_location.container, path));
}
}
} catch (const Storage::StorageException& exception) {
if (IsContainerNotFound(exception) || exception.ErrorCode == "PathNotFound") {
found = false;
} else {
return ExceptionToStatus(exception,
"Failed to list paths in a directory: ", select.base_dir,
": ", directory_client.GetUrl());
}
}

return found || select.allow_not_found
? Status::OK()
: ::arrow::fs::internal::PathNotFound(select.base_dir);
}

/// \brief List the blobs at the root of a container or some dir in a container.
///
/// \pre container_client is the client for the container named like the first
Expand All @@ -1642,27 +1695,11 @@ class AzureFileSystem::Impl {
options.Prefix = {};
found = true; // Unless the container itself is not found later!
} else {
ARROW_ASSIGN_OR_RAISE(
auto prefix, AzureLocation::FromString(
std::string(internal::EnsureTrailingSlash(select.base_dir))));
ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(container_client, prefix));
if (info.type() == FileType::NotFound) {
if (select.allow_not_found) {
return Status::OK();
} else {
return PathNotFound(base_location);
}
} else if (info.type() != FileType::Directory) {
return NotADir(base_location);
}
options.Prefix = prefix.path;
options.Prefix = internal::EnsureTrailingSlash(base_location.path);
}
options.PageSizeHint = page_size_hint;
options.Include = Blobs::Models::ListBlobsIncludeFlags::Metadata;

auto adlfs_client = GetFileSystemClient(base_location.container);
ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));

auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
Expand All @@ -1687,15 +1724,7 @@ class AzureFileSystem::Impl {
};
auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
const auto path = internal::ConcatAbstractPath(base_location.container, prefix);
if (hns_support == HNSSupport::kEnabled) {
ARROW_ASSIGN_OR_RAISE(
auto location,
AzureLocation::FromString(std::string(internal::RemoveTrailingSlash(path))));
ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(adlfs_client, location));
acc_results->push_back(std::move(info));
} else {
acc_results->push_back(DirectoryFileInfoFromPath(path));
}
acc_results->push_back(DirectoryFileInfoFromPath(path));
return recurse(prefix);
};

Expand Down Expand Up @@ -1796,6 +1825,20 @@ class AzureFileSystem::Impl {
return VisitContainers(context, std::move(on_container));
}

auto adlfs_client = GetFileSystemClient(base_location.container);
ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
if (hns_support == HNSSupport::kContainerNotFound) {
if (select.allow_not_found) {
return Status::OK();
} else {
return ::arrow::fs::internal::PathNotFound(select.base_dir);
}
}
if (hns_support == HNSSupport::kEnabled) {
return GetFileInfoWithSelectorFromFileSystem(adlfs_client, context, page_size_hint,
select, acc_results);
}
DCHECK_EQ(hns_support, HNSSupport::kDisabled);
auto container_client =
blob_service_client_->GetBlobContainerClient(base_location.container);
return GetFileInfoWithSelectorFromContainer(container_client, context, page_size_hint,
Expand Down

0 comments on commit ff8d266

Please sign in to comment.