From df573088dbdc7dc5a38803fdf87260af7b6180e3 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Mon, 4 Dec 2023 17:30:08 +0900 Subject: [PATCH] GH-38705: [C++][FS][Azure] Implement CopyFile() --- cpp/src/arrow/filesystem/azurefs.cc | 48 ++++++++++++++++-- cpp/src/arrow/filesystem/azurefs_test.cc | 64 ++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index ecc8d06f97567..6a544dc5ed719 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -115,6 +115,10 @@ struct AzureLocation { return parent; } + Result<AzureLocation> join(const std::string& stem) const { + return FromString(internal::ConcatAbstractPath(all, stem)); + } + bool has_parent() const { return !path.empty(); } bool empty() const { return container.empty() && path.empty(); } @@ -971,7 +975,7 @@ class AzureFileSystem::Impl { } private: - Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location, + Status DeleteDirContentsWithoutHierarchicalNamespace(const AzureLocation& location, bool missing_dir_ok) { auto container_client = blob_service_client_->GetBlobContainerClient(location.container); @@ -1092,7 +1096,7 @@ class AzureFileSystem::Impl { exception); } } else { - return DeleteDirContentsWihtoutHierarchicalNamespace(location, + return DeleteDirContentsWithoutHierarchicalNamespace(location, /*missing_dir_ok=*/true); } } @@ -1149,7 +1153,41 @@ class AzureFileSystem::Impl { } return Status::OK(); } else { - return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok); + return DeleteDirContentsWithoutHierarchicalNamespace(location, missing_dir_ok); + } + } + + private: + Status CopyFileInternal(const AzureLocation& src, const AzureLocation& dest) { + if (src == dest) { + return Status::OK(); + } + auto dest_blob_client = blob_service_client_->GetBlobContainerClient(dest.container) + .GetBlobClient(dest.path); + auto src_url =
blob_service_client_->GetBlobContainerClient(src.container) + .GetBlobClient(src.path) + .GetUrl(); + try { + dest_blob_client.CopyFromUri(src_url); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to copy a blob. (" + src_url + " -> " + dest_blob_client.GetUrl() + ")", + exception); + } + return Status::OK(); + } + + public: + Status CopyFile(const AzureLocation& src, const AzureLocation& dest) { + RETURN_NOT_OK(ValidateFileLocation(src)); + if (dest.container.empty()) { + return PathNotFound(dest); + } + if (dest.path.empty() || internal::HasTrailingSlash(dest.path)) { + ARROW_ASSIGN_OR_RAISE(auto real_dest, dest.join(src.path_parts.back())); + return CopyFileInternal(src, real_dest); + } else { + return CopyFileInternal(src, dest); } } }; @@ -1208,7 +1246,9 @@ Status AzureFileSystem::Move(const std::string& src, const std::string& dest) { } Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest)); + return impl_->CopyFile(src_location, dest_location); } Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream( diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index b6f75987693c5..71d85e10585bd 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -771,6 +771,70 @@ TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexis ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false)); } +TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationNonexistent) { + const auto destination_path = + internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation"); + ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(),
destination_path)); + ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path)); + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(kLoremIpsum, buffer->ToString()); +} + +TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationSame) { + ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), PreexistingObjectPath())); + ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(PreexistingObjectPath())); + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(kLoremIpsum, buffer->ToString()); +} + +TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationContainer) { + const auto container_name = RandomContainerName(); + ASSERT_OK(fs_->CreateDir(container_name)); + ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), container_name)); + const auto destination_path = + internal::ConcatAbstractPath(container_name, PreexistingObjectName()); + ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path)); + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(kLoremIpsum, buffer->ToString()); +} + +TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationDirectory) { + const auto directory_path = internal::EnsureTrailingSlash( + internal::ConcatAbstractPath(RandomContainerName(), RandomDirectoryName())); + ASSERT_OK(fs_->CreateDir(directory_path, true)); + ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), directory_path)); + const auto destination_path = + internal::ConcatAbstractPath(directory_path, PreexistingObjectName()); + ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path)); + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(kLoremIpsum, buffer->ToString()); +} + +TEST_F(AzuriteFileSystemTest, CopyFileFailureSourceNonexistent) { + const 
auto destination_path = + internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation"); + ASSERT_RAISES(IOError, fs_->CopyFile(NotFoundObjectPath(), destination_path)); +} + +TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationParentNonexistent) { + const auto destination_path = + internal::ConcatAbstractPath(RandomContainerName(), "copy-destionation"); + ASSERT_RAISES(IOError, fs_->CopyFile(PreexistingObjectPath(), destination_path)); +} + +TEST_F(AzuriteFileSystemTest, CopyFileUri) { + const auto destination_path = + internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation"); + ASSERT_RAISES(Invalid, + fs_->CopyFile("abfs://" + PreexistingObjectPath(), destination_path)); + ASSERT_RAISES(Invalid, + fs_->CopyFile(PreexistingObjectPath(), "abfs://" + destination_path)); +} + TEST_F(AzuriteFileSystemTest, OpenInputStreamString) { std::shared_ptr<io::InputStream> stream; ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));