diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 2c3d81ca24c51..4dde275da135f 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -969,6 +969,119 @@ class AzureFileSystem::Impl { RETURN_NOT_OK(stream->Init()); return stream; } + + Status DeleteDir(const AzureLocation& location) { + if (location.container.empty()) { + return Status::Invalid("Cannot delete an empty container"); + } + + if (location.path.empty()) { + auto container_client = + blob_service_client_->GetBlobContainerClient(location.container); + try { + auto response = container_client.Delete(); + if (response.Value.Deleted) { + return Status::OK(); + } else { + return StatusFromErrorResponse( + container_client.GetUrl(), response.RawResponse.get(), + "Failed to delete a container: " + location.container); + } + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a container: " + location.container + ": " + + container_client.GetUrl(), + exception); + } + } + + ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled, + hierarchical_namespace_.Enabled(location.container)); + if (hierarchical_namespace_enabled) { + auto directory_client = + datalake_service_client_->GetFileSystemClient(location.container) + .GetDirectoryClient(location.path); + try { + auto response = directory_client.DeleteRecursive(); + if (response.Value.Deleted) { + return Status::OK(); + } else { + return StatusFromErrorResponse( + directory_client.GetUrl(), response.RawResponse.get(), + "Failed to delete a directory: " + location.path); + } + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a directory: " + location.path + ": " + + directory_client.GetUrl(), + exception); + } + } else { + auto container_client = + blob_service_client_->GetBlobContainerClient(location.container); + Azure::Storage::Blobs::ListBlobsOptions options; + options.Prefix = internal::EnsureTrailingSlash(location.path); + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks + // + // Only supports up to 256 subrequests in a single batch. The + // size of the body for a batch request can't exceed 4 MB. + const int32_t kNumMaxRequestsInBatch = 256; + options.PageSizeHint = kNumMaxRequestsInBatch; + try { + auto list_response = container_client.ListBlobs(options); + while (list_response.HasPage() && !list_response.Blobs.empty()) { + auto batch = container_client.CreateBatch(); + std::vector> + deferred_responses; + for (const auto& blob_item : list_response.Blobs) { + deferred_responses.push_back(batch.DeleteBlob(blob_item.Name)); + } + try { + container_client.SubmitBatch(batch); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + std::vector failed_blob_names; + for (size_t i = 0; i < deferred_responses.size(); ++i) { + const auto& deferred_response = deferred_responses[i]; + bool success = true; + try { + auto delete_result = deferred_response.GetResponse(); + success = delete_result.Value.Deleted; + } catch (const Azure::Storage::StorageException& exception) { + success = false; + } + if (!success) { + const auto& blob_item = list_response.Blobs[i]; + failed_blob_names.push_back(blob_item.Name); + } + } + if (!failed_blob_names.empty()) { + if (failed_blob_names.size() == 1) { + return Status::IOError("Failed to delete a blob: ", failed_blob_names[0], + ": " + container_client.GetUrl()); + } else { + return Status::IOError( + "Failed to delete blobs: [", + arrow::internal::JoinStrings(failed_blob_names, ", "), + "]: " + container_client.GetUrl()); + } + } + list_response.MoveToNextPage(); + } + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to list blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + return Status::OK(); + } + } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -1003,7 +1116,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) { } Status AzureFileSystem::DeleteDir(const std::string& path) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->DeleteDir(location); } Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) { diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index e9b9a6f34b88c..7c86385126d40 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -56,6 +56,7 @@ #include "arrow/testing/util.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" #include "arrow/util/string.h" #include "arrow/util/value_parsing.h" @@ -92,9 +93,15 @@ class AzuriteEnv : public ::testing::Environment { return; } auto temp_dir_ = *TemporaryDir::Make("azurefs-test-"); - server_process_ = bp::child(boost::this_process::environment(), exe_path, "--silent", - "--location", temp_dir_->path().ToString(), "--debug", - temp_dir_->path().ToString() + "/debug.log"); + auto debug_log_path_result = temp_dir_->path().Join("debug.log"); + if (!debug_log_path_result.ok()) { + status_ = debug_log_path_result.status(); + return; + } + debug_log_path_ = *debug_log_path_result; + server_process_ = + bp::child(boost::this_process::environment(), exe_path, "--silent", "--location", + temp_dir_->path().ToString(), "--debug", debug_log_path_.ToString()); if (!(server_process_.valid() && server_process_.running())) { auto error = "Could not start Azurite emulator."; server_process_.terminate(); @@ -110,6 +117,44 @@ class AzuriteEnv : public ::testing::Environment { server_process_.wait(); } + Result GetDebugLogSize() { + ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_)); + if (!exists) { + return 0; + } + ARROW_ASSIGN_OR_RAISE(auto file_descriptor, + arrow::internal::FileOpenReadable(debug_log_path_)); + ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), 0, SEEK_END)); + return arrow::internal::FileTell(file_descriptor.fd()); + } + + Status DumpDebugLog(int64_t position = 0) { + ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_)); + if (!exists) { + return Status::OK(); + } + ARROW_ASSIGN_OR_RAISE(auto file_descriptor, + arrow::internal::FileOpenReadable(debug_log_path_)); + if (position > 0) { + ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), position)); + } + std::vector buffer; + const int64_t buffer_size = 4096; + buffer.reserve(buffer_size); + while (true) { + ARROW_ASSIGN_OR_RAISE( + auto n_read_bytes, + arrow::internal::FileRead(file_descriptor.fd(), buffer.data(), buffer_size)); + if (n_read_bytes <= 0) { + break; + } + std::cerr << std::string_view(reinterpret_cast(buffer.data()), + n_read_bytes); + } + std::cerr << std::endl; + return Status::OK(); + } + const std::string& account_name() const { return account_name_; } const std::string& account_key() const { return account_key_; } const Status status() const { return status_; } @@ -120,6 +165,7 @@ class AzuriteEnv : public ::testing::Environment { bp::child server_process_; Status status_; std::unique_ptr temp_dir_; + arrow::internal::PlatformFilename debug_log_path_; }; auto* azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv); @@ -244,15 +290,28 @@ class AzureFileSystemTest : public ::testing::Test { }; class AzuriteFileSystemTest : public AzureFileSystemTest { - Result MakeOptions() { + Result MakeOptions() override { EXPECT_THAT(GetAzuriteEnv(), NotNull()); ARROW_EXPECT_OK(GetAzuriteEnv()->status()); + ARROW_ASSIGN_OR_RAISE(debug_log_start_, GetAzuriteEnv()->GetDebugLogSize()); AzureOptions options; options.backend = AzureBackend::Azurite; ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials( GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key())); return options; } + + void TearDown() override { + AzureFileSystemTest::TearDown(); + if (HasFailure()) { + // XXX: This may not include all logs in the target test because + // Azurite doesn't flush debug logs immediately... You may want + // to check the log manually... + ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_)); + } + } + + int64_t debug_log_start_ = 0; }; class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest { @@ -510,6 +569,103 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) { ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(), true)); } +TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) { + const auto container_name = RandomContainerName(); + ASSERT_OK(fs_->CreateDir(container_name)); + arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::Directory); + ASSERT_OK(fs_->DeleteDir(container_name)); + arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + // There is only virtual directory without hierarchical namespace + // support. So the CreateDir() and DeleteDir() do nothing. + ASSERT_OK(fs_->CreateDir(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); + ASSERT_OK(fs_->DeleteDir(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + // There is only virtual directory without hierarchical namespace + // support. So the DeleteDir() for nonexistent directory does nothing. + ASSERT_OK(fs_->DeleteDir(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) { +#ifdef __APPLE__ + GTEST_SKIP() << "This test fails by an Azurite problem: " + "https://github.com/Azure/Azurite/pull/2302"; +#endif + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + // We must use 257 or more blobs here to test pagination of ListBlobs(). + // Because we can't add 257 or more delete blob requests to one SubmitBatch(). + int64_t n_blobs = 257; + for (int64_t i = 0; i < n_blobs; ++i) { + const auto blob_path = + internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt"); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path)); + ASSERT_OK(output->Write(std::string_view(std::to_string(i)))); + ASSERT_OK(output->Close()); + arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File); + } + ASSERT_OK(fs_->DeleteDir(directory_path)); + for (int64_t i = 0; i < n_blobs; ++i) { + const auto blob_path = + internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt"); + arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound); + } +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_OK(fs_->CreateDir(directory_path, true)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory); + ASSERT_OK(fs_->DeleteDir(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) { + const auto path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_RAISES(IOError, fs_->DeleteDir(path)); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + const auto blob_path = internal::ConcatAbstractPath(directory_path, "hello.txt"); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path)); + ASSERT_OK(output->Write(std::string_view("hello"))); + ASSERT_OK(output->Close()); + arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File); + ASSERT_OK(fs_->DeleteDir(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) { + const auto parent = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + const auto path = internal::ConcatAbstractPath(parent, "new-sub"); + ASSERT_OK(fs_->CreateDir(path, true)); + arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory); + ASSERT_OK(fs_->DeleteDir(parent)); + arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirUri) { + ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath())); +} + TEST_F(AzuriteFileSystemTest, OpenInputStreamString) { std::shared_ptr stream; ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));