Skip to content

Commit

Permalink
ARROW-14347: [C++] random access files for GcsFileSystem
Browse files Browse the repository at this point in the history
Closes apache#11812 from coryan/ARROW-14347-implement-gcs-random-access-reads

Authored-by: Carlos O'Ryan <coryan@google.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
coryan authored and kou committed Dec 1, 2021
1 parent 4a5ce6f commit faf26e9
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 17 deletions.
132 changes: 122 additions & 10 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ class GcsInputStream : public arrow::io::InputStream {
public:
explicit GcsInputStream(gcs::ObjectReadStream stream, std::string bucket_name,
std::string object_name, gcs::Generation generation,
gcs::Client client)
gcs::ReadFromOffset offset, gcs::Client client)
: stream_(std::move(stream)),
bucket_name_(std::move(bucket_name)),
object_name_(std::move(object_name)),
generation_(generation),
offset_(offset.value_or(0)),
client_(std::move(client)) {}

~GcsInputStream() override = default;
Expand All @@ -95,7 +96,7 @@ class GcsInputStream : public arrow::io::InputStream {
if (!stream_) {
return Status::IOError("invalid stream");
}
return stream_.tellg();
return stream_.tellg() + offset_;
}

bool closed() const override { return !stream_.IsOpen(); }
Expand Down Expand Up @@ -132,6 +133,7 @@ class GcsInputStream : public arrow::io::InputStream {
std::string bucket_name_;
std::string object_name_;
gcs::Generation generation_;
std::int64_t offset_;
gcs::Client client_;
};

Expand Down Expand Up @@ -172,6 +174,79 @@ class GcsOutputStream : public arrow::io::OutputStream {
int64_t tell_ = 0;
};

// Callable that opens a new input stream on an object. Its arguments are, in
// order: the bucket name, the object name, the object generation, and the
// offset at which reading should start. GcsRandomAccessFile invokes it for
// every ReadAt() and Seek() call.
using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
const std::string&, const std::string&, gcs::Generation, gcs::ReadFromOffset)>;

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
std::shared_ptr<io::InputStream> stream)
: factory_(std::move(factory)),
metadata_(std::move(metadata)),
stream_(std::move(stream)) {}
~GcsRandomAccessFile() override = default;

//@{
// @name FileInterface
Status Close() override { return stream_->Close(); }
Status Abort() override { return stream_->Abort(); }
Result<int64_t> Tell() const override { return stream_->Tell(); }
bool closed() const override { return stream_->closed(); }
//@}

//@{
// @name Readable
Result<int64_t> Read(int64_t nbytes, void* out) override {
return stream_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
return stream_->Read(nbytes);
}
const arrow::io::IOContext& io_context() const override {
return stream_->io_context();
}
//@}

//@{
// @name InputStream
Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
return internal::FromObjectMetadata(metadata_);
}
//@}

//@{
// @name RandomAccessFile
Result<int64_t> GetSize() override { return metadata_.size(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return stream->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return stream->Read(nbytes);
}
//@}

// from Seekable
Status Seek(int64_t position) override {
ARROW_ASSIGN_OR_RAISE(stream_, factory_(metadata_.bucket(), metadata_.name(),
gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
return Status::OK();
}

private:
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
std::shared_ptr<io::InputStream> stream_;
};

} // namespace

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand Down Expand Up @@ -222,11 +297,14 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(metadata.status());
}

