diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 02ebefbc0f32d0..72253238909dc6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -65,6 +65,7 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); DEFINE_mString(public_access_ip, ""); +DEFINE_Int32(public_access_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -535,6 +536,8 @@ DEFINE_Int32(brpc_light_work_pool_threads, "-1"); DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1"); DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1"); DEFINE_mBool(enable_bthread_transmit_block, "true"); +DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1"); +DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1"); //Enable brpc builtin services, see: //https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely @@ -643,7 +646,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +// arrow flight result sink buffer rows size, default 4096 * 8 DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000"); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 56d5e8e648a86c..0944669c747f08 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port); // For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. // If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. DECLARE_mString(public_access_ip); +DECLARE_Int32(public_access_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -584,6 +585,8 @@ DECLARE_Int32(brpc_light_work_pool_threads); DECLARE_Int32(brpc_heavy_work_pool_max_queue_size); DECLARE_Int32(brpc_light_work_pool_max_queue_size); DECLARE_mBool(enable_bthread_transmit_block); +DECLARE_Int32(brpc_arrow_flight_work_pool_threads); +DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size); // The maximum amount of data that can be processed by a stream load DECLARE_mInt64(streaming_load_max_mb); @@ -693,6 +696,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time); // arrow flight result sink buffer rows size, default 4096 * 8 DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index b9f18c43e1e239..874b6fd1ab1aa0 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* { SCOPED_TIMER(local_state._get_arrow_schema_timer); // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema)); + RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone())); } { SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index bc4e4c88d14ca7..c65b9dda89d0ec 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -72,9 +72,8 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); if (state->query_options().enable_parallel_outfile) { - RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), _buf_size, &_sender, state->execution_timeout(), - state->batch_size())); + RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(), _buf_size, + &_sender, state)); } return vectorized::VExpr::open(_output_vexpr_ctxs, state); } @@ -92,8 +91,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i _sender = _parent->cast()._sender; } else { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(), - state->batch_size())); + state->fragment_instance_id(), p._buf_size, &_sender, state)); } _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this()); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index b8faa4f76f7a30..f8196910021b2c 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -18,6 +18,7 @@ #include "result_sink_operator.h" #include +#include #include @@ -45,15 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); auto fragment_instance_id = state->fragment_instance_id(); + auto& p = _parent->cast(); if (state->query_options().enable_parallel_result_sink) { _sender = _parent->cast()._sender; } else { - auto& p = _parent->cast(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, - state->execution_timeout(), state->batch_size())); + fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state)); } _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); + + _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + std::shared_ptr arrow_schema; + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); + } return Status::OK(); } @@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } // create writer based on sink type switch (p._sink_type) { case TResultSinkType::MYSQL_PROTOCAL: { @@ -79,16 +86,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) { break; } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { - std::shared_ptr arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); - if (state->query_options().enable_parallel_result_sink) { - state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); - } else { - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); - } _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( - _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); + _sender.get(), _output_vexpr_ctxs, _profile)); break; } default: @@ -133,8 +132,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { if (state->query_options().enable_parallel_result_sink) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), _result_sink_buffer_size_rows, &_sender, - state->execution_timeout(), state->batch_size())); + state->query_id(), _result_sink_buffer_size_rows, &_sender, state)); } return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 61ea5ef080de5f..98feb85ad6b9c2 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -33,9 +33,11 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" #include "pipeline/dependency.h" -#include "runtime/exec_env.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" +#include "util/string_util.h" #include "util/thrift_util.h" +#include "vec/core/block.h" namespace doris { @@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size) +void GetArrowResultBatchCtx::on_failure(const Status& status) { + DCHECK(!status.ok()) << "status is ok, errmsg=" << status; + status.to_protobuf(result->mutable_status()); + delete this; +} + +void GetArrowResultBatchCtx::on_close(int64_t packet_seq) { + Status status; + status.to_protobuf(result->mutable_status()); + result->set_packet_seq(packet_seq); + result->set_eos(true); + delete this; +} + +void GetArrowResultBatchCtx::on_data( + const std::shared_ptr& block, int64_t packet_seq, int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone, + RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter) { + Status st = Status::OK(); + if (result != nullptr) { + size_t uncompressed_bytes = 0, compressed_bytes = 0; + SCOPED_TIMER(serialize_batch_ns_timer); + st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes, + &compressed_bytes, fragement_transmission_compression_type, false); + COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes); + COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes); + if (st.ok()) { + result->set_packet_seq(packet_seq); + result->set_eos(false); + if (packet_seq == 0) { + result->set_timezone(timezone); + } + } else { + result->clear_block(); + result->set_packet_seq(packet_seq); + LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st; + } + } else { + result->set_empty_batch(true); + result->set_packet_seq(packet_seq); + result->set_eos(false); + } + + /// The size limit of proto buffer message is 2G + if (result->ByteSizeLong() > std::numeric_limits::max()) { + st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); + result->clear_block(); + } + st.to_protobuf(result->mutable_status()); + delete this; +} + +BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_limit(buffer_size), _packet_num(0), - _batch_size(batch_size) { + _batch_size(state->batch_size()), + _timezone(state->timezone()), + _timezone_obj(state->timezone_obj()), + _be_exec_version(state->be_exec_version()), + _fragement_transmission_compression_type( + state->fragement_transmission_compression_type()), + _profile("BufferControlBlock " + print_id(_fragment_id)) { _query_statistics = std::make_unique(); + _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime"); + _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES); + _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES); + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id))); } BufferControlBlock::~BufferControlBlock() { @@ -148,36 +216,44 @@ Status BufferControlBlock::add_batch(RuntimeState* state, } Status BufferControlBlock::add_arrow_batch(RuntimeState* state, - std::shared_ptr& result) { + std::shared_ptr& result) { std::unique_lock l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); } - int num_rows = result->num_rows(); - - // TODO: merge RocordBatch, ToStructArray -> Make again + if (_waiting_arrow_result_batch_rpc.empty()) { + // TODO: Merge result into block to reduce rpc times + int num_rows = result->rows(); + _arrow_flight_result_batch_queue.push_back(std::move(result)); + _instance_rows_in_queue.emplace_back(); + _instance_rows[state->fragment_instance_id()] += num_rows; + _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + _arrow_data_arrival + .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr,) + } else { + auto* ctx = _waiting_arrow_result_batch_rpc.front(); + _waiting_arrow_result_batch_rpc.pop_front(); + ctx->on_data(result, _packet_num, _be_exec_version, + _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer, + _uncompressed_bytes_counter, _compressed_bytes_counter); + _packet_num++; + } - _arrow_flight_batch_queue.push_back(std::move(result)); - _instance_rows_in_queue.emplace_back(); - _instance_rows[state->fragment_instance_id()] += num_rows; - _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; - _arrow_data_arrival.notify_one(); _update_dependency(); return Status::OK(); } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { std::lock_guard l(_lock); + Defer defer {[&]() { _update_dependency(); }}; if (!_status.ok()) { ctx->on_failure(_status); - _update_dependency(); return; } if (_is_cancelled) { ctx->on_failure(Status::Cancelled("Cancelled")); - _update_dependency(); return; } if (!_fe_result_batch_queue.empty()) { @@ -191,54 +267,132 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { ctx->on_data(result, _packet_num); _packet_num++; - _update_dependency(); return; } if (_is_close) { ctx->on_close(_packet_num, _query_statistics.get()); - _update_dependency(); return; } // no ready data, push ctx to waiting list _waiting_rpc.push_back(ctx); - _update_dependency(); } -Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result) { +Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result, + cctz::time_zone& timezone_obj) { std::unique_lock l(_lock); + Defer defer {[&]() { _update_dependency(); }}; if (!_status.ok()) { return _status; } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20)); } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); + if (!_arrow_flight_result_batch_queue.empty()) { + *result = std::move(_arrow_flight_result_batch_queue.front()); + _arrow_flight_result_batch_queue.pop_front(); + timezone_obj = _timezone_obj; + for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } _instance_rows_in_queue.pop_front(); _packet_num++; - _update_dependency(); return Status::OK(); } // normal path end if (_is_close) { - _update_dependency(); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return Status::OK(); + } + return Status::InternalError( + fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id))); +} + +void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { + std::unique_lock l(_lock); + SCOPED_ATTACH_TASK(_mem_tracker); + Defer defer {[&]() { _update_dependency(); }}; + if (!_status.ok()) { + ctx->on_failure(_status); + return; + } + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)))); + return; + } + + if (!_arrow_flight_result_batch_queue.empty()) { + auto block = _arrow_flight_result_batch_queue.front(); + _arrow_flight_result_batch_queue.pop_front(); + for (auto it : _instance_rows_in_queue.front()) { + _instance_rows[it.first] -= it.second; + } + _instance_rows_in_queue.pop_front(); + + ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, + _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, + _compressed_bytes_counter); + _packet_num++; + return; + } + + // normal path end + if (_is_close) { + ctx->on_close(_packet_num); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return; + } + // no ready data, push ctx to waiting list + _waiting_arrow_result_batch_rpc.push_back(ctx); +} + +void BufferControlBlock::register_arrow_schema(const std::shared_ptr& arrow_schema) { + std::lock_guard l(_lock); + _arrow_schema = arrow_schema; +} + +Status BufferControlBlock::find_arrow_schema(std::shared_ptr* arrow_schema) { + std::unique_lock l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); + } + + // normal path end + if (_arrow_schema != nullptr) { + *arrow_schema = _arrow_schema; return Status::OK(); } - return Status::InternalError("Get Arrow Batch Abnormal Ending"); + + if (_is_close) { + return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id))); + } + return Status::InternalError( + fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id))); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -272,18 +426,37 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { } _waiting_rpc.clear(); } + + if (!_waiting_arrow_result_batch_rpc.empty()) { + if (_status.ok()) { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_close(_packet_num); + } + } else { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(_status); + } + } + _waiting_arrow_result_batch_rpc.clear(); + } return Status::OK(); } void BufferControlBlock::cancel() { std::unique_lock l(_lock); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _is_cancelled = true; _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } _waiting_rpc.clear(); + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(Status::Cancelled("Cancelled")); + } + _waiting_arrow_result_batch_rpc.clear(); _update_dependency(); + _arrow_flight_result_batch_queue.clear(); } void BufferControlBlock::set_dependency( diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 8b45552b2fadb1..a75b670836d121 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -17,6 +17,8 @@ #pragma once +#include +#include #include #include #include @@ -52,7 +54,12 @@ namespace pipeline { class Dependency; } // namespace pipeline +namespace vectorized { +class Block; +} // namespace vectorized + class PFetchDataResult; +class PFetchArrowDataResult; struct GetResultBatchCtx { brpc::Controller* cntl = nullptr; @@ -69,18 +76,44 @@ struct GetResultBatchCtx { bool eos = false); }; +struct GetArrowResultBatchCtx { + brpc::Controller* cntl = nullptr; + PFetchArrowDataResult* result = nullptr; + google::protobuf::Closure* done = nullptr; + + GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* result_, + google::protobuf::Closure* done_) + : cntl(cntl_), result(result_), done(done_) {} + + void on_failure(const Status& status); + void on_close(int64_t packet_seq); + void on_data(const std::shared_ptr& block, int64_t packet_seq, + int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, + std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter); +}; + // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size); + BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state); ~BufferControlBlock(); Status init(); Status add_batch(RuntimeState* state, std::unique_ptr& result); - Status add_arrow_batch(RuntimeState* state, std::shared_ptr& result); + Status add_arrow_batch(RuntimeState* state, std::shared_ptr& result); void get_batch(GetResultBatchCtx* ctx); - Status get_arrow_batch(std::shared_ptr* result); + // for ArrowFlightBatchLocalReader + Status get_arrow_batch(std::shared_ptr* result, + cctz::time_zone& timezone_obj); + // for ArrowFlightBatchRemoteReader + void get_arrow_batch(GetArrowResultBatchCtx* ctx); + + void register_arrow_schema(const std::shared_ptr& arrow_schema); + Status find_arrow_schema(std::shared_ptr* arrow_schema); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. @@ -89,6 +122,7 @@ class BufferControlBlock { void cancel(); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } + [[nodiscard]] std::shared_ptr mem_tracker() { return _mem_tracker; } void update_return_rows(int64_t num_rows) { // _query_statistics may be null when the result sink init failed @@ -106,7 +140,7 @@ class BufferControlBlock { void _update_dependency(); using FeResultQueue = std::list>; - using ArrowFlightResultQueue = std::list>; + using ArrowFlightResultQueue = std::list>; // result's query id TUniqueId _fragment_id; @@ -118,7 +152,9 @@ class BufferControlBlock { // blocking queue for batch FeResultQueue _fe_result_batch_queue; - ArrowFlightResultQueue _arrow_flight_batch_queue; + ArrowFlightResultQueue _arrow_flight_result_batch_queue; + // for arrow flight + std::shared_ptr _arrow_schema; // protects all subsequent data in this block std::mutex _lock; @@ -128,6 +164,7 @@ class BufferControlBlock { std::condition_variable _arrow_data_arrival; std::deque _waiting_rpc; + std::deque _waiting_arrow_result_batch_rpc; // only used for FE using return rows to check limit std::unique_ptr _query_statistics; @@ -137,6 +174,17 @@ class BufferControlBlock { std::list> _instance_rows_in_queue; int _batch_size; + std::string _timezone; + cctz::time_zone _timezone_obj; + int _be_exec_version; + segment_v2::CompressionTypePB _fragement_transmission_compression_type; + std::shared_ptr _mem_tracker; + + // only used for ArrowFlightBatchRemoteReader + RuntimeProfile _profile; + RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr; + RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr; + RuntimeProfile::Counter* _compressed_bytes_counter = nullptr; }; } // namespace doris diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index ccbf0c3ff6729e..ecc3d56773ca82 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -67,8 +67,8 @@ Status ResultBufferMgr::init() { } Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, int exec_timout, - int batch_size) { + std::shared_ptr* sender, + RuntimeState* state) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr control_block = nullptr; - control_block = std::make_shared(query_id, buffer_size, batch_size); + control_block = std::make_shared(query_id, buffer_size, state); { std::unique_lock wlock(_buffer_map_lock); @@ -87,7 +87,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size // otherwise in some case may block all fragment handle threads // details see issue https://github.com/apache/doris/issues/16203 // add extra 5s for avoid corner case - int64_t max_timeout = time(nullptr) + exec_timout + 5; + int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5; cancel_at_time(max_timeout, query_id); } *sender = control_block; @@ -105,27 +105,19 @@ std::shared_ptr ResultBufferMgr::find_control_block(const TU return {}; } -void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr& arrow_schema) { - std::unique_lock wlock(_arrow_schema_map_lock); - _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema)); -} - -std::shared_ptr ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) { - std::shared_lock rlock(_arrow_schema_map_lock); - auto iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != iter) { - return iter->second; +Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id, + std::shared_ptr* schema) { + std::shared_ptr cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no arrow schema for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - - return nullptr; + return cb->find_arrow_schema(schema); } void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) { - TUniqueId tid; - tid.__set_hi(finst_id.hi()); - tid.__set_lo(finst_id.lo()); + TUniqueId tid = UniqueId(finst_id).to_thrift(); std::shared_ptr cb = find_control_block(tid); if (cb == nullptr) { ctx->on_failure(Status::InternalError("no result for this query, tid={}", print_id(tid))); @@ -134,16 +126,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c cb->get_batch(ctx); } +Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr* mem_tracker) { + std::shared_ptr cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); + } + *mem_tracker = cb->mem_tracker(); + return Status::OK(); +} + Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, - std::shared_ptr* result) { + std::shared_ptr* result, + cctz::time_zone& timezone_obj) { std::shared_ptr cb = find_control_block(finst_id); if (cb == nullptr) { - return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id)); + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - RETURN_IF_ERROR(cb->get_arrow_batch(result)); + RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj)); return Status::OK(); } +void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx) { + TUniqueId tid = UniqueId(finst_id).to_thrift(); + std::shared_ptr cb = find_control_block(tid); + if (cb == nullptr) { + ctx->on_failure(Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(tid))); + return; + } + cb->get_arrow_batch(ctx); +} + void ResultBufferMgr::cancel(const TUniqueId& query_id) { { std::unique_lock wlock(_buffer_map_lock); @@ -154,15 +173,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) { _buffer_map.erase(iter); } } - - { - std::unique_lock wlock(_arrow_schema_map_lock); - auto arrow_schema_iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != arrow_schema_iter) { - _arrow_schema_map.erase(arrow_schema_iter); - } - } } void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8bac69c23ac522..1efa0a544f1961 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -17,7 +17,9 @@ #pragma once +#include #include +#include #include #include @@ -41,8 +43,14 @@ namespace doris { class BufferControlBlock; struct GetResultBatchCtx; +struct GetArrowResultBatchCtx; class PUniqueId; +class RuntimeState; +class MemTrackerLimiter; class Thread; +namespace vectorized { +class Block; +} // namespace vectorized // manage all result buffer control block in one backend class ResultBufferMgr { @@ -58,17 +66,18 @@ class ResultBufferMgr { // the returned sender do not need release // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, int exec_timeout, - int batch_size); + std::shared_ptr* sender, RuntimeState* state); // fetch data result to FE void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); - // fetch data result to Arrow Flight Server - Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr* result); - - void register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr& arrow_schema); - std::shared_ptr find_arrow_schema(const TUniqueId& query_id); + // fetch data result to Arrow Flight Client + Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr* result, + cctz::time_zone& timezone_obj); + // fetch data result to Other BE forwards to Client + void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx); + Status find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr* mem_tracker); + Status find_arrow_schema(const TUniqueId& query_id, std::shared_ptr* schema); // cancel void cancel(const TUniqueId& fragment_id); @@ -79,7 +88,6 @@ class ResultBufferMgr { private: using BufferMap = std::unordered_map>; using TimeoutMap = std::map>; - using ArrowSchemaMap = std::unordered_map>; std::shared_ptr find_control_block(const TUniqueId& query_id); @@ -91,10 +99,6 @@ class ResultBufferMgr { std::shared_mutex _buffer_map_lock; // buffer block map BufferMap _buffer_map; - // lock for arrow schema map - std::shared_mutex _arrow_schema_map_lock; - // for arrow flight - ArrowSchemaMap _arrow_schema_map; // lock for timeout map std::mutex _timeout_lock; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index a07e479d759be7..e935aff996d55e 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -17,53 +17,294 @@ #include "service/arrow_flight/arrow_flight_batch_reader.h" +#include +#include #include +#include +#include -#include "arrow/builder.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" #include "util/arrow/utils.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" -namespace doris { -namespace flight { +namespace doris::flight { -std::shared_ptr ArrowFlightBatchReader::schema() const { - return schema_; +ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase( + const std::shared_ptr& statement) + : _statement(statement) {} + +std::shared_ptr ArrowFlightBatchReaderBase::schema() const { + return _schema; +} + +arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) { + std::string status_msg = + fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg, + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id)); + LOG(WARNING) << status_msg; + return arrow::Status::Invalid(status_msg); } -ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr statement, - std::shared_ptr schema) - : statement_(std::move(statement)), schema_(std::move(schema)) {} +ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() { + VLOG_NOTICE << fmt::format( + "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, " + "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}", + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer, + _mem_tracker->peak_consumption()); +} -arrow::Result> ArrowFlightBatchReader::Create( - const std::shared_ptr& statement_) { +ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader( + const std::shared_ptr& statement, + const std::shared_ptr& schema, + const std::shared_ptr& mem_tracker) + : ArrowFlightBatchReaderBase(statement) { + _schema = schema; + _mem_tracker = mem_tracker; +} + +arrow::Result> ArrowFlightBatchLocalReader::Create( + const std::shared_ptr& statement) { + DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost()); // Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket // to the ADBC client, so that the schema and control block can be found. - auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id); - if (schema == nullptr) { - ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( - "Client not found arrow flight schema, maybe query has been canceled, queryid: {}", - print_id(statement_->query_id)))); + std::shared_ptr schema; + RETURN_ARROW_STATUS_IF_ERROR( + ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema)); + std::shared_ptr mem_tracker; + RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker( + statement->query_id, &mem_tracker)); + + std::shared_ptr result( + new ArrowFlightBatchLocalReader(statement, schema, mem_tracker)); + return result; +} + +arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr* out) { + // parameter *out not nullptr + *out = nullptr; + SCOPED_ATTACH_TASK(_mem_tracker); + std::shared_ptr result; + auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + if (result == nullptr) { + // eof, normal path end + return arrow::Status::OK(); + } + + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + } + + _packet_seq++; + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } - std::shared_ptr result(new ArrowFlightBatchReader(statement_, schema)); + return arrow::Status::OK(); +} + +ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader( + const std::shared_ptr& statement, + const std::shared_ptr& stub) + : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id))); +} + +arrow::Result> ArrowFlightBatchRemoteReader::Create( + const std::shared_ptr& statement) { + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + statement->result_addr); + if (!stub) { + std::string msg = fmt::format( + "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}", + statement->result_addr.hostname, statement->result_addr.port, + print_id(statement->query_id)); + LOG(WARNING) << msg; + return arrow::Status::Invalid(msg); + } + + std::shared_ptr result( + new ArrowFlightBatchRemoteReader(statement, stub)); + ARROW_RETURN_NOT_OK(result->init_schema()); return result; } -arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr* out) { - // *out not nullptr +arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() { + Status st; + auto request = std::make_shared(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowFlightSchemaRequest, + DummyBrpcCallback>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + if (callback->response_->has_schema() && !callback->response_->schema().empty()) { + auto input = + arrow::io::BufferReader::FromString(std::string(callback->response_->schema())); + ARROW_ASSIGN_OR_RAISE(auto reader, + arrow::ipc::RecordBatchStreamReader::Open( + input.get(), arrow::ipc::IpcReadOptions::Defaults())); + _schema = reader->schema(); + } else { + return _return_invalid_status(fmt::format("fetch schema error: not find schema")); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() { + DCHECK(_block == nullptr); + while (true) { + // if `continue` occurs, data is invalid, continue fetch, block is nullptr. + // if `break` occurs, fetch data successfully (block is not nullptr) or fetch eos. + Status st; + auto request = std::make_shared(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowDataRequest, + DummyBrpcCallback>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_data(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, + _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + DCHECK(callback->response_->has_packet_seq()); + if (_packet_seq != callback->response_->packet_seq()) { + return _return_invalid_status( + fmt::format("fetch data receive packet failed, expect: {}, receive: {}", + _packet_seq, callback->response_->packet_seq())); + } + _packet_seq++; + + if (callback->response_->has_eos() && callback->response_->eos()) { + break; + } + + if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) { + continue; + } + + DCHECK(callback->response_->has_block()); + if (callback->response_->block().ByteSizeLong() == 0) { + continue; + } + + std::call_once(_timezone_once_flag, [this, callback] { + DCHECK(callback->response_->has_timezone()); + TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj); + }); + + { + SCOPED_ATOMIC_TIMER(&_deserialize_block_timer); + _block = vectorized::Block::create_shared(); + st = _block->deserialize(callback->response_->block()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + break; + } + + const auto rows = _block->rows(); + if (rows == 0) { + _block = nullptr; + continue; + } + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::init_schema() { + ARROW_RETURN_NOT_OK(_fetch_schema()); + DCHECK(_schema != nullptr); + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr* out) { + // parameter *out not nullptr *out = nullptr; - auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); + SCOPED_ATTACH_TASK(_mem_tracker); + ARROW_RETURN_NOT_OK(_fetch_data()); + if (_block == nullptr) { + // eof, normal path end, last _fetch_data return block is nullptr + return arrow::Status::OK(); + } + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + auto st = convert_to_arrow_batch(*_block, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch failed"); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } + _block = nullptr; + if (*out != nullptr) { - VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " - << (*out)->num_columns(); + VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } return arrow::Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h b/be/src/service/arrow_flight/arrow_flight_batch_reader.h index e0279cbb70da12..612ebc8063ca37 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h @@ -17,40 +17,91 @@ #pragma once +#include #include #include +#include #include "arrow/record_batch.h" +#include "runtime/exec_env.h" namespace doris { + +namespace vectorized { +class Block; +} // namespace vectorized + namespace flight { struct QueryStatement { public: TUniqueId query_id; + TNetworkAddress result_addr; // BE brpc ip & port std::string sql; - QueryStatement(const TUniqueId& query_id_, const std::string& sql_) - : query_id(query_id_), sql(sql_) {} + QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_, std::string sql_) + : query_id(std::move(query_id_)), + result_addr(std::move(result_addr_)), + sql(std::move(sql_)) {} +}; + +class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader { +public: + // RecordBatchReader force override + [[nodiscard]] std::shared_ptr schema() const override; + +protected: + ArrowFlightBatchReaderBase(const std::shared_ptr& statement); + ~ArrowFlightBatchReaderBase() override; + arrow::Status _return_invalid_status(const std::string& msg); + + std::shared_ptr _statement; + std::shared_ptr _schema; + cctz::time_zone _timezone_obj; + std::atomic _packet_seq = 0; + + std::atomic _convert_arrow_batch_timer = 0; + std::atomic _deserialize_block_timer = 0; + std::shared_ptr _mem_tracker; }; -class ArrowFlightBatchReader : public arrow::RecordBatchReader { +class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase { public: - static arrow::Result> Create( + static arrow::Result> Create( const std::shared_ptr& statement); - [[nodiscard]] std::shared_ptr schema() const override; + arrow::Status ReadNext(std::shared_ptr* out) override; +private: + ArrowFlightBatchLocalReader(const std::shared_ptr& statement, + const std::shared_ptr& schema, + const std::shared_ptr& mem_tracker); +}; + +class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase { +public: + static arrow::Result> Create( + const std::shared_ptr& statement); + + // create arrow RecordBatchReader must initialize the schema. + // so when creating arrow RecordBatchReader, fetch result data once, + // which will return Block and some necessary information, and extract arrow schema from Block. + arrow::Status init_schema(); arrow::Status ReadNext(std::shared_ptr* out) override; private: - std::shared_ptr statement_; - std::shared_ptr schema_; + ArrowFlightBatchRemoteReader(const std::shared_ptr& statement, + const std::shared_ptr& stub); - ArrowFlightBatchReader(std::shared_ptr statement, - std::shared_ptr schema); + arrow::Status _fetch_schema(); + arrow::Status _fetch_data(); + + std::shared_ptr _brpc_stub = nullptr; + std::once_flag _timezone_once_flag; + std::shared_ptr _block; }; } // namespace flight + } // namespace doris diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index 60b665c62fc781..90ee3edfbea72b 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -19,15 +19,17 @@ #include +#include + #include "arrow/flight/sql/server.h" +#include "gutil/strings/split.h" #include "service/arrow_flight/arrow_flight_batch_reader.h" #include "service/arrow_flight/flight_sql_info.h" #include "service/backend_options.h" #include "util/arrow/utils.h" #include "util/uid_util.h" -namespace doris { -namespace flight { +namespace doris::flight { class FlightSqlServer::Impl { private: @@ -41,14 +43,21 @@ class FlightSqlServer::Impl { return arrow::flight::Ticket {std::move(ticket)}; } - arrow::Result> decode_ticket(const std::string& ticket) { - auto divider = ticket.find(':'); - if (divider == std::string::npos) { - return arrow::Status::Invalid("Malformed ticket"); + arrow::Result> decode_ticket(const std::string& ticket) { + std::vector fields = strings::Split(ticket, "&"); + if (fields.size() != 4) { + return arrow::Status::Invalid(fmt::format("Malformed ticket, size: {}", fields.size())); } - std::string query_id = ticket.substr(0, divider); - std::string sql = ticket.substr(divider + 1); - return std::make_pair(std::move(sql), std::move(query_id)); + + TUniqueId queryid; + parse_id(fields[0], &queryid); + TNetworkAddress result_addr; + result_addr.hostname = fields[1]; + result_addr.port = std::stoi(fields[2]); + std::string sql = fields[3]; + std::shared_ptr statement = + std::make_shared(queryid, result_addr, sql); + return statement; } public: @@ -59,18 +68,21 @@ class FlightSqlServer::Impl { arrow::Result> DoGetStatement( const arrow::flight::ServerCallContext& context, const arrow::flight::sql::StatementQueryTicket& command) { - ARROW_ASSIGN_OR_RAISE(auto pair, decode_ticket(command.statement_handle)); - const std::string& sql = pair.first; - const std::string query_id = pair.second; - TUniqueId queryid; - parse_id(query_id, &queryid); - - auto statement = std::make_shared(queryid, sql); - - std::shared_ptr reader; - ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchReader::Create(statement)); - - return std::make_unique(reader); + ARROW_ASSIGN_OR_RAISE(auto statement, decode_ticket(command.statement_handle)); + // if IP:BrpcPort in the Ticket is not current BE node, + // pulls the query result Block from the BE node specified by IP:BrpcPort, + // converts it to Arrow Batch and returns it to ADBC client. + // use brpc to transmit blocks between BEs. + if (statement->result_addr.hostname == BackendOptions::get_localhost() && + statement->result_addr.port == config::brpc_port) { + std::shared_ptr reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchLocalReader::Create(statement)); + return std::make_unique(reader); + } else { + std::shared_ptr reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchRemoteReader::Create(statement)); + return std::make_unique(reader); + } } }; @@ -135,5 +147,4 @@ Status FlightSqlServer::join() { return Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ae84081813f1f3..be99278ab541a3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -157,6 +157,11 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -200,7 +205,14 @@ PInternalService::PInternalService(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light") { + "brpc_light"), + _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 + ? config::brpc_arrow_flight_work_pool_threads + : std::max(512, CpuInfo::num_cores() * 16), + config::brpc_arrow_flight_work_pool_max_queue_size != -1 + ? config::brpc_arrow_flight_work_pool_max_queue_size + : std::max(20480, CpuInfo::num_cores() * 640), + "brpc_arrow_flight") { REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -219,6 +231,15 @@ PInternalService::PInternalService(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, + [this]() { return _arrow_flight_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, + [this]() { return _arrow_flight_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, + []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, + []() { return config::brpc_arrow_flight_work_pool_threads; }); + _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool); @@ -242,6 +263,11 @@ PInternalService::~PInternalService() { DEREGISTER_HOOK_METRIC(heavy_work_max_threads); DEREGISTER_HOOK_METRIC(light_work_max_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); } @@ -650,6 +676,22 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller, } } +void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, + PFetchArrowDataResult* result, + google::protobuf::Closure* done) { + bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + auto* cntl = static_cast(controller); + auto* ctx = new GetArrowResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx); + }); + if (!ret) { + offer_failed(result, done, _arrow_flight_work_pool); + return; + } +} + void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -857,23 +899,22 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); - std::shared_ptr schema = - ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( - UniqueId(request->finst_id()).to_thrift()); - if (schema == nullptr) { - LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled"; - auto st = Status::NotFound( - "FE not found arrow flight schema, maybe query has been canceled"); + std::shared_ptr schema; + auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( + UniqueId(request->finst_id()).to_thrift(), &schema); + if (!st.ok()) { + LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } std::string schema_str; - auto st = serialize_arrow_schema(&schema, &schema_str); + st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (config::public_access_ip != "") { + if (!config::public_access_ip.empty() && config::public_access_port != -1) { result->set_be_arrow_flight_ip(config::public_access_ip); + result->set_be_arrow_flight_port(config::public_access_port); } } st.to_protobuf(result->mutable_status()); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b3ab1c5a6474c0..66a0f867393793 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -97,6 +97,10 @@ class PInternalService : public PBackendService { void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; + void fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, + google::protobuf::Closure* done) override; + void outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -271,6 +275,7 @@ class PInternalService : public PBackendService { // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; + FifoThreadPool _arrow_flight_work_pool; }; // `StorageEngine` mixin for `PInternalService` diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 084765e5aaa9c0..dd11d5ae46f740 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -46,7 +46,8 @@ namespace doris { using strings::Substitute; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result) { +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone) { switch (type.type) { case TYPE_NULL: *result = arrow::null(); @@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr 3) { - *result = std::make_shared(arrow::TimeUnit::MICRO); + *result = std::make_shared(arrow::TimeUnit::MICRO, timezone); } else if (type.scale > 0) { - *result = std::make_shared(arrow::TimeUnit::MILLI); + *result = std::make_shared(arrow::TimeUnit::MILLI, timezone); } else { - *result = std::make_shared(arrow::TimeUnit::SECOND); + *result = std::make_shared(arrow::TimeUnit::SECOND, timezone); } break; case TYPE_DECIMALV2: @@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr item_type; - static_cast(convert_to_arrow_type(type.children[0], &item_type)); + static_cast(convert_to_arrow_type(type.children[0], &item_type, timezone)); *result = std::make_shared(item_type); break; } @@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr key_type; std::shared_ptr val_type; - static_cast(convert_to_arrow_type(type.children[0], &key_type)); - static_cast(convert_to_arrow_type(type.children[1], &val_type)); + static_cast(convert_to_arrow_type(type.children[0], &key_type, timezone)); + static_cast(convert_to_arrow_type(type.children[1], &val_type, timezone)); *result = std::make_shared(key_type, val_type); break; } @@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr> fields; for (size_t i = 0; i < type.children.size(); i++) { std::shared_ptr field_type; - static_cast(convert_to_arrow_type(type.children[i], &field_type)); + static_cast(convert_to_arrow_type(type.children[i], &field_type, timezone)); fields.push_back(std::make_shared(type.field_names[i], field_type, type.contains_nulls[i])); } @@ -156,19 +157,14 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* field) { - std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type)); - *field = arrow::field(desc->col_name(), type, desc->is_nullable()); - return Status::OK(); -} - -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result) { +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(), - &arrow_type)); + &arrow_type, timezone)); fields.push_back(std::make_shared(type_and_name.name, arrow_type, type_and_name.type->is_nullable())); } @@ -176,13 +172,14 @@ Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result) { +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (int i = 0; i < output_vexpr_ctxs.size(); i++) { std::shared_ptr arrow_type; auto root_expr = output_vexpr_ctxs.at(i)->root(); - RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type)); + RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone)); auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty() ? root_expr->expr_label() : fmt::format("{}_{}", root_expr->data_type()->get_name(), i); diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 5dd76ff66d7ff8..d10bd54b2ae5af 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -41,12 +41,16 @@ namespace doris { class RowDescriptor; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result); +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone); -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result); +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr* result, + const std::string& timezone); -Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr* result); +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr* result, + const std::string& timezone); Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp index 5ccff849034a4a..742f5bd0fc33f3 100644 --- a/be/src/util/arrow/utils.cpp +++ b/be/src/util/arrow/utils.cpp @@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) { } arrow::Status to_arrow_status(const Status& status) { - if (status.ok()) { + if (LIKELY(status.ok())) { return arrow::Status::OK(); } else { + LOG(WARNING) << status.to_string(); // The length of exception msg returned to the ADBC Client cannot larger than 8192, // otherwise ADBC Client will receive: // `INTERNAL: http2 exception Header size exceeded max allowed size (8192)`. diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 3006461059c106..69516773debdbc 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -218,6 +218,11 @@ class DorisMetrics { UIntGauge* heavy_work_max_threads = nullptr; UIntGauge* light_work_max_threads = nullptr; + UIntGauge* arrow_flight_work_pool_queue_size = nullptr; + UIntGauge* arrow_flight_work_active_threads = nullptr; + UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr; + UIntGauge* arrow_flight_work_max_threads = nullptr; + UIntGauge* flush_thread_pool_queue_size = nullptr; UIntGauge* flush_thread_pool_thread_num = nullptr; diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index 1969858349f0e9..f0810d6c7ceead 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() { std::vector> fields; for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type)); + RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type, + _state->timezone())); if (_parquet_schemas != nullptr) { std::shared_ptr field = arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index b23d1668465bbd..c54c27a84844b1 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -19,21 +19,16 @@ #include "runtime/buffer_control_block.h" #include "runtime/runtime_state.h" -#include "util/arrow/block_convertor.h" -#include "util/arrow/row_batch.h" +#include "runtime/thread_context.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { -VArrowFlightResultWriter::VArrowFlightResultWriter( - BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, const std::shared_ptr& arrow_schema) - : _sinker(sinker), - _output_vexpr_ctxs(output_vexpr_ctxs), - _parent_profile(parent_profile), - _arrow_schema(arrow_schema) {} +VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker, + const VExprContextSPtrs& output_vexpr_ctxs, + RuntimeProfile* parent_profile) + : _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile) {} Status VArrowFlightResultWriter::init(RuntimeState* state) { _init_profile(); @@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) { return Status::InternalError("sinker is NULL pointer."); } _is_dry_run = state->query_options().dry_run_query; - _timezone_obj = state->timezone_obj(); return Status::OK(); } void VArrowFlightResultWriter::_init_profile() { _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); - _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); @@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch - std::shared_ptr result; - auto num_rows = block.rows(); - // arrow::RecordBatch without `nbytes()` in C++ - uint64_t bytes_sent = block.bytes(); { - SCOPED_TIMER(_convert_tuple_timer); - RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, _timezone_obj)); - } - { - SCOPED_TIMER(_result_send_timer); - // If this is a dry run task, no need to send data block - if (!_is_dry_run) { - status = _sinker->add_arrow_batch(state, result); - } - if (status.ok()) { - _written_rows += num_rows; + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker()); + std::unique_ptr mutable_block = + vectorized::MutableBlock::create_unique(block.clone_empty()); + RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block))); + std::shared_ptr output_block = vectorized::Block::create_shared(); + output_block->swap(mutable_block->to_block()); + + auto num_rows = output_block->rows(); + // arrow::RecordBatch without `nbytes()` in C++ + uint64_t bytes_sent = output_block->bytes(); + { + SCOPED_TIMER(_result_send_timer); + // If this is a dry run task, no need to send data block if (!_is_dry_run) { - _bytes_sent += bytes_sent; + status = _sinker->add_arrow_batch(state, output_block); + } + if (status.ok()) { + _written_rows += num_rows; + if (!_is_dry_run) { + _bytes_sent += bytes_sent; + } + } else { + LOG(WARNING) << "append result batch to sink failed."; } - } else { - LOG(WARNING) << "append result batch to sink failed."; } } return status; @@ -100,5 +95,4 @@ Status VArrowFlightResultWriter::close(Status st) { return Status::OK(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index ab2578421c80bc..c87518de5e1561 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -17,13 +17,6 @@ #pragma once -#include -#include -#include - -#include -#include - #include "common/status.h" #include "runtime/result_writer.h" #include "util/runtime_profile.h" @@ -39,8 +32,7 @@ class Block; class VArrowFlightResultWriter final : public ResultWriter { public: VArrowFlightResultWriter(BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, - const std::shared_ptr& arrow_schema); + RuntimeProfile* parent_profile); Status init(RuntimeState* state) override; @@ -58,8 +50,6 @@ class VArrowFlightResultWriter final : public ResultWriter { RuntimeProfile* _parent_profile = nullptr; // parent profile from result sink. not owned // total time cost on append batch operation RuntimeProfile::Counter* _append_row_batch_timer = nullptr; - // tuple convert timer, child timer of _append_row_batch_timer - RuntimeProfile::Counter* _convert_tuple_timer = nullptr; // file write timer, child timer of _append_row_batch_timer RuntimeProfile::Counter* _result_send_timer = nullptr; // number of sent rows @@ -70,10 +60,6 @@ class VArrowFlightResultWriter final : public ResultWriter { bool _is_dry_run = false; uint64_t _bytes_sent = 0; - - std::shared_ptr _arrow_schema; - - cctz::time_zone _timezone_obj; }; } // namespace vectorized } // namespace doris diff --git a/be/test/runtime/result_buffer_mgr_test.cpp b/be/test/runtime/result_buffer_mgr_test.cpp index 152c155ef0a3c5..4ab9186c5fab10 100644 --- a/be/test/runtime/result_buffer_mgr_test.cpp +++ b/be/test/runtime/result_buffer_mgr_test.cpp @@ -34,6 +34,7 @@ class ResultBufferMgrTest : public testing::Test { virtual void SetUp() {} private: + RuntimeState _state; }; TEST_F(ResultBufferMgrTest, create_normal) { @@ -43,7 +44,7 @@ TEST_F(ResultBufferMgrTest, create_normal) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); } TEST_F(ResultBufferMgrTest, create_same_buffer) { @@ -53,9 +54,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); std::shared_ptr control_block2; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, &_state).ok()); EXPECT_EQ(control_block1.get(), control_block1.get()); } @@ -67,7 +68,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); result->result_batch.rows.push_back("hello test"); @@ -85,7 +86,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); query_id.lo = 11; @@ -101,7 +102,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); EXPECT_TRUE(buffer_mgr.cancel(query_id).ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 5d431b386b7f11..371680813502f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; import com.google.protobuf.Any; @@ -224,24 +225,40 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con } } else { // Now only query stmt will pull results from BE. - final ByteString handle; - if (connectContext.getSessionVariable().enableParallelResultSink()) { - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query); - } else { - // only one instance - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query); - } Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); if (schema == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } + + TUniqueId queryId = connectContext.queryId(); + if (!connectContext.getSessionVariable().enableParallelResultSink()) { + // only one instance + queryId = connectContext.getFinstId(); + } + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. + final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname + + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) .build(); Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + Location location; + if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. + location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, + flightSQLConnectProcessor.getPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, + connectContext.getResultFlightServerAddr().port); + } List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. return new FlightInfo(schema, descriptor, endpoints, -1, -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index b812bf81d8a514..febadbef0ab0d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -53,11 +53,12 @@ /** * Process one flgiht sql connection. - * + *

