Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38700: [C++][FS][Azure] Implement DeleteDir() #38793

Merged
merged 7 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,119 @@ class AzureFileSystem::Impl {
RETURN_NOT_OK(stream->Init());
return stream;
}

/// Delete the directory (or the whole container) designated by `location`.
///
/// - An empty container name is rejected with Status::Invalid.
/// - An empty path means the container itself is deleted.
/// - When the hierarchical namespace is enabled for the container, the
///   directory is deleted recursively through the Data Lake directory client.
/// - Otherwise the blobs listed under the prefix built from `location.path`
///   (with a trailing slash ensured) are deleted in batches, one batch per
///   listing page.
///
/// \param[in] location the container and the (possibly empty) path to delete
/// \return Status::OK() on success; otherwise an error status that names the
/// container, directory or blobs that couldn't be deleted
Status DeleteDir(const AzureLocation& location) {
  if (location.container.empty()) {
    return Status::Invalid("Cannot delete an empty container");
  }

  if (location.path.empty()) {
    auto container_client =
        blob_service_client_->GetBlobContainerClient(location.container);
    try {
      auto response = container_client.Delete();
      if (response.Value.Deleted) {
        return Status::OK();
      } else {
        return StatusFromErrorResponse(
            container_client.GetUrl(), response.RawResponse.get(),
            "Failed to delete a container: " + location.container);
      }
    } catch (const Azure::Storage::StorageException& exception) {
      return internal::ExceptionToStatus(
          "Failed to delete a container: " + location.container + ": " +
              container_client.GetUrl(),
          exception);
    }
  }

  ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
                        hierarchical_namespace_.Enabled(location.container));
  if (hierarchical_namespace_enabled) {
    auto directory_client =
        datalake_service_client_->GetFileSystemClient(location.container)
            .GetDirectoryClient(location.path);
    try {
      auto response = directory_client.DeleteRecursive();
      if (response.Value.Deleted) {
        return Status::OK();
      } else {
        return StatusFromErrorResponse(
            directory_client.GetUrl(), response.RawResponse.get(),
            "Failed to delete a directory: " + location.path);
      }
    } catch (const Azure::Storage::StorageException& exception) {
      return internal::ExceptionToStatus(
          "Failed to delete a directory: " + location.path + ": " +
              directory_client.GetUrl(),
          exception);
    }
  } else {
    auto container_client =
        blob_service_client_->GetBlobContainerClient(location.container);
    Azure::Storage::Blobs::ListBlobsOptions options;
    options.Prefix = internal::EnsureTrailingSlash(location.path);
    // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
    //
    // Only supports up to 256 subrequests in a single batch. The
    // size of the body for a batch request can't exceed 4 MB.
    const int32_t kNumMaxRequestsInBatch = 256;
    // One listing page is turned into one batch, so the page size is capped
    // at the batch limit.
    options.PageSizeHint = kNumMaxRequestsInBatch;
    try {
      // Walk every page. An empty page must not end the traversal because
      // the service may return an empty page together with a continuation
      // token while later pages still contain blobs.
      for (auto list_response = container_client.ListBlobs(options);
           list_response.HasPage(); list_response.MoveToNextPage()) {
        if (list_response.Blobs.empty()) {
          continue;
        }
        auto batch = container_client.CreateBatch();
        std::vector<Azure::Storage::DeferredResponse<
            Azure::Storage::Blobs::Models::DeleteBlobResult>>
            deferred_responses;
        deferred_responses.reserve(list_response.Blobs.size());
        for (const auto& blob_item : list_response.Blobs) {
          deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
        }
        try {
          container_client.SubmitBatch(batch);
        } catch (const Azure::Storage::StorageException& exception) {
          return internal::ExceptionToStatus(
              "Failed to delete blobs in a directory: " + location.path + ": " +
                  container_client.GetUrl(),
              exception);
        }
        // deferred_responses[i] corresponds to list_response.Blobs[i], so the
        // index is enough to recover the name of a blob that wasn't deleted.
        std::vector<std::string> failed_blob_names;
        for (size_t i = 0; i < deferred_responses.size(); ++i) {
          const auto& deferred_response = deferred_responses[i];
          bool success = true;
          try {
            auto delete_result = deferred_response.GetResponse();
            success = delete_result.Value.Deleted;
          } catch (const Azure::Storage::StorageException&) {
            // The per-blob failure is reported by name below.
            success = false;
          }
          if (!success) {
            const auto& blob_item = list_response.Blobs[i];
            failed_blob_names.push_back(blob_item.Name);
          }
        }
        if (!failed_blob_names.empty()) {
          if (failed_blob_names.size() == 1) {
            return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
                                   ": " + container_client.GetUrl());
          } else {
            return Status::IOError(
                "Failed to delete blobs: [",
                arrow::internal::JoinStrings(failed_blob_names, ", "),
                "]: " + container_client.GetUrl());
          }
        }
      }
    } catch (const Azure::Storage::StorageException& exception) {
      return internal::ExceptionToStatus(
          "Failed to list blobs in a directory: " + location.path + ": " +
              container_client.GetUrl(),
          exception);
    }
    return Status::OK();
  }
}
};

