Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid send empty record batch to client #796

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion common_types/src/record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Record batch

Expand Down Expand Up @@ -234,23 +234,33 @@ impl RecordBatch {
&self.schema
}

#[inline]
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}

// REQUIRE: index is valid
#[inline]
pub fn column(&self, index: usize) -> &ColumnBlock {
&self.data.column_blocks[index]
}

#[inline]
pub fn num_columns(&self) -> usize {
self.schema.num_columns()
}

#[inline]
pub fn num_rows(&self) -> usize {
self.data.num_rows()
}

#[inline]
pub fn as_arrow_record_batch(&self) -> &ArrowRecordBatch {
&self.data.arrow_record_batch
}

#[inline]
pub fn into_arrow_record_batch(self) -> ArrowRecordBatch {
self.data.arrow_record_batch
}
Expand Down
3 changes: 2 additions & 1 deletion components/arrow_ext/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Utilities for `RecordBatch` serialization using Arrow IPC

Expand Down Expand Up @@ -109,6 +109,7 @@ impl RecordBatchesEncoder {
let stream_writer = if let Some(v) = &mut self.stream_writer {
v
} else {
// TODO: pre-allocate the buffer.
let buffer: Vec<u8> = Vec::new();
let stream_writer =
StreamWriter::try_new(buffer, &batch.schema()).context(ArrowError)?;
Expand Down
4 changes: 4 additions & 0 deletions server/src/proxy/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ impl QueryResponseWriter {
}

pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if batch.is_empty() {
return Ok(());
}

self.encoder
.write(batch.as_arrow_record_batch())
.box_err()
Expand Down