diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 742f822c6ea6c..3587b949fe9de 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -75,11 +75,12 @@ class GcsInputStream : public arrow::io::InputStream { public: explicit GcsInputStream(gcs::ObjectReadStream stream, std::string bucket_name, std::string object_name, gcs::Generation generation, - gcs::Client client) + gcs::ReadFromOffset offset, gcs::Client client) : stream_(std::move(stream)), bucket_name_(std::move(bucket_name)), object_name_(std::move(object_name)), generation_(generation), + offset_(offset.value_or(0)), client_(std::move(client)) {} ~GcsInputStream() override = default; @@ -95,7 +96,7 @@ class GcsInputStream : public arrow::io::InputStream { if (!stream_) { return Status::IOError("invalid stream"); } - return stream_.tellg(); + return stream_.tellg() + offset_; } bool closed() const override { return !stream_.IsOpen(); } @@ -132,6 +133,7 @@ class GcsInputStream : public arrow::io::InputStream { std::string bucket_name_; std::string object_name_; gcs::Generation generation_; + std::int64_t offset_; gcs::Client client_; }; @@ -172,6 +174,79 @@ class GcsOutputStream : public arrow::io::OutputStream { int64_t tell_ = 0; }; +using InputStreamFactory = std::function>( + const std::string&, const std::string&, gcs::Generation, gcs::ReadFromOffset)>; + +class GcsRandomAccessFile : public arrow::io::RandomAccessFile { + public: + GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata, + std::shared_ptr stream) + : factory_(std::move(factory)), + metadata_(std::move(metadata)), + stream_(std::move(stream)) {} + ~GcsRandomAccessFile() override = default; + + //@{ + // @name FileInterface + Status Close() override { return stream_->Close(); } + Status Abort() override { return stream_->Abort(); } + Result Tell() const override { return stream_->Tell(); } + bool closed() const override { return stream_->closed(); 
} + //@} + + //@{ + // @name Readable + Result Read(int64_t nbytes, void* out) override { + return stream_->Read(nbytes, out); + } + Result> Read(int64_t nbytes) override { + return stream_->Read(nbytes); + } + const arrow::io::IOContext& io_context() const override { + return stream_->io_context(); + } + //@} + + //@{ + // @name InputStream + Result> ReadMetadata() override { + return internal::FromObjectMetadata(metadata_); + } + //@} + + //@{ + // @name RandomAccessFile + Result GetSize() override { return metadata_.size(); } + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + std::shared_ptr stream; + ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(), + gcs::Generation(metadata_.generation()), + gcs::ReadFromOffset(position))); + return stream->Read(nbytes, out); + } + Result> ReadAt(int64_t position, int64_t nbytes) override { + std::shared_ptr stream; + ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(), + gcs::Generation(metadata_.generation()), + gcs::ReadFromOffset(position))); + return stream->Read(nbytes); + } + //@} + + // from Seekable + Status Seek(int64_t position) override { + ARROW_ASSIGN_OR_RAISE(stream_, factory_(metadata_.bucket(), metadata_.name(), + gcs::Generation(metadata_.generation()), + gcs::ReadFromOffset(position))); + return Status::OK(); + } + + private: + InputStreamFactory factory_; + gcs::ObjectMetadata metadata_; + std::shared_ptr stream_; +}; + } // namespace google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { @@ -222,11 +297,14 @@ class GcsFileSystem::Impl { return internal::ToArrowStatus(metadata.status()); } - Result> OpenInputStream(const GcsPath& path) { - auto stream = client_.ReadObject(path.bucket, path.object); + Result> OpenInputStream(const std::string& bucket_name, + const std::string& object_name, + gcs::Generation generation, + gcs::ReadFromOffset offset) { + auto stream = client_.ReadObject(bucket_name, object_name, generation, 
offset); ARROW_GCS_RETURN_NOT_OK(stream.status()); - return std::make_shared(std::move(stream), path.bucket, path.object, - gcs::Generation(), client_); + return std::make_shared(std::move(stream), bucket_name, object_name, + generation, offset, client_); } Result> OpenOutputStream( @@ -246,6 +324,10 @@ class GcsFileSystem::Impl { return std::make_shared(std::move(stream)); } + google::cloud::StatusOr GetObjectMetadata(const GcsPath& path) { + return client_.GetObjectMetadata(path.bucket, path.object); + } + private: static Result GetFileInfoImpl(const GcsPath& path, const google::cloud::Status& status, @@ -324,7 +406,8 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest) Result> GcsFileSystem::OpenInputStream( const std::string& path) { ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); - return impl_->OpenInputStream(p); + return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(), + gcs::ReadFromOffset()); } Result> GcsFileSystem::OpenInputStream( @@ -333,17 +416,46 @@ Result> GcsFileSystem::OpenInputStream( return Status::IOError("Only files can be opened as input streams"); } ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path())); - return impl_->OpenInputStream(p); + return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(), + gcs::ReadFromOffset()); } Result> GcsFileSystem::OpenInputFile( const std::string& path) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path)); + auto metadata = impl_->GetObjectMetadata(p); + ARROW_GCS_RETURN_NOT_OK(metadata.status()); + auto impl = impl_; + auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g, + gcs::ReadFromOffset offset) { + return impl->OpenInputStream(b, o, g, offset); + }; + ARROW_ASSIGN_OR_RAISE( + auto stream, + impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()), + gcs::ReadFromOffset())); 
+ + return std::make_shared(std::move(open_stream), + *std::move(metadata), std::move(stream)); } Result> GcsFileSystem::OpenInputFile( const FileInfo& info) { - return Status::NotImplemented("The GCS FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path())); + auto metadata = impl_->GetObjectMetadata(p); + ARROW_GCS_RETURN_NOT_OK(metadata.status()); + auto impl = impl_; + auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g, + gcs::ReadFromOffset offset) { + return impl->OpenInputStream(b, o, g, offset); + }; + ARROW_ASSIGN_OR_RAISE( + auto stream, + impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()), + gcs::ReadFromOffset())); + + return std::make_shared(std::move(open_stream), + *std::move(metadata), std::move(stream)); } Result> GcsFileSystem::OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 730f7fce86ee4..9849cc68b573a 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -27,6 +27,7 @@ #include #include +#include #include #include "arrow/filesystem/gcsfs_internal.h" @@ -73,6 +74,8 @@ class GcsIntegrationTest : public ::testing::Test { protected: void SetUp() override { + // Initialize a PRNG with a small amount of entropy. 
+ generator_ = std::mt19937_64(std::random_device()()); port_ = std::to_string(GetListenPort()); auto exe_path = bp::search_path("python3"); ASSERT_THAT(exe_path, Not(IsEmpty())); @@ -119,7 +122,23 @@ class GcsIntegrationTest : public ::testing::Test { .set(gc::MakeInsecureCredentials())); } + std::string RandomLine(int lineno, std::size_t width) { + auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789"); + std::uniform_int_distribution d(0, fillers.size() - 1); + + auto line = std::to_string(lineno) + ": "; + std::generate_n(std::back_inserter(line), width - line.size() - 1, + [&] { return fillers[d(generator_)]; }); + line += '\n'; + return line; + } + + int RandomIndex(std::size_t end) { + return std::uniform_int_distribution(0, end - 1)(generator_); + } + private: + std::mt19937_64 generator_; std::string port_; bp::child server_process_; }; @@ -401,8 +420,7 @@ TEST_F(GcsIntegrationTest, ReadObjectInfo) { TEST_F(GcsIntegrationTest, ReadObjectNotFound) { auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); - auto result = fs->OpenInputStream(NotFoundObjectPath()); - EXPECT_EQ(result.status().code(), StatusCode::IOError); + ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath())); } TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { @@ -410,13 +428,10 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { arrow::fs::FileInfo info; ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket)); - - auto result = fs->OpenInputStream(NotFoundObjectPath()); - EXPECT_EQ(result.status().code(), StatusCode::IOError); + ASSERT_RAISES(IOError, fs->OpenInputStream(info)); ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath())); - result = fs->OpenInputStream(NotFoundObjectPath()); - EXPECT_EQ(result.status().code(), StatusCode::IOError); + ASSERT_RAISES(IOError, fs->OpenInputStream(info)); } TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) { @@ -538,6 +553,126 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) { 
EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); } +TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path = + kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + for (auto const& line : lines) { + ASSERT_OK(output->Write(line.data(), line.size())); + } + ASSERT_OK(output->Close()); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + std::array buffer{}; + std::int64_t size; + { + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[2 * i], actual->ToString()); + } + { + ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[2 * i + 1], actual); + } + + // Verify random reads interleave too. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[index], actual); + + // Verify random reads using buffers work. 
+ ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth)); + EXPECT_EQ(lines[index], b->ToString()); + } +} + +TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path = + kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + for (auto const& line : lines) { + ASSERT_OK(output->Write(line.data(), line.size())); + } + ASSERT_OK(output->Close()); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify a read right after a random Seek() returns the expected line. 
+ auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK(file->Seek(position)); + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[index], actual->ToString()); + } +} + +TEST_F(GcsIntegrationTest, OpenInputFileInfo) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + arrow::fs::FileInfo info; + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath())); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info)); + + std::array buffer{}; + std::int64_t size; + auto constexpr kStart = 16; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data())); + + auto const expected = std::string(kLoremIpsum).substr(kStart); + EXPECT_EQ(std::string(buffer.data(), size), expected); +} + +TEST_F(GcsIntegrationTest, OpenInputFileNotFound) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + ASSERT_RAISES(IOError, fs->OpenInputFile(NotFoundObjectPath())); +} + +TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + arrow::fs::FileInfo info; + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket)); + ASSERT_RAISES(IOError, fs->OpenInputFile(info)); + + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath())); + ASSERT_RAISES(IOError, fs->OpenInputFile(info)); +} + } // namespace } // namespace fs } // namespace arrow