* Must use try-with-resources. */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); + private TNetworkAddress publicAccessAddr = new TNetworkAddress(); public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -66,6 +67,10 @@ public FlightSqlConnectProcessor(ConnectContext context) { context.setReturnResultFromLocal(true); } + public TNetworkAddress getPublicAccessAddr() { + return publicAccessAddr; + } + public void prepare(MysqlCommand command) { // set status of query to OK. ctx.getState().reset(); @@ -130,10 +135,11 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus.toString())); + DebugUtil.printId(tid), resultStatus)); } - if (pResult.hasBeArrowFlightIp()) { - ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) { + publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + publicAccessAddr.port = pResult.getBeArrowFlightPort(); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index f3764cea233806..17448861dc06bb 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -283,6 +283,20 @@ message PFetchDataResult { optional bool empty_batch = 6; }; +message PFetchArrowDataRequest { + optional PUniqueId finst_id = 1; +}; + +message PFetchArrowDataResult { + optional PStatus status = 1; + // valid when status is ok + optional int64 packet_seq = 2; + optional bool eos = 3; + optional PBlock block = 4; + optional bool empty_batch = 5; + optional string timezone = 6; +}; + message PFetchArrowFlightSchemaRequest { optional PUniqueId finst_id = 1; }; @@ -292,6 +306,7 @@ message PFetchArrowFlightSchemaResult { // valid when status is ok optional bytes schema = 2; optional bytes be_arrow_flight_ip = 3; + optional int32 be_arrow_flight_port = 4; }; message KeyTuple { @@ -979,6 +994,7 @@ service PBackendService { rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult); rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); + rpc fetch_arrow_data(PFetchArrowDataRequest) returns (PFetchArrowDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc open_load_stream(POpenLoadStreamRequest) returns (POpenLoadStreamResponse); rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult); diff --git a/regression-test/data/arrow_flight_sql_p0/test_select.out b/regression-test/data/arrow_flight_sql_p0/test_select.out index d643597bbafcb5..f2f4b86bbf5ceb 100644 --- a/regression-test/data/arrow_flight_sql_p0/test_select.out +++ b/regression-test/data/arrow_flight_sql_p0/test_select.out @@ -2,3 +2,7 @@ -- !arrow_flight_sql -- 777 4 +-- !arrow_flight_sql_datetime -- +333 plsql333 2024-07-21 12:00:00.123456 2024-07-21 12:00:00.0 +222 plsql222 2024-07-20 12:00:00.123456 2024-07-20 12:00:00.0 +111 plsql111 2024-07-19 12:00:00.123456 2024-07-19 12:00:00.0 \ No newline at end of file diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index aded781c08bb71..6b749bf0fd1dae 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -379,7 +379,7 @@ under the License. org.apache.doris flink-doris-connector-1.16 - 1.6.1 + 24.0.0 diff --git a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy index 55b3c301e244f4..950fb4af7e9034 100644 --- a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy +++ b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy @@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") { sql """INSERT INTO ${tableName} VALUES(111, "plsql333")""" qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}" + + tableName = "test_select_datetime" + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + create table ${tableName} (id int, name varchar(20), f_datetime_p datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash (`id`) buckets 4 + properties ("replication_num"="1"); + """ + sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 12:00:00.123456","2024-07-19 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 12:00:00.123456","2024-07-20 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 12:00:00.123456","2024-07-21 12:00:00")""" + + qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc" }