Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix grpc transcoding bugs for unknown fields #797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions src/grpc/zero_copy_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,29 @@ GrpcZeroCopyInputStream::GrpcZeroCopyInputStream()
: current_buffer_(nullptr),
current_buffer_size_(0),
position_(0),
bytes_read_(0),
finished_(false) {}

void GrpcZeroCopyInputStream::AddMessage(grpc_byte_buffer* message,
void GrpcZeroCopyInputStream::AddMessage(grpc_byte_buffer *message,
bool take_ownership) {
serializer_.AddMessage(message, take_ownership);
}

bool GrpcZeroCopyInputStream::Next(const void** data, int* size) {
bool GrpcZeroCopyInputStream::Next(const void **data, int *size) {
if (position_ >= current_buffer_size_) {
position_ = 0;
if (!serializer_.Next(&current_buffer_, &current_buffer_size_)) {
// No data
*size = 0;
current_buffer_size_ = 0;
return !finished_;
}
position_ = 0;
}

// Return [position_, current_buffer_size_) interval of the current buffer
*data = current_buffer_ + position_;
*size = static_cast<int>(current_buffer_size_ - position_);
bytes_read_ += *size;

// Move the position to the end of the current buffer
position_ = current_buffer_size_;
Expand All @@ -65,9 +68,40 @@ bool GrpcZeroCopyInputStream::Next(const void** data, int* size) {
void GrpcZeroCopyInputStream::BackUp(int count) {
if (0 < count && static_cast<size_t>(count) <= position_) {
position_ -= count;
bytes_read_ -= count;
}
}

bool GrpcZeroCopyInputStream::Skip(int count) {
if (count < 0) {
// Safe guard against wrong usage.
return false;
}
size_t count_left = static_cast<size_t>(count);
while (position_ + count_left > current_buffer_size_) {
// Skipping past the current buffer, read the next one.
int delta = static_cast<int>(current_buffer_size_ - position_);
count_left -= delta;
bytes_read_ += delta;
position_ = 0;
if (!serializer_.Next(&current_buffer_, &current_buffer_size_)) {
// No data. We are potentially not at the end of the stream yet, but we
// don't know that and can only skip to the end and return an error.
current_buffer_size_ = 0;
return false;
}
}

// Move the position ahead the requested number of bytes.
position_ += count_left;
bytes_read_ += count_left;
return true;
}

::google::protobuf::int64 GrpcZeroCopyInputStream::ByteCount() const {
return static_cast<::google::protobuf::int64>(bytes_read_);
}

int64_t GrpcZeroCopyInputStream::BytesAvailable() const {
return (current_buffer_size_ - position_) + serializer_.ByteCount();
}
Expand Down
11 changes: 6 additions & 5 deletions src/grpc/zero_copy_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,27 @@ class GrpcZeroCopyInputStream
GrpcZeroCopyInputStream();

// Add a message to the end of the stream
void AddMessage(grpc_byte_buffer* message, bool take_ownership);
void AddMessage(grpc_byte_buffer *message, bool take_ownership);

// Marks the end of the stream, which means that ZeroCopyInputStream will
// return false after all the existing messages are consumed.
void Finish() { finished_ = true; }

// ZeroCopyInputStream implementation

bool Next(const void** data, int* size);
bool Next(const void **data, int *size);
void BackUp(int count);
bool Skip(int count) { return false; } // not supported
::google::protobuf::int64 ByteCount() const { return 0; } // Not implemented
bool Skip(int count);
::google::protobuf::int64 ByteCount() const;
int64_t BytesAvailable() const;
bool Finished() const { return finished_; }

private:
GrpcMessageSerializer serializer_;
const unsigned char* current_buffer_;
const unsigned char *current_buffer_;
size_t current_buffer_size_;
size_t position_;
size_t bytes_read_;
bool finished_;
};

Expand Down
Loading