Skip to content

Commit

Permalink
apacheGH-38705: [C++][FS][Azure] Implement CopyFile()
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Dec 4, 2023
1 parent f7947cc commit df57308
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 4 deletions.
48 changes: 44 additions & 4 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ struct AzureLocation {
return parent;
}

Result<AzureLocation> join(const std::string& stem) const {
return FromString(internal::ConcatAbstractPath(all, stem));
}

bool has_parent() const { return !path.empty(); }

bool empty() const { return container.empty() && path.empty(); }
Expand Down Expand Up @@ -971,7 +975,7 @@ class AzureFileSystem::Impl {
}

private:
Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location,
Status DeleteDirContentsWithoutHierarchicalNamespace(const AzureLocation& location,
bool missing_dir_ok) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Expand Down Expand Up @@ -1092,7 +1096,7 @@ class AzureFileSystem::Impl {
exception);
}
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location,
return DeleteDirContentsWithoutHierarchicalNamespace(location,
/*missing_dir_ok=*/true);
}
}
Expand Down Expand Up @@ -1149,7 +1153,41 @@ class AzureFileSystem::Impl {
}
return Status::OK();
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok);
return DeleteDirContentsWithoutHierarchicalNamespace(location, missing_dir_ok);
}
}

private:
Status CopyFileInternal(const AzureLocation& src, const AzureLocation& dest) {
if (src == dest) {
return Status::OK();
}
auto dest_blob_client = blob_service_client_->GetBlobContainerClient(dest.container)
.GetBlobClient(dest.path);
auto src_url = blob_service_client_->GetBlobContainerClient(src.container)
.GetBlobClient(src.path)
.GetUrl();
try {
dest_blob_client.CopyFromUri(src_url);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to copy a blob. (" + src_url + " -> " + dest_blob_client.GetUrl() + ")",
exception);
}
return Status::OK();
}

public:
Status CopyFile(const AzureLocation& src, const AzureLocation& dest) {
RETURN_NOT_OK(ValidateFileLocation(src));
if (dest.container.empty()) {
return PathNotFound(dest);
}
if (dest.path.empty() || internal::HasTrailingSlash(dest.path)) {
ARROW_ASSIGN_OR_RAISE(auto real_dest, dest.join(src.path_parts.back()));
return CopyFileInternal(src, real_dest);
} else {
return CopyFileInternal(src, dest);
}
}
};
Expand Down Expand Up @@ -1208,7 +1246,9 @@ Status AzureFileSystem::Move(const std::string& src, const std::string& dest) {
}

Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src));
ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest));
return impl_->CopyFile(src_location, dest_location);
}

Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,70 @@ TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexis
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), destination_path));
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationSame) {
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationContainer) {
const auto container_name = RandomContainerName();
ASSERT_OK(fs_->CreateDir(container_name));
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), container_name));
const auto destination_path =
internal::ConcatAbstractPath(container_name, PreexistingObjectName());
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationDirectory) {
const auto directory_path = internal::EnsureTrailingSlash(
internal::ConcatAbstractPath(RandomContainerName(), RandomDirectoryName()));
ASSERT_OK(fs_->CreateDir(directory_path, true));
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), directory_path));
const auto destination_path =
internal::ConcatAbstractPath(directory_path, PreexistingObjectName());
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileFailureSourceNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_RAISES(IOError, fs_->CopyFile(NotFoundObjectPath(), destination_path));
}

TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationParentNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(RandomContainerName(), "copy-destionation");
ASSERT_RAISES(IOError, fs_->CopyFile(PreexistingObjectPath(), destination_path));
}

TEST_F(AzuriteFileSystemTest, CopyFileUri) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_RAISES(Invalid,
fs_->CopyFile("abfs://" + PreexistingObjectPath(), destination_path));
ASSERT_RAISES(Invalid,
fs_->CopyFile(PreexistingObjectPath(), "abfs://" + destination_path));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit df57308

Please sign in to comment.