diff --git a/be/src/common/config.h b/be/src/common/config.h index dbd9f53a037fc6..f85e0ff75a03a4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -296,6 +296,8 @@ namespace config { // you may need to increase this timeout if using larger 'streaming_load_max_mb', // or encounter 'tablet writer write failed' error when loading. // CONF_Int32(tablet_writer_rpc_timeout_sec, "600"); + // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. + CONF_mInt32(olap_table_sink_send_interval_ms, "10"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 59ff01646d8981..5e91fece51e6fe 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -26,18 +26,17 @@ #include "runtime/tuple_row.h" #include "olap/hll.h" +#include "service/brpc.h" #include "util/brpc_stub_cache.h" +#include "util/monotime.h" #include "util/uid_util.h" -#include "service/brpc.h" namespace doris { namespace stream_load { -NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, - int64_t node_id, int32_t schema_hash) - : _parent(parent), _index_id(index_id), - _node_id(node_id), _schema_hash(schema_hash) { -} +NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, + int32_t schema_hash) + : _parent(parent), _index_id(index_id), _node_id(node_id), _schema_hash(schema_hash) {} NodeChannel::~NodeChannel() { if (_open_closure != nullptr) { @@ -47,12 +46,11 @@ NodeChannel::~NodeChannel() { _open_closure = nullptr; } if (_add_batch_closure != nullptr) { - if (_add_batch_closure->unref()) { - delete _add_batch_closure; - } + // it's safe to delete, but may take some time to wait until brpc joined + delete _add_batch_closure; _add_batch_closure = nullptr; } - _add_batch_request.release_id(); + _cur_add_batch_request.release_id(); } Status NodeChannel::init(RuntimeState* state) { @@ -63,23 +61,30 @@ Status NodeChannel::init(RuntimeState* state) { ss << "unknown node id, id=" << _node_id; return Status::InternalError(ss.str()); } - RowDescriptor row_desc(_tuple_desc, false); - _batch.reset(new RowBatch(row_desc, state->batch_size(), _parent->_mem_tracker)); - _stub = state->exec_env()->brpc_stub_cache()->get_stub( - _node_info->host, _node_info->brpc_port); + _row_desc.reset(new RowDescriptor(_tuple_desc, false)); + _batch_size = state->batch_size(); + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); + + _stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info->host, _node_info->brpc_port); if (_stub == nullptr) { LOG(WARNING) << "Get rpc stub failed, host=" << _node_info->host - << ", port=" << _node_info->brpc_port; + << ", port=" << _node_info->brpc_port; + _cancelled = true; return Status::InternalError("get rpc stub failed"); } - // Initialize _add_batch_request - _add_batch_request.set_allocated_id(&_parent->_load_id); - _add_batch_request.set_index_id(_index_id); - _add_batch_request.set_sender_id(_parent->_sender_id); + // Initialize _cur_add_batch_request + _cur_add_batch_request.set_allocated_id(&_parent->_load_id); + _cur_add_batch_request.set_index_id(_index_id); + _cur_add_batch_request.set_sender_id(_parent->_sender_id); + _cur_add_batch_request.set_eos(false); _rpc_timeout_ms = state->query_options().query_timeout * 1000; + + _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id" + + std::to_string(_parent->_txn_id); + _name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]"; return Status::OK(); } @@ -105,9 +110,7 @@ void NodeChannel::open() { // This ref is for RPC's reference _open_closure->ref(); _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); - _stub->tablet_writer_open(&_open_closure->cntl, - &request, - &_open_closure->result, + _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, _open_closure); request.release_id(); request.release_schema(); @@ -117,8 +120,9 @@ Status NodeChannel::open_wait() { _open_closure->join(); if (_open_closure->cntl.Failed()) { LOG(WARNING) << "failed to open tablet writer, error=" - << berror(_open_closure->cntl.ErrorCode()) - << ", error_text=" << _open_closure->cntl.ErrorText(); + << berror(_open_closure->cntl.ErrorCode()) + << ", error_text=" << _open_closure->cntl.ErrorText(); + _cancelled = true; return Status::InternalError("failed to open tablet writer"); } Status status(_open_closure->result.status()); @@ -128,54 +132,134 @@ Status NodeChannel::open_wait() { _open_closure = nullptr; // add batch closure - _add_batch_closure = new RefCountClosure(); - _add_batch_closure->ref(); + _add_batch_closure = ReusableClosure::create(); + _add_batch_closure->addFailedHandler([this]() { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info() + << ", node=" << node_info()->host << ":" << node_info()->brpc_port; + }); + + _add_batch_closure->addSuccessHandler( + [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + Status status(result.status()); + if (status.ok()) { + if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + } + _add_batches_finished = true; + } + } else { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, " + << print_load_info() << ", node=" << node_info()->host << ":" + << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); + _add_batch_counter.add_batch_num++; + } + }); return status; } Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { - auto row_no = _batch->add_row(); + // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't add_row. cancelled/eos: "); + } + + // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, + // so in the ideal case, mem limit is a matter for _plan node. + // But there is still some unfinished things, we do mem limit here temporarily. + while (_parent->_mem_tracker->any_limit_exceeded()) { + SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { - RETURN_IF_ERROR(_send_cur_batch()); - row_no = _batch->add_row(); + { + SCOPED_RAW_TIMER(&_queue_push_lock_ns); + std::lock_guard l(_pending_batches_lock); + //To simplify the add_row logic, postpone adding batch into req until the time of sending req + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + } + + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); + _cur_add_batch_request.clear_tablet_ids(); + + row_no = _cur_batch->add_row(); } DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); - auto tuple = input_tuple->deep_copy(*_tuple_desc, _batch->tuple_data_pool()); - _batch->get_row(row_no)->set_tuple(0, tuple); - _batch->commit_last_row(); - _add_batch_request.add_tablet_ids(tablet_id); + auto tuple = input_tuple->deep_copy(*_tuple_desc, _cur_batch->tuple_data_pool()); + _cur_batch->get_row(row_no)->set_tuple(0, tuple); + _cur_batch->commit_last_row(); + _cur_add_batch_request.add_tablet_ids(tablet_id); return Status::OK(); } -Status NodeChannel::close(RuntimeState* state) { - auto st = _close(state); - _batch.reset(); - return st; -} +Status NodeChannel::mark_close() { + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't mark as closed. cancelled/eos: "); + } -Status NodeChannel::_close(RuntimeState* state) { - return _send_cur_batch(true); + _cur_add_batch_request.set_eos(true); + { + std::lock_guard l(_pending_batches_lock); + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + DCHECK(_pending_batches.back().second.eos()); + } + + _eos_is_produced = true; } Status NodeChannel::close_wait(RuntimeState* state) { - RETURN_IF_ERROR(_wait_in_flight_packet()); - Status status(_add_batch_closure->result.status()); - if (status.ok()) { - for (auto& tablet : _add_batch_closure->result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - state->tablet_commit_infos().emplace_back(std::move(commit_info)); - } + auto st = none_of({_cancelled, !_eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, skip waiting for close. cancelled/!eos: "); } - // clear batch after sendt - _batch.reset(); - return status; + + // waiting for finished, it may take a long time, so we could't set a timeout + MonotonicStopWatch timer; + timer.start(); + while (!_add_batches_finished && !_cancelled) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + timer.stop(); + VLOG(1) << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms"; + + { + std::lock_guard lg(_pending_batches_lock); + DCHECK(_pending_batches.empty()); + DCHECK(_cur_batch == nullptr); + } + + if (_add_batches_finished) { + state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), + std::make_move_iterator(_tablet_commit_infos.begin()), + std::make_move_iterator(_tablet_commit_infos.end())); + return Status::OK(); + } + + return Status::InternalError("close wait failed coz rpc error"); } void NodeChannel::cancel() { - // Do we need to wait last rpc finished??? + // we don't need to wait last rpc finished, cause closure's release/reset will join. + // But do we need brpc::StartCancel(call_id)? + _cancelled = true; + PTabletWriterCancelRequest request; request.set_allocated_id(&_parent->_load_id); request.set_index_id(_index_id); @@ -185,80 +269,89 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _stub->tablet_writer_cancel(&closure->cntl, - &request, - &closure->result, - closure); + _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); - // reset batch - _batch.reset(); + // Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors). + // Delete RowBatches here is a better choice to reduce the potential of dtor errors. + { + std::lock_guard lg(_pending_batches_lock); + std::queue empty; + std::swap(_pending_batches, empty); + _cur_batch.reset(); + } } -Status NodeChannel::_wait_in_flight_packet() { - if (!_has_in_flight_packet) { - return Status::OK(); +int NodeChannel::try_send_and_fetch_status() { + auto st = none_of({_cancelled, _send_finished}); + if (!st.ok()) { + return 0; } - SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); - _add_batch_closure->join(); - _has_in_flight_packet = false; - if (_add_batch_closure->cntl.Failed()) { - LOG(WARNING) << "failed to send batch, error=" - << berror(_add_batch_closure->cntl.ErrorCode()) - << ", error_text=" << _add_batch_closure->cntl.ErrorText(); - return Status::InternalError("failed to send batch"); - } + if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { + SCOPED_RAW_TIMER(&_actual_consume_ns); + AddBatchReq send_batch; + { + std::lock_guard lg(_pending_batches_lock); + DCHECK(!_pending_batches.empty()); + send_batch = std::move(_pending_batches.front()); + _pending_batches.pop(); + _pending_batches_num--; + } - if (_add_batch_closure->result.has_execution_time_us()) { - _parent->update_node_add_batch_counter(_node_id, - _add_batch_closure->result.execution_time_us(), - _add_batch_closure->result.wait_lock_time_us()); - } - return {_add_batch_closure->result.status()}; -} + auto row_batch = std::move(send_batch.first); + auto request = std::move(send_batch.second); // doesn't need to be saved in heap -Status NodeChannel::_send_cur_batch(bool eos) { - RETURN_IF_ERROR(_wait_in_flight_packet()); + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + if (row_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(&_serialize_batch_ns); + row_batch->serialize(request.mutable_row_batch()); + } - // tablet_ids has already set when add row - _add_batch_request.set_eos(eos); - _add_batch_request.set_packet_seq(_next_packet_seq); - if (_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); - _batch->serialize(_add_batch_request.mutable_row_batch()); - } + _add_batch_closure->reset(); + _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _add_batch_closure->ref(); - _add_batch_closure->cntl.Reset(); - _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } - if (eos) { - for (auto pid : _parent->_partition_ids) { - _add_batch_request.add_partition_ids(pid); + // eos request must be the last request + _add_batch_closure->end_mark(); + _send_finished = true; + DCHECK(_pending_batches_num == 0); } - } - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, - &_add_batch_request, - &_add_batch_closure->result, - _add_batch_closure); - _add_batch_request.clear_tablet_ids(); - _add_batch_request.clear_row_batch(); - _add_batch_request.clear_partition_ids(); + _add_batch_closure->set_in_flight(); + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, + &_add_batch_closure->result, _add_batch_closure); - _has_in_flight_packet = true; - _next_packet_seq++; + _next_packet_seq++; + } - _batch->reset(); - return Status::OK(); + return _send_finished ? 0 : 1; } -IndexChannel::~IndexChannel() { +Status NodeChannel::none_of(std::initializer_list vars) { + bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); + Status st = Status::OK(); + if (!none) { + std::string vars_str; + std::for_each(vars.begin(), vars.end(), + [&vars_str](bool var) -> void { vars_str += (var ? "1/" : "0/"); }); + if (!vars_str.empty()) { + vars_str.pop_back(); // 0/1/0/ -> 0/1/0 + } + st = Status::InternalError(vars_str); + } + + return st; } -Status IndexChannel::init(RuntimeState* state, - const std::vector& tablets) { +IndexChannel::~IndexChannel() {} + +Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { for (auto& tablet : tablets) { auto location = _parent->_location->find_tablet(tablet.tablet_id); if (location == nullptr) { @@ -287,119 +380,37 @@ Status IndexChannel::init(RuntimeState* state, return Status::OK(); } -Status IndexChannel::open() { - for (auto& it : _node_channels) { - it.second->open(); - } - for (auto& it : _node_channels) { - auto channel = it.second; - auto st = channel->open_wait(); - if (!st.ok()) { - LOG(WARNING) << "tablet open failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "open failed, load_id=" << _parent->_load_id; - return st; - } - } - } - return Status::OK(); -} - Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) { auto it = _channels_by_tablet.find(tablet_id); DCHECK(it != std::end(_channels_by_tablet)) << "unknown tablet, tablet_id=" << tablet_id; for (auto channel : it->second) { - if (channel->already_failed()) { - continue; - } + // if this node channel is already failed, this add_row will be skipped auto st = channel->add_row(tuple, tablet_id); if (!st.ok()) { - LOG(WARNING) << "NodeChannel add row failed, load_id=" << _parent->_load_id - << ", tablet_id=" << tablet_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "add row failed, load_id=" << _parent->_load_id; - return st; - } + mark_as_failed(channel); } } - return Status::OK(); -} -Status IndexChannel::close(RuntimeState* state) { - std::vector need_wait_channels; - need_wait_channels.reserve(_node_channels.size()); - - Status close_status; - for (auto& it : _node_channels) { - auto channel = it.second; - if (channel->already_failed() || !close_status.ok()) { - channel->cancel(); - continue; - } - auto st = channel->close(state); - if (st.ok()) { - need_wait_channels.push_back(channel); - } else { - LOG(WARNING) << "close node channel failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "close failed, load_id=" << _parent->_load_id; - close_status = st; - } - } + if (has_intolerable_failure()) { + return Status::InternalError("index channel has intoleralbe failure"); } - if (close_status.ok()) { - for (auto channel : need_wait_channels) { - auto st = channel->close_wait(state); - if (!st.ok()) { - LOG(WARNING) << "close_wait node channel failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "close_wait failed, load_id=" << _parent->_load_id; - return st; - } - } - } - } - return close_status; -} - -void IndexChannel::cancel() { - for (auto& it : _node_channels) { - it.second->cancel(); - } + return Status::OK(); } -bool IndexChannel::_handle_failed_node(NodeChannel* channel) { - DCHECK(!channel->already_failed()); - channel->set_failed(); - _num_failed_channels++; - return _num_failed_channels >= ((_parent->_num_repicas + 1) / 2); +bool IndexChannel::has_intolerable_failure() { + return _failed_channels.size() >= ((_parent->_num_repicas + 1) / 2); } -OlapTableSink::OlapTableSink(ObjectPool* pool, - const RowDescriptor& row_desc, - const std::vector& texprs, - Status* status) +OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& texprs, Status* status) : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) { if (!texprs.empty()) { *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs); } } -OlapTableSink::~OlapTableSink() { -} +OlapTableSink::~OlapTableSink() {} Status OlapTableSink::init(const TDataSink& t_sink) { DCHECK(t_sink.__isset.olap_table_sink); @@ -443,8 +454,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { SCOPED_TIMER(_profile->total_time_counter()); // Prepare the exprs to run. - RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, - _input_row_desc, _expr_mem_tracker.get())); + RETURN_IF_ERROR( + Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker.get())); // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); @@ -455,17 +466,17 @@ Status OlapTableSink::prepare(RuntimeState* state) { if (!_output_expr_ctxs.empty()) { if (_output_expr_ctxs.size() != _output_tuple_desc->slots().size()) { LOG(WARNING) << "number of exprs is not same with slots, num_exprs=" - << _output_expr_ctxs.size() - << ", num_slots=" << _output_tuple_desc->slots().size(); + << _output_expr_ctxs.size() + << ", num_slots=" << _output_tuple_desc->slots().size(); return Status::InternalError("number of exprs is not same with slots"); } for (int i = 0; i < _output_expr_ctxs.size(); ++i) { if (!is_type_compatible(_output_expr_ctxs[i]->root()->type().type, _output_tuple_desc->slots()[i]->type().type)) { LOG(WARNING) << "type of exprs is not match slot's, expr_type=" - << _output_expr_ctxs[i]->root()->type().type - << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type - << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); + << _output_expr_ctxs[i]->root()->type().type + << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type + << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); return Status::InternalError("expr's type is not same with slot's"); } } @@ -513,8 +524,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { _convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseTime"); - _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); + _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); _load_mem_limit = state->get_load_mem_limit(); @@ -546,9 +557,29 @@ Status OlapTableSink::open(RuntimeState* state) { // Prepare the exprs to run. RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); - for (auto channel : _channels) { - RETURN_IF_ERROR(channel->open()); + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); }); + } + + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) { + auto st = ch->open_wait(); + if (!st.ok()) { + LOG(WARNING) << "tablet open failed, " << ch->print_load_info() + << ", node=" << ch->node_info()->host << ":" + << ch->node_info()->brpc_port << ", errmsg=" << st.get_error_msg(); + index_channel->mark_as_failed(ch); + } + }); + + if (index_channel->has_intolerable_failure()) { + LOG(WARNING) << "open failed, load_id=" << _load_id; + return Status::InternalError("intolerable failure in opening node channels"); + } } + + _sender_thread = std::thread(&OlapTableSink::_send_batch_process, this); + return Status::OK(); } @@ -584,7 +615,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { std::stringstream ss; ss << "no partition for this tuple. tuple=" - << Tuple::to_string(tuple, *_output_tuple_desc); + << Tuple::to_string(tuple, *_output_tuple_desc); #if BE_TEST LOG(INFO) << ss.str(); #else @@ -610,57 +641,86 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); + // BE id -> add_batch method counter + std::unordered_map node_add_batch_counter_map; + int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, + actual_consume_ns = 0; { SCOPED_TIMER(_close_timer); - for (auto channel : _channels) { - status = channel->close(state); - if (!status.ok()) { - LOG(WARNING) << "close channel failed, load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id; - } + for (auto index_channel : _channels) { + index_channel->for_each_node_channel( + [](NodeChannel* ch) { WARN_IF_ERROR(ch->mark_close(), ""); }); + } + + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, + &serialize_batch_ns, &mem_exceeded_block_ns, + &queue_push_lock_ns, + &actual_consume_ns](NodeChannel* ch) { + status = ch->close_wait(state); + if (!status.ok()) { + LOG(WARNING) << "close channel failed, " << ch->print_load_info(); + } + ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, + &mem_exceeded_block_ns, &queue_push_lock_ns, + &actual_consume_ns); + }); } } + // TODO need to be improved + LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns + << ", total queue_push_lock_ns=" << queue_push_lock_ns + << ", total actual_consume_ns=" << actual_consume_ns; + COUNTER_SET(_input_rows_counter, _number_input_rows); COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); - COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); - COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns); + COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); + int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + + state->num_rows_load_unselected(); state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; - for (auto const& pair : _node_add_batch_counter_map) { - ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_ns / 1000) << ")(" - << (pair.second.add_batch_wait_lock_time_ns / 1000) << ")(" + << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + for (auto const& pair : node_add_batch_counter_map) { + ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) + << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); - } else { for (auto channel : _channels) { - channel->cancel(); + channel->for_each_node_channel([](NodeChannel* ch) { ch->cancel(); }); } } + + // Sender join() must put after node channels mark_close/cancel. + // But there is no specific sequence required between sender join() & close_wait(). + if (_sender_thread.joinable()) { + _sender_thread.join(); + } + Expr::close(_output_expr_ctxs, state); _output_batch.reset(); return status; } -void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, RowBatch* output_batch) { +void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, + RowBatch* output_batch) { DCHECK_GE(output_batch->capacity(), input_batch->num_rows()); int commit_rows = 0; for (int i = 0; i < input_batch->num_rows(); ++i) { auto src_row = input_batch->get_row(i); - Tuple* dst_tuple = (Tuple*)output_batch->tuple_data_pool()->allocate( - _output_tuple_desc->byte_size()); + Tuple* dst_tuple = + (Tuple*)output_batch->tuple_data_pool()->allocate(_output_tuple_desc->byte_size()); bool ignore_this_row = false; for (int j = 0; j < _output_expr_ctxs.size(); ++j) { auto src_val = _output_expr_ctxs[j]->get_value(src_row); @@ -727,10 +787,10 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (str_val->len > desc->type().len) { std::stringstream ss; ss << "the length of input is too long than schema. " - << "column_name: " << desc->col_name() << "; " - << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " - << "schema length: " << desc->type().len << "; " - << "actual length: " << str_val->len << "; "; + << "column_name: " << desc->col_name() << "; " + << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " + << "schema length: " << desc->type().len << "; " + << "actual length: " << str_val->len << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -743,9 +803,8 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* continue; } // padding 0 to CHAR field - if (desc->type().type == TYPE_CHAR - && str_val->len < desc->type().len) { - auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); + if (desc->type().type == TYPE_CHAR && str_val->len < desc->type().len) { + auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); memcpy(new_ptr, str_val->ptr, str_val->len); memset(new_ptr + str_val->len, 0, desc->type().len - str_val->len); @@ -776,9 +835,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (*dec_val > _max_decimal_val[i] || *dec_val < _min_decimal_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val->to_string() - << ", precision=" << desc->type().precision - << ", scale=" << desc->type().scale; + << ", value=" << dec_val->to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -814,9 +873,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val.to_string() - << ", precision=" << desc->type().precision - << ", scale=" << desc->type().scale; + << ", value=" << dec_val.to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -834,7 +893,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (!HyperLogLog::is_valid(*hll_val)) { std::stringstream ss; ss << "Content of HLL type column is invalid" - << "column_name: " << desc->col_name() << "; "; + << "column_name: " << desc->col_name() << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -855,5 +914,24 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* return filtered_rows; } +void OlapTableSink::_send_batch_process() { + SCOPED_RAW_TIMER(&_non_blocking_send_ns); + while (true) { + int running_channels_num = 0; + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) { + running_channels_num += ch->try_send_and_fetch_status(); + }); + } + + if (running_channels_num == 0) { + LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), " + "consumer thread exit."; + return; + } + SleepFor(MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms)); + } } -} + +} // namespace stream_load +} // namespace doris diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 77fb89f1cbd853..9ff18cf773d65e 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -18,22 +18,23 @@ #pragma once #include +#include #include #include #include #include #include -#include "common/status.h" #include "common/object_pool.h" +#include "common/status.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/palo_internal_service.pb.h" #include "util/bitmap.h" -#include "util/thrift_util.h" #include "util/ref_count_closure.h" +#include "util/thrift_util.h" namespace doris { @@ -47,18 +48,103 @@ class ExprContext; class TExpr; namespace stream_load { - + class OlapTableSink; +// The counter of add_batch rpc of a single node +struct AddBatchCounter { + // total execution time of a add_batch rpc + int64_t add_batch_execution_time_us = 0; + // lock waiting time in a add_batch rpc + int64_t add_batch_wait_lock_time_us = 0; + // number of add_batch call + int64_t add_batch_num = 0; + AddBatchCounter& operator+=(const AddBatchCounter& rhs) { + add_batch_execution_time_us += rhs.add_batch_execution_time_us; + add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us; + add_batch_num += rhs.add_batch_num; + return *this; + } + friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) { + AddBatchCounter sum = lhs; + sum += rhs; + return sum; + } +}; + +// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. +// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. +// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +template +class ReusableClosure : public google::protobuf::Closure { +public: + ReusableClosure() : cid(INVALID_BTHREAD_ID) {} + ~ReusableClosure() { + // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. + join(); + } + + static ReusableClosure* create() { return new ReusableClosure(); } + + void addFailedHandler(std::function fn) { failed_handler = fn; } + void addSuccessHandler(std::function fn) { success_handler = fn; } + + void join() { + if (cid != INVALID_BTHREAD_ID) { + brpc::Join(cid); + } + } + + // plz follow this order: reset() -> set_in_flight() -> send brpc batch + void reset() { + join(); + DCHECK(_packet_in_flight == false); + cntl.Reset(); + cid = cntl.call_id(); + } + + void set_in_flight() { + DCHECK(_packet_in_flight == false); + _packet_in_flight = true; + } + + bool is_packet_in_flight() { return _packet_in_flight; } + + void end_mark() { + DCHECK(_is_last_rpc == false); + _is_last_rpc = true; + } + + void Run() override { + DCHECK(_packet_in_flight); + if (cntl.Failed()) { + LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) + << ", error_text=" << cntl.ErrorText(); + failed_handler(); + } else { + success_handler(result, _is_last_rpc); + } + _packet_in_flight = false; + } + + brpc::Controller cntl; + T result; + +private: + brpc::CallId cid; + std::atomic _packet_in_flight{false}; + std::atomic _is_last_rpc{false}; + std::function failed_handler; + std::function success_handler; +}; + class NodeChannel { public: NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, int32_t schema_hash); ~NodeChannel() noexcept; // called before open, used to add tablet loacted in this backend - void add_tablet(const TTabletWithPartition& tablet) { - _all_tablets.emplace_back(tablet); - } + void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } Status init(RuntimeState* state); @@ -68,99 +154,128 @@ class NodeChannel { Status add_row(Tuple* tuple, int64_t tablet_id); - Status close(RuntimeState* state); + // two ways to stop channel: + // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. + // 2. just cancel() + Status mark_close(); Status close_wait(RuntimeState* state); void cancel(); - int64_t node_id() const { return _node_id; } + // return: + // 0: stopped, send finished(eos request has been sent), or any internal error; + // 1: running, haven't reach eos. + // only allow 1 rpc in flight + int try_send_and_fetch_status(); + + void time_report(std::unordered_map* add_batch_counter_map, + int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, + int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { + (*add_batch_counter_map)[_node_id] += _add_batch_counter; + *serialize_batch_ns += _serialize_batch_ns; + *mem_exceeded_block_ns += _mem_exceeded_block_ns; + *queue_push_lock_ns += _queue_push_lock_ns; + *actual_consume_ns += _actual_consume_ns; + } - void set_failed() { _already_failed = true; } - bool already_failed() const { return _already_failed; } + int64_t node_id() const { return _node_id; } const NodeInfo* node_info() const { return _node_info; } + std::string print_load_info() const { return _load_info; } + std::string name() const { return _name; } -private: - Status _send_cur_batch(bool eos = false); - // wait inflight packet finish, return error if inflight packet return failed - Status _wait_in_flight_packet(); - - Status _close(RuntimeState* state); + Status none_of(std::initializer_list vars); private: OlapTableSink* _parent = nullptr; int64_t _index_id = -1; int64_t _node_id = -1; int32_t _schema_hash = 0; + std::string _load_info; + std::string _name; TupleDescriptor* _tuple_desc = nullptr; const NodeInfo* _node_info = nullptr; - bool _already_failed = false; - bool _has_in_flight_packet = false; // this should be set in init() using config int _rpc_timeout_ms = 60000; int64_t _next_packet_seq = 0; - std::unique_ptr _batch; + // user cancel or get some errors + std::atomic _cancelled{false}; + + // send finished means the consumer thread which send the rpc can exit + std::atomic _send_finished{false}; + + // add batches finished means the last rpc has be responsed, used to check whether this channel can be closed + std::atomic _add_batches_finished{false}; + + bool _eos_is_produced{false}; // only for restricting producer behaviors + + std::unique_ptr _row_desc; + int _batch_size = 0; + std::unique_ptr _cur_batch; + PTabletWriterAddBatchRequest _cur_add_batch_request; + + std::mutex _pending_batches_lock; + using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; + std::queue _pending_batches; + std::atomic _pending_batches_num{0}; + palo::PInternalService_Stub* _stub = nullptr; RefCountClosure* _open_closure = nullptr; - RefCountClosure* _add_batch_closure = nullptr; + ReusableClosure* _add_batch_closure = nullptr; std::vector _all_tablets; - PTabletWriterAddBatchRequest _add_batch_request; + std::vector _tablet_commit_infos; + + AddBatchCounter _add_batch_counter; + int64_t _serialize_batch_ns = 0; + + int64_t _mem_exceeded_block_ns = 0; + int64_t _queue_push_lock_ns = 0; + int64_t _actual_consume_ns = 0; }; class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, int32_t schema_hash) - : _parent(parent), _index_id(index_id), - _schema_hash(schema_hash) { - } + : _parent(parent), _index_id(index_id), _schema_hash(schema_hash) {} ~IndexChannel(); - Status init(RuntimeState* state, - const std::vector& tablets); - Status open(); - Status add_row(Tuple* tuple, int64_t tablet_id); + Status init(RuntimeState* state, const std::vector& tablets); - Status close(RuntimeState* state); + Status add_row(Tuple* tuple, int64_t tablet_id); - void cancel(); + void for_each_node_channel(const std::function& func) { + for (auto& it : _node_channels) { + func(it.second); + } + } -private: - // return true if this load can't success. - bool _handle_failed_node(NodeChannel* channel); + void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } + bool has_intolerable_failure(); private: OlapTableSink* _parent; int64_t _index_id; int32_t _schema_hash; - int _num_failed_channels = 0; // BeId -> channel std::unordered_map _node_channels; // from tablet_id to backend channel std::unordered_map> _channels_by_tablet; + // BeId + std::set _failed_channels; }; -// The counter of add_batch rpc of a single node -struct AddBatchCounter { - // total execution time of a add_batch rpc - int64_t add_batch_execution_time_ns = 0; - // lock waiting time in a add_batch rpc - int64_t add_batch_wait_lock_time_ns = 0; - // number of add_batch call - int64_t add_batch_num = 0; -}; - -// write data to Olap Table. -// this class distributed data according to +// Write data to Olap Table. +// When OlapTableSink::open() called, there will be a consumer thread running in the background. +// When you call OlapTableSink::send(), you will be the productor who products pending batches. +// Join the consumer thread in close(). class OlapTableSink : public DataSink { public: // Construct from thrift struct which is generated by FE. - OlapTableSink(ObjectPool* pool, - const RowDescriptor& row_desc, - const std::vector& texprs, + OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, Status* status); ~OlapTableSink() override; @@ -172,29 +287,11 @@ class OlapTableSink : public DataSink { Status send(RuntimeState* state, RowBatch* batch) override; + // close() will send RPCs too. If RPCs failed, return error. Status close(RuntimeState* state, Status close_status) override; // Returns the runtime profile for the sink. - RuntimeProfile* profile() override { - return _profile; - } - - // these 2 counters does not thread-safe. make sure only one thread - // at a time can modify them. - int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } - int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } - void update_node_add_batch_counter(int64_t be_id, int64_t add_batch_time_ns, int64_t wait_lock_time_ns) { - auto search = _node_add_batch_counter_map.find(be_id); - if (search == _node_add_batch_counter_map.end()) { - AddBatchCounter new_counter; - _node_add_batch_counter_map.emplace(be_id, std::move(new_counter)); - } - - AddBatchCounter& counter = _node_add_batch_counter_map[be_id]; - counter.add_batch_execution_time_ns += add_batch_time_ns; - counter.add_batch_wait_lock_time_ns += wait_lock_time_ns; - counter.add_batch_num += 1; - } + RuntimeProfile* profile() override { return _profile; } private: // convert input batch to output batch which will be loaded into OLAP table. @@ -206,6 +303,11 @@ class OlapTableSink : public DataSink { // invalid row number is set in Bitmap int _validate_data(RuntimeState* state, RowBatch* batch, Bitmap* filter_bitmap); + // the consumer func of sending pending batches in every NodeChannel. + // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending. + // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the productor + void _send_batch_process(); + private: friend class NodeChannel; friend class IndexChannel; @@ -254,6 +356,8 @@ class OlapTableSink : public DataSink { // index_channel std::vector _channels; + std::thread _sender_thread; + std::vector _max_decimal_val; std::vector _min_decimal_val; @@ -264,7 +368,7 @@ class OlapTableSink : public DataSink { int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _wait_in_flight_packet_ns = 0; + int64_t _non_blocking_send_ns = 0; int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; @@ -278,12 +382,9 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; + RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; - // BE id -> add_batch method counter - std::unordered_map _node_add_batch_counter_map; - // load mem limit is for remote load channel int64_t _load_mem_limit = -1; @@ -291,5 +392,5 @@ class OlapTableSink : public DataSink { int64_t _load_channel_timeout_s = 0; }; -} -} +} // namespace stream_load +} // namespace doris diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index bef07940d7f0bd..0f563b384dfca0 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -19,25 +19,23 @@ #include +#include "common/config.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "common/config.h" +#include "runtime/bufferpool/reservation_tracker.h" #include "runtime/decimal_value.h" +#include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/thread_resource_mgr.h" #include "runtime/tuple_row.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" #include "util/debug/leakcheck_disabler.h" -#include "runtime/descriptor_helper.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "runtime/exec_env.h" -#include "runtime/result_queue_mgr.h" -#include "runtime/thread_resource_mgr.h" namespace doris { namespace stream_load { @@ -46,8 +44,8 @@ Status k_add_batch_status; class OlapTableSinkTest : public testing::Test { public: - OlapTableSinkTest() { } - virtual ~OlapTableSinkTest() { } + OlapTableSinkTest() {} + virtual ~OlapTableSinkTest() {} void SetUp() override { k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); @@ -66,9 +64,16 @@ class OlapTableSinkTest : public testing::Test { SAFE_DELETE(_env->_master_info); SAFE_DELETE(_env->_thread_mgr); SAFE_DELETE(_env->_buffer_reservation); + if (_server) { + _server->Stop(100); + _server->Join(); + SAFE_DELETE(_server); + } } + private: - ExecEnv* _env; + ExecEnv* _env = nullptr; + brpc::Server* _server = nullptr; }; TDataSink get_data_sink(TDescriptorTable* desc_tbl) { @@ -106,24 +111,42 @@ TDataSink get_data_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().string_type(10).column_name("c3").column_pos(3).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(10) + .column_name("c3") + .column_pos(3) + .build()); tuple_builder.build(&dtb); } { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().string_type(20).column_name("c3").column_pos(3).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(20) + .column_name("c3") + .column_pos(3) + .build()); tuple_builder.build(&dtb); } @@ -212,10 +235,16 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().decimal_type(5, 2).column_name("c2").column_pos(2).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .decimal_type(5, 2) + .column_name("c2") + .column_pos(2) + .build()); tuple_builder.build(&dtb); } @@ -271,8 +300,8 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { class TestInternalService : public palo::PInternalService { public: - TestInternalService() { } - virtual ~TestInternalService() { } + TestInternalService() {} + virtual ~TestInternalService() {} void transmit_data(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, @@ -281,7 +310,6 @@ class TestInternalService : public palo::PInternalService { done->Run(); } - void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, @@ -306,7 +334,7 @@ class TestInternalService : public palo::PInternalService { if (request->has_row_batch() && _row_desc != nullptr) { MemTracker tracker; RowBatch batch(*_row_desc, request->row_batch(), &tracker); - for (int i = 0; i < batch.num_rows(); ++i){ + for (int i = 0; i < batch.num_rows(); ++i) { LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); } @@ -330,13 +358,13 @@ class TestInternalService : public palo::PInternalService { TEST_F(OlapTableSinkTest, normal) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -361,7 +389,7 @@ TEST_F(OlapTableSinkTest, normal) { LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); RowDescriptor row_desc(*desc_tbl, {0}, {false}); - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -433,21 +461,17 @@ TEST_F(OlapTableSinkTest, normal) { // 2node * 2 ASSERT_EQ(1, state.num_rows_load_filtered()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, convert) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -470,7 +494,7 @@ TEST_F(OlapTableSinkTest, convert) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -570,10 +594,6 @@ TEST_F(OlapTableSinkTest, convert) { // 2node * 2 ASSERT_EQ(0, state.num_rows_load_filtered()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, init_fail1) { @@ -595,7 +615,7 @@ TEST_F(OlapTableSinkTest, init_fail1) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(1); exprs[0].nodes.resize(1); @@ -653,7 +673,7 @@ TEST_F(OlapTableSinkTest, init_fail3) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -712,7 +732,7 @@ TEST_F(OlapTableSinkTest, init_fail4) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -755,13 +775,13 @@ TEST_F(OlapTableSinkTest, init_fail4) { TEST_F(OlapTableSinkTest, add_batch_failed) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -782,7 +802,7 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -845,21 +865,17 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { // close st = sink.close(&state, Status::OK()); ASSERT_FALSE(st.ok()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, decimal) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -885,7 +901,7 @@ TEST_F(OlapTableSinkTest, decimal) { service->_row_desc = &row_desc; std::set output_set; service->_output_set = &output_set; - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -946,14 +962,10 @@ TEST_F(OlapTableSinkTest, decimal) { ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0); ASSERT_TRUE(output_set.count("[(13 123.12)]") > 0); // ASSERT_TRUE(output_set.count("[(14 999.99)]") > 0); - - server->Stop(100); - server->Join(); - delete server; } -} -} +} // namespace stream_load +} // namespace doris int main(int argc, char* argv[]) { doris::CpuInfo::init();