// Opens a stream over the object `object_name` in `bucket_name`, requesting the
// given `generation` and starting at `offset`.
//
// The `generation` used for the download is also handed to the returned
// GcsInputStream, so the stream records the same generation it is reading
// rather than a default-constructed one.
//
// Returns an error status if the download could not be started.
Result<std::shared_ptr<io::InputStream>> OpenInputStream(const std::string& bucket_name,
                                                         const std::string& object_name,
                                                         gcs::Generation generation,
                                                         gcs::ReadFromOffset offset) {
  auto stream = client_.ReadObject(bucket_name, object_name, generation, offset);
  ARROW_GCS_RETURN_NOT_OK(stream.status());
  return std::make_shared<GcsInputStream>(std::move(stream), bucket_name, object_name,
                                          generation, offset, client_);
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
Expand All @@ -246,6 +324,10 @@ class GcsFileSystem::Impl {
return std::make_shared<GcsOutputStream>(std::move(stream));
}

// Fetches the metadata of the object named by `path`. The client's StatusOr is
// returned as-is; converting a failure into an arrow::Status is left to the
// caller.
google::cloud::StatusOr<gcs::ObjectMetadata> GetObjectMetadata(const GcsPath& path) {
  auto metadata = client_.GetObjectMetadata(path.bucket, path.object);
  return metadata;
}

private:
static Result<FileInfo> GetFileInfoImpl(const GcsPath& path,
const google::cloud::Status& status,
Expand Down Expand Up @@ -324,7 +406,8 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest)
// Opens the object named by `path` for sequential reading.
//
// `path` is parsed into bucket and object names; a parse failure is returned to
// the caller. No explicit generation or start offset is requested, so both
// options are passed default-constructed.
Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
    const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
  return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
                                gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
Expand All @@ -333,17 +416,46 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
return Status::IOError("Only files can be opened as input streams");
}
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p);
return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
gcs::ReadFromOffset());
}

// Opens the object named by `path` for random-access reading.
//
// The object metadata is fetched first; its generation is then used both for
// the initial stream and (through GcsRandomAccessFile) for every stream opened
// later by ReadAt() or Seek().
//
// Returns an error if the path cannot be parsed, the metadata lookup fails, or
// the initial stream cannot be opened.
Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
    const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
  auto metadata = impl_->GetObjectMetadata(p);
  ARROW_GCS_RETURN_NOT_OK(metadata.status());
  // Capture a copy of the shared implementation pointer so the factory keeps
  // it alive for as long as the returned file exists.
  auto impl = impl_;
  auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g,
                            gcs::ReadFromOffset offset) {
    return impl->OpenInputStream(b, o, g, offset);
  };
  ARROW_ASSIGN_OR_RAISE(
      auto stream,
      impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()),
                             gcs::ReadFromOffset()));

  return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
                                               *std::move(metadata), std::move(stream));
}

// Opens the object described by `info` for random-access reading.
//
// Only `info.path()` is consulted. The object metadata is fetched again, and
// its generation is used both for the initial stream and (through
// GcsRandomAccessFile) for every stream opened later by ReadAt() or Seek().
//
// Returns an error if the path cannot be parsed, the metadata lookup fails, or
// the initial stream cannot be opened.
Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
    const FileInfo& info) {
  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
  auto metadata = impl_->GetObjectMetadata(p);
  ARROW_GCS_RETURN_NOT_OK(metadata.status());
  // Capture a copy of the shared implementation pointer so the factory keeps
  // it alive for as long as the returned file exists.
  auto impl = impl_;
  auto open_stream = [impl](const std::string& b, const std::string& o, gcs::Generation g,
                            gcs::ReadFromOffset offset) {
    return impl->OpenInputStream(b, o, g, offset);
  };
  ARROW_ASSIGN_OR_RAISE(
      auto stream,
      impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(metadata->generation()),
                             gcs::ReadFromOffset()));

  return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
                                               *std::move(metadata), std::move(stream));
}

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
Expand Down
149 changes: 142 additions & 7 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <array>
#include <boost/process.hpp>
#include <random>
#include <string>

