Commit c035fd0

xinyiZzz committed Nov 11, 2024
1 parent e0192b1 commit c035fd0

Showing 24 changed files with 799 additions and 243 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
@@ -64,6 +64,7 @@ DEFINE_Int32(brpc_port, "8060");
DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -520,6 +521,8 @@ DEFINE_Int32(brpc_light_work_pool_threads, "-1");
DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_mBool(enable_bthread_transmit_block, "true");
DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1");
DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1");

//Enable brpc builtin services, see:
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
3 changes: 3 additions & 0 deletions be/src/common/config.h
@@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port);
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -571,6 +572,8 @@ DECLARE_Int32(brpc_light_work_pool_threads);
DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
DECLARE_Int32(brpc_light_work_pool_max_queue_size);
DECLARE_mBool(enable_bthread_transmit_block);
DECLARE_Int32(brpc_arrow_flight_work_pool_threads);
DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size);

// The maximum amount of data that can be processed by a stream load
DECLARE_mInt64(streaming_load_max_mb);
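The two config hunks above add a public-access port to pair with public_access_ip and a dedicated brpc work pool for Arrow Flight. These items follow the usual BE config pattern, so they should be settable by name in be.conf; the fragment below is an illustrative sketch with made-up values, not the shipped defaults (empty string and -1):

    # be.conf fragment (illustrative values only)
    # Address and port advertised to ADBC clients that cannot reach the BE IP directly.
    public_access_ip = 203.0.113.10
    public_access_port = 8070
    # Dedicated brpc work pool for Arrow Flight traffic; -1 keeps the built-in sizing.
    brpc_arrow_flight_work_pool_threads = 32
    brpc_arrow_flight_work_pool_max_queue_size = 10240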
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
{
SCOPED_TIMER(local_state._get_arrow_schema_timer);
// After expr executed, use recalculated schema as final schema
RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone()));
}
{
SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer);
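The only change in this file is the rename of the schema helper. Its declaration is outside the hunks shown here, so the following is an assumed sketch of its shape, inferred from the call site above:

    // Assumed signature, inferred from the call site; the real declaration lives in
    // the vectorized Arrow utilities, which are not part of this excerpt.
    Status get_arrow_schema_from_block(const vectorized::Block* block,
                                       std::shared_ptr<arrow::Schema>* result,
                                       const std::string& timezone);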
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -72,9 +72,8 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
if (state->query_options().enable_parallel_outfile) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), _buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(), _buf_size,
&_sender, state));
}
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
@@ -92,8 +91,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
_sender = _parent->cast<ResultFileSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
state->fragment_instance_id(), p._buf_size, &_sender, state));
}
_sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());

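Both call sites in this file, and the ones in result_sink_operator.cpp below, reflect the same interface change: create_sender no longer takes the execution timeout and batch size as separate arguments, but the RuntimeState, from which the BufferControlBlock constructor now pulls the batch size, timezone, compression type, and BE exec version (see buffer_control_block.cpp further down). A rough sketch of the assumed new signature, for orientation only:

    // Assumed shape of the updated factory; the real declaration (in
    // be/src/runtime/result_buffer_mgr.h) is not among the hunks shown here.
    Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
                                          std::shared_ptr<BufferControlBlock>* sender,
                                          RuntimeState* state);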
19 changes: 6 additions & 13 deletions be/src/pipeline/exec/result_sink_operator.cpp
@@ -51,8 +51,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
} else {
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state));
}
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
return Status::OK();
@@ -81,16 +80,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
if (state->query_options().enable_parallel_result_sink) {
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
} else {
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
}
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
_sender.get(), _output_vexpr_ctxs, _profile));
break;
}
default:
@@ -135,8 +129,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), _result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
state->query_id(), _result_sink_buffer_size_rows, &_sender, state));
}
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
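The net effect of the hunks above: the Arrow schema is now computed from the output expression contexts and attached to the sender itself instead of being registered with the global result manager, so VArrowFlightResultWriter no longer needs the schema at construction time. A condensed restatement of the new flow, with error handling and the surrounding switch elided:

    // Condensed from the ARROW_FLIGHT_PROTOCAL branch above; a sketch, not the full method.
    std::shared_ptr<arrow::Schema> arrow_schema;
    RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
                                                    state->timezone()));
    _sender->register_arrow_schema(arrow_schema);  // schema now travels with the sender
    _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
            _sender.get(), _output_vexpr_ctxs, _profile));  // no schema argument anymore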
183 changes: 158 additions & 25 deletions be/src/runtime/buffer_control_block.cpp
@@ -33,9 +33,11 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"

namespace doris {

@@ -93,14 +95,81 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
delete this;
}

BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size)
void GetArrowResultBatchCtx::on_failure(const Status& status) {
DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
status.to_protobuf(result->mutable_status());
delete this;
}

void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
Status status;
status.to_protobuf(result->mutable_status());
result->set_packet_seq(packet_seq);
result->set_eos(true);
delete this;
}

void GetArrowResultBatchCtx::on_data(
const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
std::string arrow_schema_field_names, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter) {
Status st = Status::OK();
if (result != nullptr) {
size_t uncompressed_bytes = 0, compressed_bytes = 0;
SCOPED_TIMER(serialize_batch_ns_timer);
st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes,
&compressed_bytes, fragement_transmission_compression_type, false);
COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes);
COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes);
if (st.ok()) {
result->set_packet_seq(packet_seq);
result->set_eos(false);
if (packet_seq == 0) {
result->set_timezone(timezone);
result->set_fields_labels(arrow_schema_field_names);
}
} else {
result->clear_block();
result->set_packet_seq(packet_seq);
LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st;
}
} else {
result->set_empty_batch(true);
result->set_packet_seq(packet_seq);
result->set_eos(false);
}

/// The size limit of proto buffer message is 2G
if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
result->clear_block();
}
st.to_protobuf(result->mutable_status());
delete this;
}

BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state)
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_limit(buffer_size),
_packet_num(0),
_batch_size(batch_size) {
_batch_size(state->batch_size()),
_timezone(state->timezone()),
_timezone_obj(state->timezone_obj()),
_be_exec_version(state->be_exec_version()),
_fragement_transmission_compression_type(
state->fragement_transmission_compression_type()),
_profile("BufferControlBlock " + print_id(_fragment_id)) {
_query_statistics = std::make_unique<QueryStatistics>();
_serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
_uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES);
_compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES);
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY,
fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id)));
}

BufferControlBlock::~BufferControlBlock() {
@@ -148,36 +217,45 @@ Status BufferControlBlock::add_batch(RuntimeState* state,
}

Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
std::shared_ptr<arrow::RecordBatch>& result) {
std::shared_ptr<vectorized::Block>& result) {
std::unique_lock<std::mutex> l(_lock);

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

int num_rows = result->num_rows();

// TODO: merge RocordBatch, ToStructArray -> Make again
if (_waiting_arrow_result_batch_rpc.empty()) {
// TODO: Merge result into block to reduce rpc times
int num_rows = result->rows();
_arrow_flight_result_batch_queue.push_back(std::move(result));
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
_arrow_data_arrival
.notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,)
} else {
auto* ctx = _waiting_arrow_result_batch_rpc.front();
_waiting_arrow_result_batch_rpc.pop_front();
ctx->on_data(result, _packet_num, _be_exec_version,
_fragement_transmission_compression_type, _timezone, _arrow_schema_field_names,
_serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_packet_num++;
}

_arrow_flight_batch_queue.push_back(std::move(result));
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
_arrow_data_arrival.notify_one();
_update_dependency();
return Status::OK();
}

void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
std::lock_guard<std::mutex> l(_lock);
Defer defer {[&]() { _update_dependency(); }};
if (!_status.ok()) {
ctx->on_failure(_status);
_update_dependency();
return;
}
if (_is_cancelled) {
ctx->on_failure(Status::Cancelled("Cancelled"));
_update_dependency();
return;
}
if (!_fe_result_batch_queue.empty()) {
@@ -191,56 +269,111 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {

ctx->on_data(result, _packet_num);
_packet_num++;
_update_dependency();
return;
}
if (_is_close) {
ctx->on_close(_packet_num, _query_statistics.get());
_update_dependency();
return;
}
// no ready data, push ctx to waiting list
_waiting_rpc.push_back(ctx);
_update_dependency();
}

Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj) {
std::unique_lock<std::mutex> l(_lock);
Defer defer {[&]() { _update_dependency(); }};
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

if (!_arrow_flight_batch_queue.empty()) {
*result = std::move(_arrow_flight_batch_queue.front());
_arrow_flight_batch_queue.pop_front();
if (!_arrow_flight_result_batch_queue.empty()) {
*result = std::move(_arrow_flight_result_batch_queue.front());
_arrow_flight_result_batch_queue.pop_front();
timezone_obj = _timezone_obj;

for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
_instance_rows_in_queue.pop_front();
_packet_num++;
_update_dependency();
return Status::OK();
}

// normal path end
if (_is_close) {
_update_dependency();
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
_mem_tracker->peak_consumption(), ss.str());
return Status::OK();
}
return Status::InternalError("Get Arrow Batch Abnormal Ending");
}

void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
std::unique_lock<std::mutex> l(_lock);
SCOPED_ATTACH_TASK(_mem_tracker);
Defer defer {[&]() { _update_dependency(); }};
if (!_status.ok()) {
ctx->on_failure(_status);
return;
}
if (_is_cancelled) {
ctx->on_failure(Status::Cancelled("Cancelled"));
return;
}

if (!_arrow_flight_result_batch_queue.empty()) {
auto block = _arrow_flight_result_batch_queue.front();
_arrow_flight_result_batch_queue.pop_front();
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
_instance_rows_in_queue.pop_front();

ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
_timezone, _arrow_schema_field_names, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_packet_num++;
return;
}

// normal path end
if (_is_close) {
ctx->on_close(_packet_num);
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
_mem_tracker->peak_consumption(), ss.str());
return;
}
// no ready data, push ctx to waiting list
_waiting_arrow_result_batch_rpc.push_back(ctx);
}

void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
_arrow_schema = arrow_schema;
_arrow_schema_field_names = join(_arrow_schema->field_names(), ",");
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
// close will be called multiple times and error status needs to be collected.
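For orientation, the new GetArrowResultBatchCtx above follows the same callback-context pattern as the existing GetResultBatchCtx: a brpc handler wraps the protobuf response and its closure in a context, hands it to BufferControlBlock::get_arrow_batch, and the context deletes itself after exactly one of on_data, on_close, or on_failure runs. The handler itself is outside the hunks shown here, so the sketch below is hypothetical; the request/response type names and the context constructor are assumptions:

    // Hypothetical handler sketch; the real wiring lives in the BE internal service,
    // which is not part of this excerpt.
    void fetch_arrow_data(const PFetchArrowDataRequest* request, PFetchArrowDataResult* result,
                          google::protobuf::Closure* done,
                          const std::shared_ptr<BufferControlBlock>& buffer) {
        // Assumed ctx constructor wrapping the closure and the response message; the ctx
        // frees itself (delete this) inside on_data/on_close/on_failure, as shown above.
        auto* ctx = new GetArrowResultBatchCtx(done, result);
        // Replies immediately if a block is already queued, otherwise parks the ctx in
        // _waiting_arrow_result_batch_rpc until a later add_arrow_batch() services it.
        buffer->get_arrow_batch(ctx);
    }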
