Skip to content

Commit

Permalink
apacheGH-38700: [C++][FS][Azure] Implement DeleteDir() (apache#38793)
Browse files Browse the repository at this point in the history
### Rationale for this change

`DeleteDir()` deletes the given directory recursively like other filesystem implementations.

### What changes are included in this PR?

* Container can be deleted with/without hierarchical namespace support.
* Directory can be deleted with hierarchical namespace support.
* Directory can't be deleted without hierarchical namespace support. But blobs under the given path can be deleted. So these blobs are deleted and the given virtual directory is also deleted.
    
### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#38700

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou authored and dgreiss committed Feb 17, 2024
1 parent 99a4bd8 commit d23baa1
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 5 deletions.
116 changes: 115 additions & 1 deletion cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,119 @@ class AzureFileSystem::Impl {
RETURN_NOT_OK(stream->Init());
return stream;
}

Status DeleteDir(const AzureLocation& location) {
if (location.container.empty()) {
return Status::Invalid("Cannot delete an empty container");
}

if (location.path.empty()) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
try {
auto response = container_client.Delete();
if (response.Value.Deleted) {
return Status::OK();
} else {
return StatusFromErrorResponse(
container_client.GetUrl(), response.RawResponse.get(),
"Failed to delete a container: " + location.container);
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a container: " + location.container + ": " +
container_client.GetUrl(),
exception);
}
}

ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
hierarchical_namespace_.Enabled(location.container));
if (hierarchical_namespace_enabled) {
auto directory_client =
datalake_service_client_->GetFileSystemClient(location.container)
.GetDirectoryClient(location.path);
try {
auto response = directory_client.DeleteRecursive();
if (response.Value.Deleted) {
return Status::OK();
} else {
return StatusFromErrorResponse(
directory_client.GetUrl(), response.RawResponse.get(),
"Failed to delete a directory: " + location.path);
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a directory: " + location.path + ": " +
directory_client.GetUrl(),
exception);
}
} else {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = internal::EnsureTrailingSlash(location.path);
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
try {
auto list_response = container_client.ListBlobs(options);
while (list_response.HasPage() && !list_response.Blobs.empty()) {
auto batch = container_client.CreateBatch();
std::vector<Azure::Storage::DeferredResponse<
Azure::Storage::Blobs::Models::DeleteBlobResult>>
deferred_responses;
for (const auto& blob_item : list_response.Blobs) {
deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
}
try {
container_client.SubmitBatch(batch);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
std::vector<std::string> failed_blob_names;
for (size_t i = 0; i < deferred_responses.size(); ++i) {
const auto& deferred_response = deferred_responses[i];
bool success = true;
try {
auto delete_result = deferred_response.GetResponse();
success = delete_result.Value.Deleted;
} catch (const Azure::Storage::StorageException& exception) {
success = false;
}
if (!success) {
const auto& blob_item = list_response.Blobs[i];
failed_blob_names.push_back(blob_item.Name);
}
}
if (!failed_blob_names.empty()) {
if (failed_blob_names.size() == 1) {
return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
": " + container_client.GetUrl());
} else {
return Status::IOError(
"Failed to delete blobs: [",
arrow::internal::JoinStrings(failed_blob_names, ", "),
"]: " + container_client.GetUrl());
}
}
list_response.MoveToNextPage();
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
return Status::OK();
}
}
};

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -1003,7 +1116,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
}

Status AzureFileSystem::DeleteDir(const std::string& path) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
return impl_->DeleteDir(location);
}

Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
Expand Down
164 changes: 160 additions & 4 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"

Expand Down Expand Up @@ -92,9 +93,15 @@ class AzuriteEnv : public ::testing::Environment {
return;
}
auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
server_process_ = bp::child(boost::this_process::environment(), exe_path, "--silent",
"--location", temp_dir_->path().ToString(), "--debug",
temp_dir_->path().ToString() + "/debug.log");
auto debug_log_path_result = temp_dir_->path().Join("debug.log");
if (!debug_log_path_result.ok()) {
status_ = debug_log_path_result.status();
return;
}
debug_log_path_ = *debug_log_path_result;
server_process_ =
bp::child(boost::this_process::environment(), exe_path, "--silent", "--location",
temp_dir_->path().ToString(), "--debug", debug_log_path_.ToString());
if (!(server_process_.valid() && server_process_.running())) {
auto error = "Could not start Azurite emulator.";
server_process_.terminate();
Expand All @@ -110,6 +117,44 @@ class AzuriteEnv : public ::testing::Environment {
server_process_.wait();
}

Result<int64_t> GetDebugLogSize() {
ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
if (!exists) {
return 0;
}
ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
arrow::internal::FileOpenReadable(debug_log_path_));
ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), 0, SEEK_END));
return arrow::internal::FileTell(file_descriptor.fd());
}