#include "arrow/filesystem/gcsfs_internal.h"
Expand Down Expand Up @@ -73,6 +74,8 @@ class GcsIntegrationTest : public ::testing::Test {

protected:
void SetUp() override {
// Initialize a PRNG with a small amount of entropy.
generator_ = std::mt19937_64(std::random_device()());
port_ = std::to_string(GetListenPort());
auto exe_path = bp::search_path("python3");
ASSERT_THAT(exe_path, Not(IsEmpty()));
Expand Down Expand Up @@ -119,7 +122,23 @@ class GcsIntegrationTest : public ::testing::Test {
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
}

// Builds one line of test data: "<lineno>: " followed by random characters
// drawn from `fillers` and a trailing '\n'.
//
// When `width` is large enough to hold the prefix and the newline, the result
// is exactly `width` characters long. When it is not, no filler is added and
// the result is just the prefix plus the newline; the count is clamped to zero
// because the subtraction is unsigned and would otherwise wrap around to a
// huge value.
std::string RandomLine(int lineno, std::size_t width) {
  auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
  std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);

  auto line = std::to_string(lineno) + ": ";
  // One character is reserved for the trailing newline.
  auto const fill_count = width > line.size() + 1 ? width - line.size() - 1 : 0;
  line.reserve(line.size() + fill_count + 1);
  std::generate_n(std::back_inserter(line), fill_count,
                  [&] { return fillers[d(generator_)]; });
  line += '\n';
  return line;
}

// Draws a uniformly distributed index in [0, end - 1] from the fixture's
// generator. `end` must be non-zero: `end - 1` is computed on an unsigned type.
int RandomIndex(std::size_t end) {
  std::uniform_int_distribution<std::size_t> pick(0, end - 1);
  return static_cast<int>(pick(generator_));
}

private:
std::mt19937_64 generator_;
std::string port_;
bp::child server_process_;
};
Expand Down Expand Up @@ -401,22 +420,18 @@ TEST_F(GcsIntegrationTest, ReadObjectInfo) {
// Opening an input stream on an object that does not exist must fail with
// IOError.
TEST_F(GcsIntegrationTest, ReadObjectNotFound) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath()));
}

// The FileInfo overload of OpenInputStream must fail with IOError when the
// FileInfo was obtained for the pre-existing bucket or for a missing object.
// Each check passes `info` itself, so the FileInfo overload (not the path
// overload) is the one being exercised.
TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  arrow::fs::FileInfo info;
  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));
  ASSERT_RAISES(IOError, fs->OpenInputStream(info));

  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
  ASSERT_RAISES(IOError, fs->OpenInputStream(info));
}

TEST_F(GcsIntegrationTest, ReadObjectReadMetadata) {
Expand Down Expand Up @@ -538,6 +553,126 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
}

// Interleaves sequential Read() calls with ReadAt() calls on the same file and
// checks that ReadAt() does not disturb the sequential read position.
TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  // The object consists of kLineCount lines of kLineWidth bytes each, so line k
  // starts at byte k * kLineWidth. It is large enough to make the random access
  // checks non-trivial.
  auto constexpr kLineWidth = 100;
  auto constexpr kLineCount = 4096;
  std::vector<std::string> lines;
  lines.reserve(kLineCount);
  for (int lineno = 1; lineno <= kLineCount; ++lineno) {
    lines.push_back(RandomLine(lineno, kLineWidth));
  }

  const auto path =
      kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name");
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> output,
                       fs->OpenOutputStream(path, {}));
  for (auto const& text : lines) {
    ASSERT_OK(output->Write(text.data(), text.size()));
  }
  ASSERT_OK(output->Close());

  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
                       fs->OpenInputFile(path));
  for (int i = 0; i != 32; ++i) {
    SCOPED_TRACE("Iteration " + std::to_string(i));
    std::array<char, kLineWidth> buffer{};

    // Each iteration consumes two consecutive lines sequentially: one through
    // the Buffer-returning overload, one through the caller-supplied buffer.
    ASSERT_OK_AND_ASSIGN(auto sequential, file->Read(kLineWidth));
    EXPECT_EQ(lines[2 * i], sequential->ToString());

    ASSERT_OK_AND_ASSIGN(std::int64_t size, file->Read(buffer.size(), buffer.data()));
    EXPECT_EQ(size, kLineWidth);
    EXPECT_EQ(lines[2 * i + 1], std::string(buffer.begin(), buffer.end()));

    // A positioned read of a randomly chosen line, into the caller's buffer.
    auto const index = RandomIndex(kLineCount);
    auto const position = index * kLineWidth;
    ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data()));
    EXPECT_EQ(size, kLineWidth);
    EXPECT_EQ(lines[index], std::string(buffer.begin(), buffer.end()));

    // The same positioned read through the Buffer-returning overload.
    ASSERT_OK_AND_ASSIGN(auto positioned, file->ReadAt(position, kLineWidth));
    EXPECT_EQ(lines[index], positioned->ToString());
  }
}

