diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index e01d35e08884fe2..1686c28d413c7af 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -112,7 +112,7 @@ void GetArrowResultBatchCtx::on_close(int64_t packet_seq) { void GetArrowResultBatchCtx::on_data( const std::shared_ptr& block, int64_t packet_seq, int be_exec_version, segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone, - std::string arrow_schema_field_names, RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* serialize_batch_ns_timer, RuntimeProfile::Counter* uncompressed_bytes_counter, RuntimeProfile::Counter* compressed_bytes_counter) { Status st = Status::OK(); @@ -128,7 +128,6 @@ void GetArrowResultBatchCtx::on_data( result->set_eos(false); if (packet_seq == 0) { result->set_timezone(timezone); - result->set_fields_labels(arrow_schema_field_names); } } else { result->clear_block(); @@ -237,9 +236,8 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, auto* ctx = _waiting_arrow_result_batch_rpc.front(); _waiting_arrow_result_batch_rpc.pop_front(); ctx->on_data(result, _packet_num, _be_exec_version, - _fragement_transmission_compression_type, _timezone, _arrow_schema_field_names, - _serialize_batch_ns_timer, _uncompressed_bytes_counter, - _compressed_bytes_counter); + _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer, + _uncompressed_bytes_counter, _compressed_bytes_counter); _packet_num++; } @@ -347,8 +345,8 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { _instance_rows_in_queue.pop_front(); ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, - _timezone, _arrow_schema_field_names, _serialize_batch_ns_timer, - _uncompressed_bytes_counter, _compressed_bytes_counter); + _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, + _compressed_bytes_counter); _packet_num++; return; } @@ -370,8 +368,29 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { } void BufferControlBlock::register_arrow_schema(const std::shared_ptr& arrow_schema) { + std::lock_guard l(_lock); _arrow_schema = arrow_schema; - _arrow_schema_field_names = join(_arrow_schema->field_names(), ","); +} + +Status BufferControlBlock::find_arrow_schema(std::shared_ptr* arrow_schema) { + std::unique_lock l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } + + // normal path end + if (_arrow_schema != nullptr) { + *arrow_schema = _arrow_schema; + return Status::OK(); + } + + if (_is_close) { + return Status::RuntimeError("Closed"); + } + return Status::InternalError("Get Arrow Schema Abnormal Ending"); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -405,18 +424,37 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { } _waiting_rpc.clear(); } + + if (!_waiting_arrow_result_batch_rpc.empty()) { + if (_status.ok()) { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_close(_packet_num); + } + } else { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(_status); + } + } + _waiting_arrow_result_batch_rpc.clear(); + } return Status::OK(); } void BufferControlBlock::cancel(const Status& reason) { std::unique_lock l(_lock); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _is_cancelled = true; _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(reason); } _waiting_rpc.clear(); + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(Status::Cancelled("Cancelled")); + } + _waiting_arrow_result_batch_rpc.clear(); _update_dependency(); + _arrow_flight_result_batch_queue.clear(); } void BufferControlBlock::set_dependency( diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 1888009930e92f9..249e1ba7652df73 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -90,8 +90,7 @@ struct GetArrowResultBatchCtx { void on_data(const std::shared_ptr& block, int64_t packet_seq, int be_exec_version, segment_v2::CompressionTypePB fragement_transmission_compression_type, - std::string timezone, std::string arrow_schema_field_names, - RuntimeProfile::Counter* serialize_batch_ns_timer, + std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer, RuntimeProfile::Counter* uncompressed_bytes_counter, RuntimeProfile::Counter* compressed_bytes_counter); }; @@ -114,7 +113,7 @@ class BufferControlBlock { void get_arrow_batch(GetArrowResultBatchCtx* ctx); void register_arrow_schema(const std::shared_ptr& arrow_schema); - std::shared_ptr find_arrow_schema() { return _arrow_schema; } + Status find_arrow_schema(std::shared_ptr* arrow_schema); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. @@ -156,7 +155,6 @@ class BufferControlBlock { ArrowFlightResultQueue _arrow_flight_result_batch_queue; // for arrow flight std::shared_ptr _arrow_schema; - std::string _arrow_schema_field_names; // protects all subsequent data in this block std::mutex _lock; diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 3fbfbce2fcc06b9..5756fd267851fe1 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -114,8 +114,7 @@ Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id, "no arrow schema for this query, maybe query has been canceled, finst_id={}", print_id(finst_id)); } - *schema = cb->find_arrow_schema(); - return Status::OK(); + return cb->find_arrow_schema(schema); } void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) { diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index 6310e83a16402c4..69550b28bab9b53 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -17,12 +17,12 @@ #include "service/arrow_flight/arrow_flight_batch_reader.h" +#include +#include #include #include #include -#include - #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/result_buffer_mgr.h" @@ -33,7 +33,7 @@ #include "util/arrow/utils.h" #include "util/brpc_client_cache.h" #include "util/ref_count_closure.h" -#include "util/string_util.h" +#include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris::flight { @@ -152,7 +152,52 @@ arrow::Result> ArrowFlightBatchRem return result; } -arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) { +arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() { + Status st; + auto request = std::make_shared(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowFlightSchemaRequest, + DummyBrpcCallback>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(BRPC_CONTROLLER_TIMEOUT_MS); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + if (callback->response_->has_schema() && !callback->response_->schema().empty()) { + auto input = + arrow::io::BufferReader::FromString(std::string(callback->response_->schema())); + ARROW_ASSIGN_OR_RAISE(auto reader, + arrow::ipc::RecordBatchStreamReader::Open( + input.get(), arrow::ipc::IpcReadOptions::Defaults())); + _schema = reader->schema(); + } else { + return _return_invalid_status(fmt::format("fetch schema error: not find schema")); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() { DCHECK(_block == nullptr); while (true) { // if `continue` occurs, data is invalid, continue fetch, block is nullptr. @@ -183,8 +228,8 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini } auto error_code = callback->cntl_->ErrorCode(); auto error_text = callback->cntl_->ErrorText(); - return _return_invalid_status( - fmt::format("error={}, error_text={}", berror(error_code), error_text)); + return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}", + berror(error_code), error_text)); } st = Status::create(callback->response_->status()); ARROW_RETURN_NOT_OK(to_arrow_status(st)); @@ -192,18 +237,13 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini DCHECK(callback->response_->has_packet_seq()); if (_packet_seq != callback->response_->packet_seq()) { return _return_invalid_status( - fmt::format("receive packet failed, expect={}, receive={}", _packet_seq, - callback->response_->packet_seq())); + fmt::format("fetch data receive packet failed, expect: {}, receive: {}", + _packet_seq, callback->response_->packet_seq())); } _packet_seq++; if (callback->response_->has_eos() && callback->response_->eos()) { - if (!first_fetch_for_init) { - break; - } else { - return _return_invalid_status(fmt::format("received unexpected eos, packet_seq={}", - callback->response_->packet_seq())); - } + break; } if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) { @@ -215,13 +255,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini continue; } - if (first_fetch_for_init) { + std::call_once(_timezone_once_flag, [this, callback] { DCHECK(callback->response_->has_timezone()); - DCHECK(callback->response_->has_fields_labels()); - _timezone = callback->response_->timezone(); - TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj); - _arrow_schema_field_names = callback->response_->fields_labels(); - } + TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj); + }); { SCOPED_ATOMIC_TIMER(&_deserialize_block_timer); @@ -231,47 +268,30 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_ini break; } - if (!first_fetch_for_init) { - const auto rows = _block->rows(); - if (rows == 0) { - _block = nullptr; - continue; - } + const auto rows = _block->rows(); + if (rows == 0) { + _block = nullptr; + continue; } } return arrow::Status::OK(); } arrow::Status ArrowFlightBatchRemoteReader::init_schema() { - SCOPED_ATTACH_TASK(_mem_tracker); - ARROW_RETURN_NOT_OK(_fetch_data(true)); - if (_block == nullptr) { - return _return_invalid_status("failed to fetch data for schema"); - } - RETURN_ARROW_STATUS_IF_ERROR(get_arrow_schema_from_block(*_block, &_schema, _timezone)); - - // Block does not contain the real column name (label), for example: select avg(k) from tbl - // - Block.name: type=decimal(38, 9) - // - Real column name (label): avg(k) - // so, the first fetch data Block will return the actual column name, and then modify the schema. - std::vector arrow_schema_field_names = split(_arrow_schema_field_names, ","); - std::vector> fields; - for (int i = 0; i < arrow_schema_field_names.size(); i++) { - fields.push_back(_schema->fields()[i]->WithName(arrow_schema_field_names[i])); - } - _schema = arrow::schema(std::move(fields)); + ARROW_RETURN_NOT_OK(_fetch_schema()); + DCHECK(_schema != nullptr); return arrow::Status::OK(); } arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr* out) { // parameter *out not nullptr *out = nullptr; + SCOPED_ATTACH_TASK(_mem_tracker); + ARROW_RETURN_NOT_OK(_fetch_data()); if (_block == nullptr) { - // eof, normal path end - // last ReadNext -> _fetch_data return block is nullptr + // eof, normal path end, last _fetch_data return block is nullptr return arrow::Status::OK(); } - SCOPED_ATTACH_TASK(_mem_tracker); { // convert one batch SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); @@ -281,7 +301,6 @@ arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptrnum_rows() << ", " diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h b/be/src/service/arrow_flight/arrow_flight_batch_reader.h index 1b5681fca98d851..612ebc8063ca373 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h @@ -93,15 +93,13 @@ class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase { private: ArrowFlightBatchRemoteReader(const std::shared_ptr& statement, const std::shared_ptr& stub); - // If first_fetch_for_init is true, some additional information will be returned - // to initialize the schema. in this case, the fetched block is allowed to be empty, - // but eos is not expected to be returned. - arrow::Status _fetch_data(bool first_fetch_for_init); + + arrow::Status _fetch_schema(); + arrow::Status _fetch_data(); std::shared_ptr _brpc_stub = nullptr; - std::string _timezone; + std::once_flag _timezone_once_flag; std::shared_ptr _block; - std::string _arrow_schema_field_names; }; } // namespace flight diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 88919c290d84072..d3f1defb4b0bd10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1187,7 +1187,7 @@ public enum IgnoreSplitType { private boolean enableSyncRuntimeFilterSize = true; @VariableMgr.VarAttr(name = ENABLE_PARALLEL_RESULT_SINK, needForward = true, fuzzy = true) - private boolean enableParallelResultSink = true; + private boolean enableParallelResultSink = false; @VariableMgr.VarAttr(name = "sort_phase_num", fuzzy = true, needForward = true, description = {"如设置为1,则只生成1阶段sort,设置为2,则只生成2阶段sort,设置其它值,优化器根据代价选择sort类型", diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1e88b02f7a93d3d..6a9339459332e15 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -295,7 +295,6 @@ message PFetchArrowDataResult { optional PBlock block = 4; optional bool empty_batch = 5; optional string timezone = 6; - optional string fields_labels = 7; }; message PFetchArrowFlightSchemaRequest {