/// Returns the options reported by the implementation object.
const AzureOptions& AzureFileSystem::options() const {
  return impl_->options();
}
Expand Down Expand Up @@ -1003,7 +1116,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
}

/// Delete the directory designated by `path`.
///
/// The path is first parsed into an AzureLocation; a parse failure is
/// returned to the caller unchanged. Otherwise the deletion is delegated to
/// the implementation object.
Status AzureFileSystem::DeleteDir(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
  return impl_->DeleteDir(location);
}

Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
Expand Down
164 changes: 160 additions & 4 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"

Expand Down Expand Up @@ -92,9 +93,15 @@ class AzuriteEnv : public ::testing::Environment {
return;
}
auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
server_process_ = bp::child(boost::this_process::environment(), exe_path, "--silent",
"--location", temp_dir_->path().ToString(), "--debug",
temp_dir_->path().ToString() + "/debug.log");
auto debug_log_path_result = temp_dir_->path().Join("debug.log");
if (!debug_log_path_result.ok()) {
status_ = debug_log_path_result.status();
return;
}
debug_log_path_ = *debug_log_path_result;
server_process_ =
bp::child(boost::this_process::environment(), exe_path, "--silent", "--location",
temp_dir_->path().ToString(), "--debug", debug_log_path_.ToString());
if (!(server_process_.valid() && server_process_.running())) {
auto error = "Could not start Azurite emulator.";
server_process_.terminate();
Expand All @@ -110,6 +117,44 @@ class AzuriteEnv : public ::testing::Environment {
server_process_.wait();
}

/// Returns the current size in bytes of the Azurite debug log, or 0 when the
/// log file doesn't exist.
Result<int64_t> GetDebugLogSize() {
  ARROW_ASSIGN_OR_RAISE(const bool log_exists,
                        arrow::internal::FileExists(debug_log_path_));
  if (!log_exists) {
    return 0;
  }
  ARROW_ASSIGN_OR_RAISE(auto log_file,
                        arrow::internal::FileOpenReadable(debug_log_path_));
  // Move to the end of the file so that the reported position is its size.
  ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(log_file.fd(), 0, SEEK_END));
  return arrow::internal::FileTell(log_file.fd());
}

/// Write the Azurite debug log to stderr.
///
/// Returns Status::OK() without writing anything when the log file doesn't
/// exist.
///
/// \param[in] position byte offset to start dumping from; the whole file is
/// dumped when it is not positive
/// \return Status::OK() on success, or the error of the failing file
/// operation
Status DumpDebugLog(int64_t position = 0) {
  ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
  if (!exists) {
    return Status::OK();
  }
  ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
                        arrow::internal::FileOpenReadable(debug_log_path_));
  if (position > 0) {
    ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), position));
  }
  const int64_t buffer_size = 4096;
  // The buffer is constructed with its full size (not merely reserved) so
  // that the bytes written through data() are inside the vector's valid
  // range.
  std::vector<uint8_t> buffer(buffer_size);
  while (true) {
    ARROW_ASSIGN_OR_RAISE(
        auto n_read_bytes,
        arrow::internal::FileRead(file_descriptor.fd(), buffer.data(), buffer_size));
    if (n_read_bytes <= 0) {
      break;
    }
    std::cerr << std::string_view(reinterpret_cast<const char*>(buffer.data()),
                                  n_read_bytes);
  }
  // End the dump with a newline and flush it.
  std::cerr << std::endl;
  return Status::OK();
}

/// Account name stored in this environment.
const std::string& account_name() const {
  return account_name_;
}

/// Account key stored in this environment.
const std::string& account_key() const {
  return account_key_;
}

/// Status recorded by this environment.
const Status status() const {
  return status_;
}
Expand All @@ -120,6 +165,7 @@ class AzuriteEnv : public ::testing::Environment {
bp::child server_process_;
Status status_;
std::unique_ptr<TemporaryDir> temp_dir_;
arrow::internal::PlatformFilename debug_log_path_;
};

// Registers a newly allocated AzuriteEnv as a googletest global test
// environment and keeps the pointer returned by the registration call.
auto* azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv);
Expand Down Expand Up @@ -244,15 +290,28 @@ class AzureFileSystemTest : public ::testing::Test {
};

class AzuriteFileSystemTest : public AzureFileSystemTest {
Result<AzureOptions> MakeOptions() {
Result<AzureOptions> MakeOptions() override {
EXPECT_THAT(GetAzuriteEnv(), NotNull());
ARROW_EXPECT_OK(GetAzuriteEnv()->status());
ARROW_ASSIGN_OR_RAISE(debug_log_start_, GetAzuriteEnv()->GetDebugLogSize());
AzureOptions options;
options.backend = AzureBackend::Azurite;
ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
return options;
}

void TearDown() override {
AzureFileSystemTest::TearDown();
if (HasFailure()) {
// XXX: This may not include all logs in the target test because
// Azurite doesn't flush debug logs immediately... You may want
// to check the log manually...
ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
}
}

int64_t debug_log_start_ = 0;
};

class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
Expand Down Expand Up @@ -510,6 +569,103 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) {
ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(), true));
}