// Seeks to randomly chosen line boundaries and checks that the following
// sequential Read() returns the line stored at that position.
TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  // The object consists of kLineCount lines of kLineWidth bytes each, so line k
  // starts at byte k * kLineWidth. It is large enough to make the random access
  // checks non-trivial.
  auto constexpr kLineWidth = 100;
  auto constexpr kLineCount = 4096;
  std::vector<std::string> lines;
  lines.reserve(kLineCount);
  for (int lineno = 1; lineno <= kLineCount; ++lineno) {
    lines.push_back(RandomLine(lineno, kLineWidth));
  }

  const auto path =
      kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name");
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> output,
                       fs->OpenOutputStream(path, {}));
  for (auto const& text : lines) {
    ASSERT_OK(output->Write(text.data(), text.size()));
  }
  ASSERT_OK(output->Close());

  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
                       fs->OpenInputFile(path));
  for (int i = 0; i != 32; ++i) {
    SCOPED_TRACE("Iteration " + std::to_string(i));
    // Jump to the start of a random line, then read that line sequentially.
    auto const index = RandomIndex(kLineCount);
    auto const position = index * kLineWidth;
    ASSERT_OK(file->Seek(position));
    ASSERT_OK_AND_ASSIGN(auto contents, file->Read(kLineWidth));
    EXPECT_EQ(lines[index], contents->ToString());
  }
}

// Opens the pre-existing object through the FileInfo overload of OpenInputFile
// and checks that a positioned read returns the contents from that offset on.
TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  ASSERT_OK_AND_ASSIGN(arrow::fs::FileInfo info,
                       fs->GetFileInfo(PreexistingObjectPath()));
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
                       fs->OpenInputFile(info));

  // Ask for up to 1024 bytes starting at kStart; the comparison below uses
  // only the `bytes_read` bytes that ReadAt() reports.
  auto constexpr kStart = 16;
  std::array<char, 1024> buffer{};
  ASSERT_OK_AND_ASSIGN(std::int64_t bytes_read,
                       file->ReadAt(kStart, buffer.size(), buffer.data()));

  auto const expected = std::string(kLoremIpsum).substr(kStart);
  auto const actual = std::string(buffer.data(), bytes_read);
  EXPECT_EQ(actual, expected);
}

// Opening a random-access file on an object that does not exist must fail with
// IOError.
TEST_F(GcsIntegrationTest, OpenInputFileNotFound) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  auto opened = fs->OpenInputFile(NotFoundObjectPath());
  ASSERT_RAISES(IOError, opened);
}

// The FileInfo overload of OpenInputFile must fail with IOError when the
// FileInfo was obtained for the pre-existing bucket or for a missing object.
TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  // FileInfo obtained for the bucket itself.
  ASSERT_OK_AND_ASSIGN(arrow::fs::FileInfo bucket_info,
                       fs->GetFileInfo(kPreexistingBucket));
  ASSERT_RAISES(IOError, fs->OpenInputFile(bucket_info));

  // FileInfo obtained for an object that does not exist.
  ASSERT_OK_AND_ASSIGN(arrow::fs::FileInfo missing_info,
                       fs->GetFileInfo(NotFoundObjectPath()));
  ASSERT_RAISES(IOError, fs->OpenInputFile(missing_info));
}

} // namespace
} // namespace fs
} // namespace arrow

0 comments on commit faf26e9

Please sign in to comment.