Skip to content

Commit

Permalink
Add UnlockedBufferedOutputStream (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Jan 10, 2022
1 parent 8081dde commit abbb323
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 0 deletions.
151 changes: 151 additions & 0 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,157 @@ Status BufferedOutputStream::Flush() { return impl_->Flush(); }

std::shared_ptr<OutputStream> BufferedOutputStream::raw() const { return impl_->raw(); }

// ----------------------------------------------------------------------
// UnlockedBufferedOutputStream implementation

class UnlockedBufferedOutputStream::Impl : public BufferedBase {
public:
explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool)
: BufferedBase(pool), raw_(std::move(raw)) {}

Status Close() {
if (is_open_) {
Status st = FlushUnlocked();
is_open_ = false;
RETURN_NOT_OK(raw_->Close());
return st;
}
return Status::OK();
}

Status Abort() {
if (is_open_) {
is_open_ = false;
return raw_->Abort();
}
return Status::OK();
}

Result<int64_t> Tell() const {
if (raw_pos_ == -1) {
ARROW_ASSIGN_OR_RAISE(raw_pos_, raw_->Tell());
DCHECK_GE(raw_pos_, 0);
}
return raw_pos_ + buffer_pos_;
}

Status Write(const void* data, int64_t nbytes) { return DoWrite(data, nbytes); }

Status Write(const std::shared_ptr<Buffer>& buffer) {
return DoWrite(buffer->data(), buffer->size(), buffer);
}

Status DoWrite(const void* data, int64_t nbytes,
const std::shared_ptr<Buffer>& buffer = nullptr) {
if (nbytes < 0) {
return Status::Invalid("write count should be >= 0");
}
if (nbytes == 0) {
return Status::OK();
}
if (nbytes + buffer_pos_ >= buffer_size_) {
RETURN_NOT_OK(FlushUnlocked());
DCHECK_EQ(buffer_pos_, 0);
if (nbytes >= buffer_size_) {
// Invalidate cached raw pos
raw_pos_ = -1;
// Direct write
if (buffer) {
return raw_->Write(buffer);
} else {
return raw_->Write(data, nbytes);
}
}
}
AppendToBuffer(data, nbytes);
return Status::OK();
}

Status FlushUnlocked() {
if (buffer_pos_ > 0) {
// Invalidate cached raw pos
raw_pos_ = -1;
RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_));
buffer_pos_ = 0;
}
return Status::OK();
}

Status Flush() {
return FlushUnlocked();
}

Result<std::shared_ptr<OutputStream>> Detach() {
RETURN_NOT_OK(FlushUnlocked());
is_open_ = false;
return std::move(raw_);
}

Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if (buffer_pos_ >= new_buffer_size) {
// If the buffer is shrinking, first flush to the raw OutputStream
RETURN_NOT_OK(FlushUnlocked());
}
return ResizeBuffer(new_buffer_size);
}

std::shared_ptr<OutputStream> raw() const { return raw_; }

private:
std::shared_ptr<OutputStream> raw_;
};

UnlockedBufferedOutputStream::UnlockedBufferedOutputStream(std::shared_ptr<OutputStream> raw,
MemoryPool* pool) {
impl_.reset(new Impl(std::move(raw), pool));
}

Result<std::shared_ptr<UnlockedBufferedOutputStream>> UnlockedBufferedOutputStream::Create(
int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw) {
auto result = std::shared_ptr<UnlockedBufferedOutputStream>(
new UnlockedBufferedOutputStream(std::move(raw), pool));
RETURN_NOT_OK(result->SetBufferSize(buffer_size));
return result;
}

UnlockedBufferedOutputStream::~UnlockedBufferedOutputStream() { internal::CloseFromDestructor(this); }

Status UnlockedBufferedOutputStream::SetBufferSize(int64_t new_buffer_size) {
return impl_->SetBufferSize(new_buffer_size);
}

int64_t UnlockedBufferedOutputStream::buffer_size() const { return impl_->buffer_size(); }

int64_t UnlockedBufferedOutputStream::bytes_buffered() const { return impl_->buffer_pos(); }

Result<std::shared_ptr<OutputStream>> UnlockedBufferedOutputStream::Detach() {
return impl_->Detach();
}

Status UnlockedBufferedOutputStream::Close() { return impl_->Close(); }

Status UnlockedBufferedOutputStream::Abort() { return impl_->Abort(); }

bool UnlockedBufferedOutputStream::closed() const { return impl_->closed(); }

Result<int64_t> UnlockedBufferedOutputStream::Tell() const { return impl_->Tell(); }

Status UnlockedBufferedOutputStream::Write(const void* data, int64_t nbytes) {
return impl_->Write(data, nbytes);
}

Status UnlockedBufferedOutputStream::Write(const std::shared_ptr<Buffer>& data) {
return impl_->Write(data);
}

Status UnlockedBufferedOutputStream::Flush() { return impl_->Flush(); }

std::shared_ptr<OutputStream> UnlockedBufferedOutputStream::raw() const { return impl_->raw(); }


// ----------------------------------------------------------------------
// BufferedInputStream implementation

Expand Down
55 changes: 55 additions & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,61 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
std::unique_ptr<Impl> impl_;
};


class ARROW_EXPORT UnlockedBufferedOutputStream : public OutputStream {
public:
~UnlockedBufferedOutputStream() override;

/// \brief Create a buffered output stream wrapping the given output stream.
/// \param[in] buffer_size the size of the temporary write buffer
/// \param[in] pool a MemoryPool to use for allocations
/// \param[in] raw another OutputStream
/// \return the created BufferedOutputStream
static Result<std::shared_ptr<UnlockedBufferedOutputStream>> Create(
int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw);

/// \brief Resize internal buffer
/// \param[in] new_buffer_size the new buffer size
/// \return Status
Status SetBufferSize(int64_t new_buffer_size);

/// \brief Return the current size of the internal buffer
int64_t buffer_size() const;

/// \brief Return the number of remaining bytes that have not been flushed to
/// the raw OutputStream
int64_t bytes_buffered() const;

/// \brief Flush any buffered writes and release the raw
/// OutputStream. Further operations on this object are invalid
/// \return the underlying OutputStream
Result<std::shared_ptr<OutputStream>> Detach();

// OutputStream interface

/// \brief Close the buffered output stream. This implicitly closes the
/// underlying raw output stream.
Status Close() override;
Status Abort() override;
bool closed() const override;

Result<int64_t> Tell() const override;
// Write bytes to the stream. Thread-safe
Status Write(const void* data, int64_t nbytes) override;
Status Write(const std::shared_ptr<Buffer>& data) override;

Status Flush() override;

/// \brief Return the underlying raw output stream.
std::shared_ptr<OutputStream> raw() const;

private:
explicit UnlockedBufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* pool);

class ARROW_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
};

/// \class BufferedInputStream
/// \brief An InputStream that performs buffered reads from an unbuffered
/// InputStream, which can mitigate the overhead of many small reads in some
Expand Down

0 comments on commit abbb323

Please sign in to comment.