xinyiZzz committed Nov 12, 2024
1 parent c035fd0 commit 1ddf264
Showing 7 changed files with 120 additions and 69 deletions.
54 changes: 46 additions & 8 deletions be/src/runtime/buffer_control_block.cpp
@@ -112,7 +112,7 @@ void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
void GetArrowResultBatchCtx::on_data(
const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
std::string arrow_schema_field_names, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter) {
Status st = Status::OK();
@@ -128,7 +128,6 @@ void GetArrowResultBatchCtx::on_data(
result->set_eos(false);
if (packet_seq == 0) {
result->set_timezone(timezone);
result->set_fields_labels(arrow_schema_field_names);
}
} else {
result->clear_block();
@@ -237,9 +236,8 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
auto* ctx = _waiting_arrow_result_batch_rpc.front();
_waiting_arrow_result_batch_rpc.pop_front();
ctx->on_data(result, _packet_num, _be_exec_version,
_fragement_transmission_compression_type, _timezone, _arrow_schema_field_names,
_serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_packet_num++;
}

@@ -347,8 +345,8 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
_instance_rows_in_queue.pop_front();

ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
_timezone, _arrow_schema_field_names, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_packet_num++;
return;
}
@@ -370,8 +368,29 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
}

void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::lock_guard<std::mutex> l(_lock);
_arrow_schema = arrow_schema;
_arrow_schema_field_names = join(_arrow_schema->field_names(), ",");
}

Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

// normal path end
if (_arrow_schema != nullptr) {
*arrow_schema = _arrow_schema;
return Status::OK();
}

if (_is_close) {
return Status::RuntimeError("Closed");
}
return Status::InternalError("Get Arrow Schema Abnormal Ending");
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
@@ -405,18 +424,37 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
}
_waiting_rpc.clear();
}

if (!_waiting_arrow_result_batch_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_close(_packet_num);
}
} else {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(_status);
}
}
_waiting_arrow_result_batch_rpc.clear();
}
return Status::OK();
}

void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_is_cancelled = true;
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(reason);
}
_waiting_rpc.clear();
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
_waiting_arrow_result_batch_rpc.clear();
_update_dependency();
_arrow_flight_result_batch_queue.clear();
}

void BufferControlBlock::set_dependency(
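Note on the interface change in this file: BufferControlBlock::find_arrow_schema now reports its outcome through a Status instead of returning a possibly-null schema. A minimal sketch of the resulting contract, written as a hypothetical check (the helper name and the DCHECK-based assertions are assumptions, not code from this commit):

// Hypothetical contract check for the Status-returning find_arrow_schema API.
// Assumes `block` is a live BufferControlBlock and `schema` a valid arrow::Schema.
void check_find_arrow_schema_contract(doris::BufferControlBlock* block,
                                      const std::shared_ptr<arrow::Schema>& schema) {
    std::shared_ptr<arrow::Schema> out;

    // After register_arrow_schema() the lookup succeeds and hands the schema back.
    block->register_arrow_schema(schema);
    doris::Status st = block->find_arrow_schema(&out);
    DCHECK(st.ok());
    DCHECK(out == schema);

    // After cancel() the same call reports Cancelled instead of returning a null schema.
    block->cancel(doris::Status::Cancelled("Cancelled"));
    st = block->find_arrow_schema(&out);
    DCHECK(!st.ok());
}
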
6 changes: 2 additions & 4 deletions be/src/runtime/buffer_control_block.h
@@ -90,8 +90,7 @@ struct GetArrowResultBatchCtx {
void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq,
int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type,
std::string timezone, std::string arrow_schema_field_names,
RuntimeProfile::Counter* serialize_batch_ns_timer,
std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter);
};
@@ -114,7 +113,7 @@ class BufferControlBlock {
void get_arrow_batch(GetArrowResultBatchCtx* ctx);

void register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema);
std::shared_ptr<arrow::Schema> find_arrow_schema() { return _arrow_schema; }
Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema);

// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Expand Down Expand Up @@ -156,7 +155,6 @@ class BufferControlBlock {
ArrowFlightResultQueue _arrow_flight_result_batch_queue;
// for arrow flight
std::shared_ptr<arrow::Schema> _arrow_schema;
std::string _arrow_schema_field_names;

// protects all subsequent data in this block
std::mutex _lock;
3 changes: 1 addition & 2 deletions be/src/runtime/result_buffer_mgr.cpp
@@ -114,8 +114,7 @@ Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
"no arrow schema for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}
*schema = cb->find_arrow_schema();
return Status::OK();
return cb->find_arrow_schema(schema);
}

void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) {
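The client-side _fetch_schema added in the next file expects PFetchArrowFlightSchemaResult::schema to hold an Arrow IPC stream that begins with a schema message, which it opens with arrow::ipc::RecordBatchStreamReader. The BE handler that produces that payload is not part of this diff; the following is only a plausible sketch of that side (the function name is an assumption; it relies on ResultBufferMgr::find_arrow_schema and to_arrow_status as used elsewhere in this commit, plus the Arrow IPC stream-writer API from <arrow/ipc/writer.h> and <arrow/io/memory.h>):

// Hypothetical producer for the fetch_arrow_flight_schema RPC; not code from this commit.
// Serializes the registered schema as an Arrow IPC stream with zero record batches.
arrow::Status fill_schema_response(doris::ResultBufferMgr* mgr, const doris::TUniqueId& finst_id,
                                   doris::PFetchArrowFlightSchemaResult* result) {
    std::shared_ptr<arrow::Schema> schema;
    // Propagates Cancelled/Closed/InternalError from BufferControlBlock::find_arrow_schema.
    ARROW_RETURN_NOT_OK(doris::to_arrow_status(mgr->find_arrow_schema(finst_id, &schema)));

    // An IPC stream with no batches still starts with the schema message, which is all
    // RecordBatchStreamReader::Open needs on the reader side.
    ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
    ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeStreamWriter(sink, schema));
    ARROW_RETURN_NOT_OK(writer->Close());
    ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
    result->set_schema(buffer->ToString());
    return arrow::Status::OK();
}
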
113 changes: 66 additions & 47 deletions be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -17,12 +17,12 @@

#include "service/arrow_flight/arrow_flight_batch_reader.h"

#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <gen_cpp/internal_service.pb.h>

#include <utility>

#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/result_buffer_mgr.h"
@@ -33,7 +33,7 @@
#include "util/arrow/utils.h"
#include "util/brpc_client_cache.h"
#include "util/ref_count_closure.h"
#include "util/string_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"

namespace doris::flight {
@@ -152,7 +152,52 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> ArrowFlightBatchRem
return result;
}

arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() {
Status st;
auto request = std::make_shared<PFetchArrowFlightSchemaRequest>();
auto* pfinst_id = request->mutable_finst_id();
pfinst_id->set_hi(_statement->query_id.hi);
pfinst_id->set_lo(_statement->query_id.lo);
auto callback = DummyBrpcCallback<PFetchArrowFlightSchemaResult>::create_shared();
auto closure = AutoReleaseClosure<
PFetchArrowFlightSchemaRequest,
DummyBrpcCallback<PFetchArrowFlightSchemaResult>>::create_unique(request, callback);
callback->cntl_->set_timeout_ms(BRPC_CONTROLLER_TIMEOUT_MS);
callback->cntl_->ignore_eovercrowded();

_brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
callback->join();

if (callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
_brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
callback->cntl_->remote_side());
}
auto error_code = callback->cntl_->ErrorCode();
auto error_text = callback->cntl_->ErrorText();
return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}",
berror(error_code), error_text));
}
st = Status::create(callback->response_->status());
ARROW_RETURN_NOT_OK(to_arrow_status(st));

if (callback->response_->has_schema() && !callback->response_->schema().empty()) {
auto input =
arrow::io::BufferReader::FromString(std::string(callback->response_->schema()));
ARROW_ASSIGN_OR_RAISE(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(
input.get(), arrow::ipc::IpcReadOptions::Defaults()));
_schema = reader->schema();
} else {
return _return_invalid_status(fmt::format("fetch schema error: not find schema"));
}
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
DCHECK(_block == nullptr);
while (true) {
// if `continue` occurs, data is invalid, continue fetch, block is nullptr.
@@ -183,27 +228,22 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini
}
auto error_code = callback->cntl_->ErrorCode();
auto error_text = callback->cntl_->ErrorText();
return _return_invalid_status(
fmt::format("error={}, error_text={}", berror(error_code), error_text));
return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}",
berror(error_code), error_text));
}
st = Status::create(callback->response_->status());
ARROW_RETURN_NOT_OK(to_arrow_status(st));

DCHECK(callback->response_->has_packet_seq());
if (_packet_seq != callback->response_->packet_seq()) {
return _return_invalid_status(
fmt::format("receive packet failed, expect={}, receive={}", _packet_seq,
callback->response_->packet_seq()));
fmt::format("fetch data receive packet failed, expect: {}, receive: {}",
_packet_seq, callback->response_->packet_seq()));
}
_packet_seq++;

if (callback->response_->has_eos() && callback->response_->eos()) {
if (!first_fetch_for_init) {
break;
} else {
return _return_invalid_status(fmt::format("received unexpected eos, packet_seq={}",
callback->response_->packet_seq()));
}
break;
}

if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) {
@@ -215,13 +255,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini
continue;
}

if (first_fetch_for_init) {
std::call_once(_timezone_once_flag, [this, callback] {
DCHECK(callback->response_->has_timezone());
DCHECK(callback->response_->has_fields_labels());
_timezone = callback->response_->timezone();
TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
_arrow_schema_field_names = callback->response_->fields_labels();
}
TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj);
});

{
SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
@@ -231,47 +268,30 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini
break;
}

if (!first_fetch_for_init) {
const auto rows = _block->rows();
if (rows == 0) {
_block = nullptr;
continue;
}
const auto rows = _block->rows();
if (rows == 0) {
_block = nullptr;
continue;
}
}
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
SCOPED_ATTACH_TASK(_mem_tracker);
ARROW_RETURN_NOT_OK(_fetch_data(true));
if (_block == nullptr) {
return _return_invalid_status("failed to fetch data for schema");
}
RETURN_ARROW_STATUS_IF_ERROR(get_arrow_schema_from_block(*_block, &_schema, _timezone));

// Block does not contain the real column name (label), for example: select avg(k) from tbl
// - Block.name: type=decimal(38, 9)
// - Real column name (label): avg(k)
// so, the first fetch data Block will return the actual column name, and then modify the schema.
std::vector<std::string> arrow_schema_field_names = split(_arrow_schema_field_names, ",");
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < arrow_schema_field_names.size(); i++) {
fields.push_back(_schema->fields()[i]->WithName(arrow_schema_field_names[i]));
}
_schema = arrow::schema(std::move(fields));
ARROW_RETURN_NOT_OK(_fetch_schema());
DCHECK(_schema != nullptr);
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
// parameter *out not nullptr
*out = nullptr;
SCOPED_ATTACH_TASK(_mem_tracker);
ARROW_RETURN_NOT_OK(_fetch_data());
if (_block == nullptr) {
// eof, normal path end
// last ReadNext -> _fetch_data return block is nullptr
// eof, normal path end, last _fetch_data return block is nullptr
return arrow::Status::OK();
}
SCOPED_ATTACH_TASK(_mem_tracker);
{
// convert one batch
SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
@@ -281,7 +301,6 @@ arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::Reco
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
_block = nullptr;
ARROW_RETURN_NOT_OK(_fetch_data(false));

if (*out != nullptr) {
VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << (*out)->num_rows() << ", "
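With this change the reader fetches schema and data over separate RPCs: init_schema() issues a single fetch_arrow_flight_schema call, and each ReadNext() issues its own fetch_arrow_data call until a null block signals the end of the result set. A sketch of how a caller might drive it (the helper name and loop shape are assumptions; only init_schema and ReadNext come from the reader shown above):

// Hypothetical driver loop over ArrowFlightBatchRemoteReader; not code from this commit.
arrow::Status drain_reader(
        const std::shared_ptr<doris::flight::ArrowFlightBatchRemoteReader>& reader) {
    ARROW_RETURN_NOT_OK(reader->init_schema());  // one fetch_arrow_flight_schema RPC
    std::shared_ptr<arrow::RecordBatch> batch;
    do {
        ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));  // one fetch_arrow_data RPC per call
        if (batch != nullptr) {
            // forward the batch to the Arrow Flight data stream here
        }
    } while (batch != nullptr);  // a null batch is the normal end of the result set
    return arrow::Status::OK();
}
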
10 changes: 4 additions & 6 deletions be/src/service/arrow_flight/arrow_flight_batch_reader.h
@@ -93,15 +93,13 @@ class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
private:
ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<PBackendService_Stub>& stub);
// If first_fetch_for_init is true, some additional information will be returned
// to initialize the schema. in this case, the fetched block is allowed to be empty,
// but eos is not expected to be returned.
arrow::Status _fetch_data(bool first_fetch_for_init);

arrow::Status _fetch_schema();
arrow::Status _fetch_data();

std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
std::string _timezone;
std::once_flag _timezone_once_flag;
std::shared_ptr<vectorized::Block> _block;
std::string _arrow_schema_field_names;
};

} // namespace flight
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1187,7 +1187,7 @@ public enum IgnoreSplitType {
private boolean enableSyncRuntimeFilterSize = true;

@VariableMgr.VarAttr(name = ENABLE_PARALLEL_RESULT_SINK, needForward = true, fuzzy = true)
private boolean enableParallelResultSink = true;
private boolean enableParallelResultSink = false;

@VariableMgr.VarAttr(name = "sort_phase_num", fuzzy = true, needForward = true,
description = {"如设置为1,则只生成1阶段sort,设置为2,则只生成2阶段sort,设置其它值,优化器根据代价选择sort类型",
1 change: 0 additions & 1 deletion gensrc/proto/internal_service.proto
@@ -295,7 +295,6 @@ message PFetchArrowDataResult {
optional PBlock block = 4;
optional bool empty_batch = 5;
optional string timezone = 6;
optional string fields_labels = 7;
};

message PFetchArrowFlightSchemaRequest {
