Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* r
_arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}

if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}
Expand All @@ -311,17 +314,20 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* r

// normal path end
if (_is_close) {
if (!_status.ok()) {
return _status;
}
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
LOG(INFO) << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
_mem_tracker->peak_consumption(), ss.str());
return Status::OK();
}
return Status::InternalError(
fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id)));
fmt::format("Get Arrow Batch Abnormal Ending (), ()", print_id(_fragment_id), _status));
}

void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
Expand Down Expand Up @@ -354,10 +360,14 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {

// normal path end
if (_is_close) {
if (!_status.ok()) {
ctx->on_failure(_status);
return;
}
ctx->on_close(_packet_num);
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
LOG(INFO) << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
Expand Down Expand Up @@ -391,8 +401,8 @@ Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arr
if (_is_close) {
return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id)));
}
return Status::InternalError(
fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id)));
return Status::InternalError(fmt::format("Get Arrow Schema Abnormal Ending (), ()",
print_id(_fragment_id), _status));
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::stri
}

ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
VLOG_NOTICE << fmt::format(
LOG(INFO) << fmt::format(
"ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, "
"convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}",
_packet_seq, _statement->result_addr.hostname, _statement->result_addr.port,
Expand Down
Loading