// Checks that DeleteDir() on a container path removes the container: the
// path is reported as a Directory before the call and as NotFound after it.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) {
const auto container_name = RandomContainerName();
ASSERT_OK(fs_->CreateDir(container_name));
arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::Directory);
ASSERT_OK(fs_->DeleteDir(container_name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound); at the end? Personally I would probably also add an assertion that the container does exist before deleting it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'll add them.

arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::NotFound);
}

// Directories are only virtual without hierarchical namespace support, so
// CreateDir() and DeleteDir() both succeed while the path stays NotFound.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) {
  const auto dir_path =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  ASSERT_OK(fs_->CreateDir(dir_path));
  arrow::fs::AssertFileInfo(fs_.get(), dir_path, FileType::NotFound);

  ASSERT_OK(fs_->DeleteDir(dir_path));
  arrow::fs::AssertFileInfo(fs_.get(), dir_path, FileType::NotFound);
}

// Directories are only virtual without hierarchical namespace support, so
// DeleteDir() on a directory that was never created succeeds and changes
// nothing.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) {
  const auto missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  ASSERT_OK(fs_->DeleteDir(missing_dir));
  arrow::fs::AssertFileInfo(fs_.get(), missing_dir, FileType::NotFound);
}

// Creates more blobs than fit in one delete batch under a directory, deletes
// the directory and checks that every blob is gone.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) {
#ifdef __APPLE__
  GTEST_SKIP() << "This test fails by an Azurite problem: "
                  "https://github.com/Azure/Azurite/pull/2302";
#endif
  const auto dir_path =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  // At least 257 blobs are needed to exercise ListBlobs() pagination: a
  // single SubmitBatch() can't carry 257 or more delete blob requests.
  const int64_t blob_count = 257;
  // Path of the index-th blob below the directory.
  const auto blob_path_of = [&dir_path](int64_t index) {
    return internal::ConcatAbstractPath(dir_path, std::to_string(index) + ".txt");
  };

  for (int64_t index = 0; index < blob_count; ++index) {
    const auto blob_path = blob_path_of(index);
    ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
    ASSERT_OK(output->Write(std::string_view(std::to_string(index))));
    ASSERT_OK(output->Close());
    arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
  }

  ASSERT_OK(fs_->DeleteDir(dir_path));

  for (int64_t index = 0; index < blob_count; ++index) {
    arrow::fs::AssertFileInfo(fs_.get(), blob_path_of(index), FileType::NotFound);
  }
}

// A directory created recursively is reported as a Directory, and as
// NotFound once DeleteDir() has succeeded.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) {
  const auto dir_path =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  ASSERT_OK(fs_->CreateDir(dir_path, true));
  arrow::fs::AssertFileInfo(fs_.get(), dir_path, FileType::Directory);

  ASSERT_OK(fs_->DeleteDir(dir_path));
  arrow::fs::AssertFileInfo(fs_.get(), dir_path, FileType::NotFound);
}

// DeleteDir() on a directory that was never created raises IOError.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) {
  const auto missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  ASSERT_RAISES(IOError, fs_->DeleteDir(missing_dir));
}

// Writes one blob below a directory, deletes the directory and checks that
// the blob is reported as NotFound afterwards.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) {
  const auto dir_path =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const auto file_path = internal::ConcatAbstractPath(dir_path, "hello.txt");

  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(file_path));
  ASSERT_OK(output->Write(std::string_view("hello")));
  ASSERT_OK(output->Close());
  arrow::fs::AssertFileInfo(fs_.get(), file_path, FileType::File);

  ASSERT_OK(fs_->DeleteDir(dir_path));
  arrow::fs::AssertFileInfo(fs_.get(), file_path, FileType::NotFound);
}

// Deleting a parent directory also removes the sub directory created
// below it: both are Directory before and NotFound after DeleteDir().
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) {
  const auto parent_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const auto child_dir = internal::ConcatAbstractPath(parent_dir, "new-sub");

  ASSERT_OK(fs_->CreateDir(child_dir, true));
  arrow::fs::AssertFileInfo(fs_.get(), child_dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs_.get(), parent_dir, FileType::Directory);

  ASSERT_OK(fs_->DeleteDir(parent_dir));
  arrow::fs::AssertFileInfo(fs_.get(), child_dir, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs_.get(), parent_dir, FileType::NotFound);
}

// DeleteDir() raises Invalid when given an "abfs://" URI instead of a path.
TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
  const auto uri = "abfs://" + PreexistingContainerPath();
  ASSERT_RAISES(Invalid, fs_->DeleteDir(uri));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down
Loading