Status DumpDebugLog(int64_t position = 0) {
ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
if (!exists) {
return Status::OK();
}
ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
arrow::internal::FileOpenReadable(debug_log_path_));
if (position > 0) {
ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), position));
}
std::vector<uint8_t> buffer;
const int64_t buffer_size = 4096;
buffer.reserve(buffer_size);
while (true) {
ARROW_ASSIGN_OR_RAISE(
auto n_read_bytes,
arrow::internal::FileRead(file_descriptor.fd(), buffer.data(), buffer_size));
if (n_read_bytes <= 0) {
break;
}
std::cerr << std::string_view(reinterpret_cast<const char*>(buffer.data()),
n_read_bytes);
}
std::cerr << std::endl;
return Status::OK();
}

const std::string& account_name() const { return account_name_; }
const std::string& account_key() const { return account_key_; }
const Status status() const { return status_; }
Expand All @@ -120,6 +165,7 @@ class AzuriteEnv : public ::testing::Environment {
bp::child server_process_;
Status status_;
std::unique_ptr<TemporaryDir> temp_dir_;
arrow::internal::PlatformFilename debug_log_path_;
};

auto* azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv);
Expand Down Expand Up @@ -244,15 +290,28 @@ class AzureFileSystemTest : public ::testing::Test {
};

class AzuriteFileSystemTest : public AzureFileSystemTest {
Result<AzureOptions> MakeOptions() {
Result<AzureOptions> MakeOptions() override {
EXPECT_THAT(GetAzuriteEnv(), NotNull());
ARROW_EXPECT_OK(GetAzuriteEnv()->status());
ARROW_ASSIGN_OR_RAISE(debug_log_start_, GetAzuriteEnv()->GetDebugLogSize());
AzureOptions options;
options.backend = AzureBackend::Azurite;
ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
return options;
}

void TearDown() override {
AzureFileSystemTest::TearDown();
if (HasFailure()) {
// XXX: This may not include all logs in the target test because
// Azurite doesn't flush debug logs immediately... You may want
// to check the log manually...
ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
}
}

int64_t debug_log_start_ = 0;
};

class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
Expand Down Expand Up @@ -510,6 +569,103 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) {
ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(), true));
}

TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) {
const auto container_name = RandomContainerName();
ASSERT_OK(fs_->CreateDir(container_name));
arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::Directory);
ASSERT_OK(fs_->DeleteDir(container_name));
arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
// There is only virtual directory without hierarchical namespace
// support. So the CreateDir() and DeleteDir() do nothing.
ASSERT_OK(fs_->CreateDir(directory_path));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
ASSERT_OK(fs_->DeleteDir(directory_path));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
// There is only virtual directory without hierarchical namespace
// support. So the DeleteDir() for nonexistent directory does nothing.
ASSERT_OK(fs_->DeleteDir(directory_path));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) {
#ifdef __APPLE__
GTEST_SKIP() << "This test fails by an Azurite problem: "
"https://github.com/Azure/Azurite/pull/2302";
#endif
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
// We must use 257 or more blobs here to test pagination of ListBlobs().
// Because we can't add 257 or more delete blob requests to one SubmitBatch().
int64_t n_blobs = 257;
for (int64_t i = 0; i < n_blobs; ++i) {
const auto blob_path =
internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt");
ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
ASSERT_OK(output->Write(std::string_view(std::to_string(i))));
ASSERT_OK(output->Close());
arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
}
ASSERT_OK(fs_->DeleteDir(directory_path));
for (int64_t i = 0; i < n_blobs; ++i) {
const auto blob_path =
internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt");
arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound);
}
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_OK(fs_->CreateDir(directory_path, true));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory);
ASSERT_OK(fs_->DeleteDir(directory_path));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) {
const auto path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_RAISES(IOError, fs_->DeleteDir(path));
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
const auto blob_path = internal::ConcatAbstractPath(directory_path, "hello.txt");
ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
ASSERT_OK(output->Write(std::string_view("hello")));
ASSERT_OK(output->Close());
arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
ASSERT_OK(fs_->DeleteDir(directory_path));
arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound);
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) {
const auto parent =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
const auto path = internal::ConcatAbstractPath(parent, "new-sub");
ASSERT_OK(fs_->CreateDir(path, true));
arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
ASSERT_OK(fs_->DeleteDir(parent));
arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath()));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit d23baa1

Please sign in to comment.