diff --git a/be/src/common/config.h b/be/src/common/config.h index f85e0ff75a03a4..dbd9f53a037fc6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -296,8 +296,6 @@ namespace config { // you may need to increase this timeout if using larger 'streaming_load_max_mb', // or encounter 'tablet writer write failed' error when loading. // CONF_Int32(tablet_writer_rpc_timeout_sec, "600"); - // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. - CONF_mInt32(olap_table_sink_send_interval_ms, "10"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 5e91fece51e6fe..59ff01646d8981 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -26,17 +26,18 @@ #include "runtime/tuple_row.h" #include "olap/hll.h" -#include "service/brpc.h" #include "util/brpc_stub_cache.h" -#include "util/monotime.h" #include "util/uid_util.h" +#include "service/brpc.h" namespace doris { namespace stream_load { -NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, - int32_t schema_hash) - : _parent(parent), _index_id(index_id), _node_id(node_id), _schema_hash(schema_hash) {} +NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, + int64_t node_id, int32_t schema_hash) + : _parent(parent), _index_id(index_id), + _node_id(node_id), _schema_hash(schema_hash) { +} NodeChannel::~NodeChannel() { if (_open_closure != nullptr) { @@ -46,11 +47,12 @@ NodeChannel::~NodeChannel() { _open_closure = nullptr; } if (_add_batch_closure != nullptr) { - // it's safe to delete, but may take some time to wait until brpc joined - delete _add_batch_closure; + if (_add_batch_closure->unref()) { + delete _add_batch_closure; + } _add_batch_closure = nullptr; } - _cur_add_batch_request.release_id(); + _add_batch_request.release_id(); } Status NodeChannel::init(RuntimeState* state) { @@ -61,30 +63,23 @@ Status NodeChannel::init(RuntimeState* state) { ss << "unknown node id, id=" << _node_id; return Status::InternalError(ss.str()); } + RowDescriptor row_desc(_tuple_desc, false); + _batch.reset(new RowBatch(row_desc, state->batch_size(), _parent->_mem_tracker)); - _row_desc.reset(new RowDescriptor(_tuple_desc, false)); - _batch_size = state->batch_size(); - _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); - - _stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info->host, _node_info->brpc_port); + _stub = state->exec_env()->brpc_stub_cache()->get_stub( + _node_info->host, _node_info->brpc_port); if (_stub == nullptr) { LOG(WARNING) << "Get rpc stub failed, host=" << _node_info->host - << ", port=" << _node_info->brpc_port; - _cancelled = true; + << ", port=" << _node_info->brpc_port; return Status::InternalError("get rpc stub failed"); } - // Initialize _cur_add_batch_request - _cur_add_batch_request.set_allocated_id(&_parent->_load_id); - _cur_add_batch_request.set_index_id(_index_id); - _cur_add_batch_request.set_sender_id(_parent->_sender_id); - _cur_add_batch_request.set_eos(false); + // Initialize _add_batch_request + _add_batch_request.set_allocated_id(&_parent->_load_id); + _add_batch_request.set_index_id(_index_id); + _add_batch_request.set_sender_id(_parent->_sender_id); _rpc_timeout_ms = state->query_options().query_timeout * 1000; - - _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id" + - std::to_string(_parent->_txn_id); - _name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]"; return Status::OK(); } @@ -110,7 +105,9 @@ void NodeChannel::open() { // This ref is for RPC's reference _open_closure->ref(); _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); - _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, + _stub->tablet_writer_open(&_open_closure->cntl, + &request, + &_open_closure->result, _open_closure); request.release_id(); request.release_schema(); @@ -120,9 +117,8 @@ Status NodeChannel::open_wait() { _open_closure->join(); if (_open_closure->cntl.Failed()) { LOG(WARNING) << "failed to open tablet writer, error=" - << berror(_open_closure->cntl.ErrorCode()) - << ", error_text=" << _open_closure->cntl.ErrorText(); - _cancelled = true; + << berror(_open_closure->cntl.ErrorCode()) + << ", error_text=" << _open_closure->cntl.ErrorText(); return Status::InternalError("failed to open tablet writer"); } Status status(_open_closure->result.status()); @@ -132,134 +128,54 @@ Status NodeChannel::open_wait() { _open_closure = nullptr; // add batch closure - _add_batch_closure = ReusableClosure::create(); - _add_batch_closure->addFailedHandler([this]() { - _cancelled = true; - LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info() - << ", node=" << node_info()->host << ":" << node_info()->brpc_port; - }); - - _add_batch_closure->addSuccessHandler( - [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { - Status status(result.status()); - if (status.ok()) { - if (is_last_rpc) { - for (auto& tablet : result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - _tablet_commit_infos.emplace_back(std::move(commit_info)); - } - _add_batches_finished = true; - } - } else { - _cancelled = true; - LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, " - << print_load_info() << ", node=" << node_info()->host << ":" - << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); - } - - if (result.has_execution_time_us()) { - _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); - _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); - _add_batch_counter.add_batch_num++; - } - }); + _add_batch_closure = new RefCountClosure(); + _add_batch_closure->ref(); return status; } Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { - // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. - auto st = none_of({_cancelled, _eos_is_produced}); - if (!st.ok()) { - return st.clone_and_prepend("already stopped, can't add_row. cancelled/eos: "); - } - - // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, - // so in the ideal case, mem limit is a matter for _plan node. - // But there is still some unfinished things, we do mem limit here temporarily. - while (_parent->_mem_tracker->any_limit_exceeded()) { - SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); - SleepFor(MonoDelta::FromMilliseconds(10)); - } - - auto row_no = _cur_batch->add_row(); + auto row_no = _batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { - { - SCOPED_RAW_TIMER(&_queue_push_lock_ns); - std::lock_guard l(_pending_batches_lock); - //To simplify the add_row logic, postpone adding batch into req until the time of sending req - _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); - _pending_batches_num++; - } - - _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); - _cur_add_batch_request.clear_tablet_ids(); - - row_no = _cur_batch->add_row(); + RETURN_IF_ERROR(_send_cur_batch()); + row_no = _batch->add_row(); } DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); - auto tuple = input_tuple->deep_copy(*_tuple_desc, _cur_batch->tuple_data_pool()); - _cur_batch->get_row(row_no)->set_tuple(0, tuple); - _cur_batch->commit_last_row(); - _cur_add_batch_request.add_tablet_ids(tablet_id); + auto tuple = input_tuple->deep_copy(*_tuple_desc, _batch->tuple_data_pool()); + _batch->get_row(row_no)->set_tuple(0, tuple); + _batch->commit_last_row(); + _add_batch_request.add_tablet_ids(tablet_id); return Status::OK(); } -Status NodeChannel::mark_close() { - auto st = none_of({_cancelled, _eos_is_produced}); - if (!st.ok()) { - return st.clone_and_prepend("already stopped, can't mark as closed. cancelled/eos: "); - } - - _cur_add_batch_request.set_eos(true); - { - std::lock_guard l(_pending_batches_lock); - _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); - _pending_batches_num++; - DCHECK(_pending_batches.back().second.eos()); - } +Status NodeChannel::close(RuntimeState* state) { + auto st = _close(state); + _batch.reset(); + return st; +} - _eos_is_produced = true; +Status NodeChannel::_close(RuntimeState* state) { + return _send_cur_batch(true); } Status NodeChannel::close_wait(RuntimeState* state) { - auto st = none_of({_cancelled, !_eos_is_produced}); - if (!st.ok()) { - return st.clone_and_prepend("already stopped, skip waiting for close. cancelled/!eos: "); - } - - // waiting for finished, it may take a long time, so we could't set a timeout - MonotonicStopWatch timer; - timer.start(); - while (!_add_batches_finished && !_cancelled) { - SleepFor(MonoDelta::FromMilliseconds(1)); - } - timer.stop(); - VLOG(1) << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms"; - - { - std::lock_guard lg(_pending_batches_lock); - DCHECK(_pending_batches.empty()); - DCHECK(_cur_batch == nullptr); - } - - if (_add_batches_finished) { - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(_tablet_commit_infos.begin()), - std::make_move_iterator(_tablet_commit_infos.end())); - return Status::OK(); + RETURN_IF_ERROR(_wait_in_flight_packet()); + Status status(_add_batch_closure->result.status()); + if (status.ok()) { + for (auto& tablet : _add_batch_closure->result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + state->tablet_commit_infos().emplace_back(std::move(commit_info)); + } } - - return Status::InternalError("close wait failed coz rpc error"); + // clear batch after sendt + _batch.reset(); + return status; } void NodeChannel::cancel() { - // we don't need to wait last rpc finished, cause closure's release/reset will join. - // But do we need brpc::StartCancel(call_id)? - _cancelled = true; - + // Do we need to wait last rpc finished??? PTabletWriterCancelRequest request; request.set_allocated_id(&_parent->_load_id); request.set_index_id(_index_id); @@ -269,89 +185,80 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); + _stub->tablet_writer_cancel(&closure->cntl, + &request, + &closure->result, + closure); request.release_id(); - // Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors). - // Delete RowBatches here is a better choice to reduce the potential of dtor errors. - { - std::lock_guard lg(_pending_batches_lock); - std::queue empty; - std::swap(_pending_batches, empty); - _cur_batch.reset(); - } + // reset batch + _batch.reset(); } -int NodeChannel::try_send_and_fetch_status() { - auto st = none_of({_cancelled, _send_finished}); - if (!st.ok()) { - return 0; +Status NodeChannel::_wait_in_flight_packet() { + if (!_has_in_flight_packet) { + return Status::OK(); } - if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { - SCOPED_RAW_TIMER(&_actual_consume_ns); - AddBatchReq send_batch; - { - std::lock_guard lg(_pending_batches_lock); - DCHECK(!_pending_batches.empty()); - send_batch = std::move(_pending_batches.front()); - _pending_batches.pop(); - _pending_batches_num--; - } + SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); + _add_batch_closure->join(); + _has_in_flight_packet = false; + if (_add_batch_closure->cntl.Failed()) { + LOG(WARNING) << "failed to send batch, error=" + << berror(_add_batch_closure->cntl.ErrorCode()) + << ", error_text=" << _add_batch_closure->cntl.ErrorText(); + return Status::InternalError("failed to send batch"); + } - auto row_batch = std::move(send_batch.first); - auto request = std::move(send_batch.second); // doesn't need to be saved in heap + if (_add_batch_closure->result.has_execution_time_us()) { + _parent->update_node_add_batch_counter(_node_id, + _add_batch_closure->result.execution_time_us(), + _add_batch_closure->result.wait_lock_time_us()); + } + return {_add_batch_closure->result.status()}; +} - // tablet_ids has already set when add row - request.set_packet_seq(_next_packet_seq); - if (row_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(&_serialize_batch_ns); - row_batch->serialize(request.mutable_row_batch()); - } +Status NodeChannel::_send_cur_batch(bool eos) { + RETURN_IF_ERROR(_wait_in_flight_packet()); - _add_batch_closure->reset(); - _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + // tablet_ids has already set when add row + _add_batch_request.set_eos(eos); + _add_batch_request.set_packet_seq(_next_packet_seq); + if (_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); + _batch->serialize(_add_batch_request.mutable_row_batch()); + } - if (request.eos()) { - for (auto pid : _parent->_partition_ids) { - request.add_partition_ids(pid); - } + _add_batch_closure->ref(); + _add_batch_closure->cntl.Reset(); + _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); - // eos request must be the last request - _add_batch_closure->end_mark(); - _send_finished = true; - DCHECK(_pending_batches_num == 0); + if (eos) { + for (auto pid : _parent->_partition_ids) { + _add_batch_request.add_partition_ids(pid); } - - _add_batch_closure->set_in_flight(); - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, - &_add_batch_closure->result, _add_batch_closure); - - _next_packet_seq++; } - return _send_finished ? 0 : 1; -} + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, + &_add_batch_request, + &_add_batch_closure->result, + _add_batch_closure); + _add_batch_request.clear_tablet_ids(); + _add_batch_request.clear_row_batch(); + _add_batch_request.clear_partition_ids(); -Status NodeChannel::none_of(std::initializer_list vars) { - bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); - Status st = Status::OK(); - if (!none) { - std::string vars_str; - std::for_each(vars.begin(), vars.end(), - [&vars_str](bool var) -> void { vars_str += (var ? "1/" : "0/"); }); - if (!vars_str.empty()) { - vars_str.pop_back(); // 0/1/0/ -> 0/1/0 - } - st = Status::InternalError(vars_str); - } + _has_in_flight_packet = true; + _next_packet_seq++; - return st; + _batch->reset(); + return Status::OK(); } -IndexChannel::~IndexChannel() {} +IndexChannel::~IndexChannel() { +} -Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { +Status IndexChannel::init(RuntimeState* state, + const std::vector& tablets) { for (auto& tablet : tablets) { auto location = _parent->_location->find_tablet(tablet.tablet_id); if (location == nullptr) { @@ -380,37 +287,119 @@ Status IndexChannel::init(RuntimeState* state, const std::vectoropen(); + } + for (auto& it : _node_channels) { + auto channel = it.second; + auto st = channel->open_wait(); + if (!st.ok()) { + LOG(WARNING) << "tablet open failed, load_id=" << _parent->_load_id + << ", node=" << channel->node_info()->host + << ":" << channel->node_info()->brpc_port + << ", errmsg=" << st.get_error_msg(); + if (_handle_failed_node(channel)) { + LOG(WARNING) << "open failed, load_id=" << _parent->_load_id; + return st; + } + } + } + return Status::OK(); +} + Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) { auto it = _channels_by_tablet.find(tablet_id); DCHECK(it != std::end(_channels_by_tablet)) << "unknown tablet, tablet_id=" << tablet_id; for (auto channel : it->second) { - // if this node channel is already failed, this add_row will be skipped + if (channel->already_failed()) { + continue; + } auto st = channel->add_row(tuple, tablet_id); if (!st.ok()) { - mark_as_failed(channel); + LOG(WARNING) << "NodeChannel add row failed, load_id=" << _parent->_load_id + << ", tablet_id=" << tablet_id + << ", node=" << channel->node_info()->host + << ":" << channel->node_info()->brpc_port + << ", errmsg=" << st.get_error_msg(); + if (_handle_failed_node(channel)) { + LOG(WARNING) << "add row failed, load_id=" << _parent->_load_id; + return st; + } } } + return Status::OK(); +} - if (has_intolerable_failure()) { - return Status::InternalError("index channel has intoleralbe failure"); +Status IndexChannel::close(RuntimeState* state) { + std::vector need_wait_channels; + need_wait_channels.reserve(_node_channels.size()); + + Status close_status; + for (auto& it : _node_channels) { + auto channel = it.second; + if (channel->already_failed() || !close_status.ok()) { + channel->cancel(); + continue; + } + auto st = channel->close(state); + if (st.ok()) { + need_wait_channels.push_back(channel); + } else { + LOG(WARNING) << "close node channel failed, load_id=" << _parent->_load_id + << ", node=" << channel->node_info()->host + << ":" << channel->node_info()->brpc_port + << ", errmsg=" << st.get_error_msg(); + if (_handle_failed_node(channel)) { + LOG(WARNING) << "close failed, load_id=" << _parent->_load_id; + close_status = st; + } + } } - return Status::OK(); + if (close_status.ok()) { + for (auto channel : need_wait_channels) { + auto st = channel->close_wait(state); + if (!st.ok()) { + LOG(WARNING) << "close_wait node channel failed, load_id=" << _parent->_load_id + << ", node=" << channel->node_info()->host + << ":" << channel->node_info()->brpc_port + << ", errmsg=" << st.get_error_msg(); + if (_handle_failed_node(channel)) { + LOG(WARNING) << "close_wait failed, load_id=" << _parent->_load_id; + return st; + } + } + } + } + return close_status; +} + +void IndexChannel::cancel() { + for (auto& it : _node_channels) { + it.second->cancel(); + } } -bool IndexChannel::has_intolerable_failure() { - return _failed_channels.size() >= ((_parent->_num_repicas + 1) / 2); +bool IndexChannel::_handle_failed_node(NodeChannel* channel) { + DCHECK(!channel->already_failed()); + channel->set_failed(); + _num_failed_channels++; + return _num_failed_channels >= ((_parent->_num_repicas + 1) / 2); } -OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector& texprs, Status* status) +OlapTableSink::OlapTableSink(ObjectPool* pool, + const RowDescriptor& row_desc, + const std::vector& texprs, + Status* status) : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) { if (!texprs.empty()) { *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs); } } -OlapTableSink::~OlapTableSink() {} +OlapTableSink::~OlapTableSink() { +} Status OlapTableSink::init(const TDataSink& t_sink) { DCHECK(t_sink.__isset.olap_table_sink); @@ -454,8 +443,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { SCOPED_TIMER(_profile->total_time_counter()); // Prepare the exprs to run. - RETURN_IF_ERROR( - Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker.get())); + RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, + _input_row_desc, _expr_mem_tracker.get())); // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); @@ -466,17 +455,17 @@ Status OlapTableSink::prepare(RuntimeState* state) { if (!_output_expr_ctxs.empty()) { if (_output_expr_ctxs.size() != _output_tuple_desc->slots().size()) { LOG(WARNING) << "number of exprs is not same with slots, num_exprs=" - << _output_expr_ctxs.size() - << ", num_slots=" << _output_tuple_desc->slots().size(); + << _output_expr_ctxs.size() + << ", num_slots=" << _output_tuple_desc->slots().size(); return Status::InternalError("number of exprs is not same with slots"); } for (int i = 0; i < _output_expr_ctxs.size(); ++i) { if (!is_type_compatible(_output_expr_ctxs[i]->root()->type().type, _output_tuple_desc->slots()[i]->type().type)) { LOG(WARNING) << "type of exprs is not match slot's, expr_type=" - << _output_expr_ctxs[i]->root()->type().type - << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type - << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); + << _output_expr_ctxs[i]->root()->type().type + << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type + << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); return Status::InternalError("expr's type is not same with slot's"); } } @@ -524,8 +513,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { _convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); - _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); + _close_timer = ADD_TIMER(_profile, "CloseTime"); + _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); _load_mem_limit = state->get_load_mem_limit(); @@ -557,29 +546,9 @@ Status OlapTableSink::open(RuntimeState* state) { // Prepare the exprs to run. RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); - for (auto index_channel : _channels) { - index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); }); - } - - for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) { - auto st = ch->open_wait(); - if (!st.ok()) { - LOG(WARNING) << "tablet open failed, " << ch->print_load_info() - << ", node=" << ch->node_info()->host << ":" - << ch->node_info()->brpc_port << ", errmsg=" << st.get_error_msg(); - index_channel->mark_as_failed(ch); - } - }); - - if (index_channel->has_intolerable_failure()) { - LOG(WARNING) << "open failed, load_id=" << _load_id; - return Status::InternalError("intolerable failure in opening node channels"); - } + for (auto channel : _channels) { + RETURN_IF_ERROR(channel->open()); } - - _sender_thread = std::thread(&OlapTableSink::_send_batch_process, this); - return Status::OK(); } @@ -615,7 +584,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { std::stringstream ss; ss << "no partition for this tuple. tuple=" - << Tuple::to_string(tuple, *_output_tuple_desc); + << Tuple::to_string(tuple, *_output_tuple_desc); #if BE_TEST LOG(INFO) << ss.str(); #else @@ -641,86 +610,57 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); - // BE id -> add_batch method counter - std::unordered_map node_add_batch_counter_map; - int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, - actual_consume_ns = 0; { SCOPED_TIMER(_close_timer); - for (auto index_channel : _channels) { - index_channel->for_each_node_channel( - [](NodeChannel* ch) { WARN_IF_ERROR(ch->mark_close(), ""); }); - } - - for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, - &serialize_batch_ns, &mem_exceeded_block_ns, - &queue_push_lock_ns, - &actual_consume_ns](NodeChannel* ch) { - status = ch->close_wait(state); - if (!status.ok()) { - LOG(WARNING) << "close channel failed, " << ch->print_load_info(); - } - ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, - &mem_exceeded_block_ns, &queue_push_lock_ns, - &actual_consume_ns); - }); + for (auto channel : _channels) { + status = channel->close(state); + if (!status.ok()) { + LOG(WARNING) << "close channel failed, load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id; + } } } - // TODO need to be improved - LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns - << ", total queue_push_lock_ns=" << queue_push_lock_ns - << ", total actual_consume_ns=" << actual_consume_ns; - COUNTER_SET(_input_rows_counter, _number_input_rows); COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); - COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns); - COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); + COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); + COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + - state->num_rows_load_unselected(); + int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; - for (auto const& pair : node_add_batch_counter_map) { - ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) - << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" + << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + for (auto const& pair : _node_add_batch_counter_map) { + ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_ns / 1000) << ")(" + << (pair.second.add_batch_wait_lock_time_ns / 1000) << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); + } else { for (auto channel : _channels) { - channel->for_each_node_channel([](NodeChannel* ch) { ch->cancel(); }); + channel->cancel(); } } - - // Sender join() must put after node channels mark_close/cancel. - // But there is no specific sequence required between sender join() & close_wait(). - if (_sender_thread.joinable()) { - _sender_thread.join(); - } - Expr::close(_output_expr_ctxs, state); _output_batch.reset(); return status; } -void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, - RowBatch* output_batch) { +void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, RowBatch* output_batch) { DCHECK_GE(output_batch->capacity(), input_batch->num_rows()); int commit_rows = 0; for (int i = 0; i < input_batch->num_rows(); ++i) { auto src_row = input_batch->get_row(i); - Tuple* dst_tuple = - (Tuple*)output_batch->tuple_data_pool()->allocate(_output_tuple_desc->byte_size()); + Tuple* dst_tuple = (Tuple*)output_batch->tuple_data_pool()->allocate( + _output_tuple_desc->byte_size()); bool ignore_this_row = false; for (int j = 0; j < _output_expr_ctxs.size(); ++j) { auto src_val = _output_expr_ctxs[j]->get_value(src_row); @@ -787,10 +727,10 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (str_val->len > desc->type().len) { std::stringstream ss; ss << "the length of input is too long than schema. " - << "column_name: " << desc->col_name() << "; " - << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " - << "schema length: " << desc->type().len << "; " - << "actual length: " << str_val->len << "; "; + << "column_name: " << desc->col_name() << "; " + << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " + << "schema length: " << desc->type().len << "; " + << "actual length: " << str_val->len << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -803,8 +743,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* continue; } // padding 0 to CHAR field - if (desc->type().type == TYPE_CHAR && str_val->len < desc->type().len) { - auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); + if (desc->type().type == TYPE_CHAR + && str_val->len < desc->type().len) { + auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); memcpy(new_ptr, str_val->ptr, str_val->len); memset(new_ptr + str_val->len, 0, desc->type().len - str_val->len); @@ -835,9 +776,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (*dec_val > _max_decimal_val[i] || *dec_val < _min_decimal_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val->to_string() - << ", precision=" << desc->type().precision - << ", scale=" << desc->type().scale; + << ", value=" << dec_val->to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -873,9 +814,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val.to_string() - << ", precision=" << desc->type().precision - << ", scale=" << desc->type().scale; + << ", value=" << dec_val.to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -893,7 +834,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (!HyperLogLog::is_valid(*hll_val)) { std::stringstream ss; ss << "Content of HLL type column is invalid" - << "column_name: " << desc->col_name() << "; "; + << "column_name: " << desc->col_name() << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -914,24 +855,5 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* return filtered_rows; } -void OlapTableSink::_send_batch_process() { - SCOPED_RAW_TIMER(&_non_blocking_send_ns); - while (true) { - int running_channels_num = 0; - for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) { - running_channels_num += ch->try_send_and_fetch_status(); - }); - } - - if (running_channels_num == 0) { - LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), " - "consumer thread exit."; - return; - } - SleepFor(MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms)); - } } - -} // namespace stream_load -} // namespace doris +} diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 9ff18cf773d65e..77fb89f1cbd853 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -18,23 +18,22 @@ #pragma once #include -#include #include #include #include #include #include -#include "common/object_pool.h" #include "common/status.h" +#include "common/object_pool.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/palo_internal_service.pb.h" #include "util/bitmap.h" -#include "util/ref_count_closure.h" #include "util/thrift_util.h" +#include "util/ref_count_closure.h" namespace doris { @@ -48,103 +47,18 @@ class ExprContext; class TExpr; namespace stream_load { - + class OlapTableSink; -// The counter of add_batch rpc of a single node -struct AddBatchCounter { - // total execution time of a add_batch rpc - int64_t add_batch_execution_time_us = 0; - // lock waiting time in a add_batch rpc - int64_t add_batch_wait_lock_time_us = 0; - // number of add_batch call - int64_t add_batch_num = 0; - AddBatchCounter& operator+=(const AddBatchCounter& rhs) { - add_batch_execution_time_us += rhs.add_batch_execution_time_us; - add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us; - add_batch_num += rhs.add_batch_num; - return *this; - } - friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) { - AddBatchCounter sum = lhs; - sum += rhs; - return sum; - } -}; - -// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. -// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. -template -class ReusableClosure : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - } - - static ReusableClosure* create() { return new ReusableClosure(); } - - void addFailedHandler(std::function fn) { failed_handler = fn; } - void addSuccessHandler(std::function fn) { success_handler = fn; } - - void join() { - if (cid != INVALID_BTHREAD_ID) { - brpc::Join(cid); - } - } - - // plz follow this order: reset() -> set_in_flight() -> send brpc batch - void reset() { - join(); - DCHECK(_packet_in_flight == false); - cntl.Reset(); - cid = cntl.call_id(); - } - - void set_in_flight() { - DCHECK(_packet_in_flight == false); - _packet_in_flight = true; - } - - bool is_packet_in_flight() { return _packet_in_flight; } - - void end_mark() { - DCHECK(_is_last_rpc == false); - _is_last_rpc = true; - } - - void Run() override { - DCHECK(_packet_in_flight); - if (cntl.Failed()) { - LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) - << ", error_text=" << cntl.ErrorText(); - failed_handler(); - } else { - success_handler(result, _is_last_rpc); - } - _packet_in_flight = false; - } - - brpc::Controller cntl; - T result; - -private: - brpc::CallId cid; - std::atomic _packet_in_flight{false}; - std::atomic _is_last_rpc{false}; - std::function failed_handler; - std::function success_handler; -}; - class NodeChannel { public: NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, int32_t schema_hash); ~NodeChannel() noexcept; // called before open, used to add tablet loacted in this backend - void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } + void add_tablet(const TTabletWithPartition& tablet) { + _all_tablets.emplace_back(tablet); + } Status init(RuntimeState* state); @@ -154,128 +68,99 @@ class NodeChannel { Status add_row(Tuple* tuple, int64_t tablet_id); - // two ways to stop channel: - // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. - // 2. just cancel() - Status mark_close(); + Status close(RuntimeState* state); Status close_wait(RuntimeState* state); void cancel(); - // return: - // 0: stopped, send finished(eos request has been sent), or any internal error; - // 1: running, haven't reach eos. - // only allow 1 rpc in flight - int try_send_and_fetch_status(); - - void time_report(std::unordered_map* add_batch_counter_map, - int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, - int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { - (*add_batch_counter_map)[_node_id] += _add_batch_counter; - *serialize_batch_ns += _serialize_batch_ns; - *mem_exceeded_block_ns += _mem_exceeded_block_ns; - *queue_push_lock_ns += _queue_push_lock_ns; - *actual_consume_ns += _actual_consume_ns; - } - int64_t node_id() const { return _node_id; } + + void set_failed() { _already_failed = true; } + bool already_failed() const { return _already_failed; } const NodeInfo* node_info() const { return _node_info; } - std::string print_load_info() const { return _load_info; } - std::string name() const { return _name; } - Status none_of(std::initializer_list vars); +private: + Status _send_cur_batch(bool eos = false); + // wait inflight packet finish, return error if inflight packet return failed + Status _wait_in_flight_packet(); + + Status _close(RuntimeState* state); private: OlapTableSink* _parent = nullptr; int64_t _index_id = -1; int64_t _node_id = -1; int32_t _schema_hash = 0; - std::string _load_info; - std::string _name; TupleDescriptor* _tuple_desc = nullptr; const NodeInfo* _node_info = nullptr; + bool _already_failed = false; + bool _has_in_flight_packet = false; // this should be set in init() using config int _rpc_timeout_ms = 60000; int64_t _next_packet_seq = 0; - // user cancel or get some errors - std::atomic _cancelled{false}; - - // send finished means the consumer thread which send the rpc can exit - std::atomic _send_finished{false}; - - // add batches finished means the last rpc has be responsed, used to check whether this channel can be closed - std::atomic _add_batches_finished{false}; - - bool _eos_is_produced{false}; // only for restricting producer behaviors - - std::unique_ptr _row_desc; - int _batch_size = 0; - std::unique_ptr _cur_batch; - PTabletWriterAddBatchRequest _cur_add_batch_request; - - std::mutex _pending_batches_lock; - using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; - std::queue _pending_batches; - std::atomic _pending_batches_num{0}; - + std::unique_ptr _batch; palo::PInternalService_Stub* _stub = nullptr; RefCountClosure* _open_closure = nullptr; - ReusableClosure* _add_batch_closure = nullptr; + RefCountClosure* _add_batch_closure = nullptr; std::vector _all_tablets; - std::vector _tablet_commit_infos; - - AddBatchCounter _add_batch_counter; - int64_t _serialize_batch_ns = 0; - - int64_t _mem_exceeded_block_ns = 0; - int64_t _queue_push_lock_ns = 0; - int64_t _actual_consume_ns = 0; + PTabletWriterAddBatchRequest _add_batch_request; }; class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, int32_t schema_hash) - : _parent(parent), _index_id(index_id), _schema_hash(schema_hash) {} + : _parent(parent), _index_id(index_id), + _schema_hash(schema_hash) { + } ~IndexChannel(); - Status init(RuntimeState* state, const std::vector& tablets); - + Status init(RuntimeState* state, + const std::vector& tablets); + Status open(); Status add_row(Tuple* tuple, int64_t tablet_id); - void for_each_node_channel(const std::function& func) { - for (auto& it : _node_channels) { - func(it.second); - } - } + Status close(RuntimeState* state); - void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } - bool has_intolerable_failure(); + void cancel(); + +private: + // return true if this load can't success. + bool _handle_failed_node(NodeChannel* channel); private: OlapTableSink* _parent; int64_t _index_id; int32_t _schema_hash; + int _num_failed_channels = 0; // BeId -> channel std::unordered_map _node_channels; // from tablet_id to backend channel std::unordered_map> _channels_by_tablet; - // BeId - std::set _failed_channels; }; -// Write data to Olap Table. -// When OlapTableSink::open() called, there will be a consumer thread running in the background. -// When you call OlapTableSink::send(), you will be the productor who products pending batches. -// Join the consumer thread in close(). +// The counter of add_batch rpc of a single node +struct AddBatchCounter { + // total execution time of a add_batch rpc + int64_t add_batch_execution_time_ns = 0; + // lock waiting time in a add_batch rpc + int64_t add_batch_wait_lock_time_ns = 0; + // number of add_batch call + int64_t add_batch_num = 0; +}; + +// write data to Olap Table. +// this class distributed data according to class OlapTableSink : public DataSink { public: // Construct from thrift struct which is generated by FE. - OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, + OlapTableSink(ObjectPool* pool, + const RowDescriptor& row_desc, + const std::vector& texprs, Status* status); ~OlapTableSink() override; @@ -287,11 +172,29 @@ class OlapTableSink : public DataSink { Status send(RuntimeState* state, RowBatch* batch) override; - // close() will send RPCs too. If RPCs failed, return error. Status close(RuntimeState* state, Status close_status) override; // Returns the runtime profile for the sink. - RuntimeProfile* profile() override { return _profile; } + RuntimeProfile* profile() override { + return _profile; + } + + // these 2 counters does not thread-safe. make sure only one thread + // at a time can modify them. + int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } + int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } + void update_node_add_batch_counter(int64_t be_id, int64_t add_batch_time_ns, int64_t wait_lock_time_ns) { + auto search = _node_add_batch_counter_map.find(be_id); + if (search == _node_add_batch_counter_map.end()) { + AddBatchCounter new_counter; + _node_add_batch_counter_map.emplace(be_id, std::move(new_counter)); + } + + AddBatchCounter& counter = _node_add_batch_counter_map[be_id]; + counter.add_batch_execution_time_ns += add_batch_time_ns; + counter.add_batch_wait_lock_time_ns += wait_lock_time_ns; + counter.add_batch_num += 1; + } private: // convert input batch to output batch which will be loaded into OLAP table. @@ -303,11 +206,6 @@ class OlapTableSink : public DataSink { // invalid row number is set in Bitmap int _validate_data(RuntimeState* state, RowBatch* batch, Bitmap* filter_bitmap); - // the consumer func of sending pending batches in every NodeChannel. - // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending. - // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the productor - void _send_batch_process(); - private: friend class NodeChannel; friend class IndexChannel; @@ -356,8 +254,6 @@ class OlapTableSink : public DataSink { // index_channel std::vector _channels; - std::thread _sender_thread; - std::vector _max_decimal_val; std::vector _min_decimal_val; @@ -368,7 +264,7 @@ class OlapTableSink : public DataSink { int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _non_blocking_send_ns = 0; + int64_t _wait_in_flight_packet_ns = 0; int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; @@ -382,9 +278,12 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; + RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + // BE id -> add_batch method counter + std::unordered_map _node_add_batch_counter_map; + // load mem limit is for remote load channel int64_t _load_mem_limit = -1; @@ -392,5 +291,5 @@ class OlapTableSink : public DataSink { int64_t _load_channel_timeout_s = 0; }; -} // namespace stream_load -} // namespace doris +} +} diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index 0f563b384dfca0..bef07940d7f0bd 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -19,23 +19,25 @@ #include -#include "common/config.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "runtime/bufferpool/reservation_tracker.h" +#include "common/config.h" #include "runtime/decimal_value.h" -#include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" -#include "runtime/result_queue_mgr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/thread_resource_mgr.h" #include "runtime/tuple_row.h" +#include "runtime/stream_load/load_stream_mgr.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" #include "util/debug/leakcheck_disabler.h" +#include "runtime/descriptor_helper.h" +#include "runtime/bufferpool/reservation_tracker.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/thread_resource_mgr.h" namespace doris { namespace stream_load { @@ -44,8 +46,8 @@ Status k_add_batch_status; class OlapTableSinkTest : public testing::Test { public: - OlapTableSinkTest() {} - virtual ~OlapTableSinkTest() {} + OlapTableSinkTest() { } + virtual ~OlapTableSinkTest() { } void SetUp() override { k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); @@ -64,16 +66,9 @@ class OlapTableSinkTest : public testing::Test { SAFE_DELETE(_env->_master_info); SAFE_DELETE(_env->_thread_mgr); SAFE_DELETE(_env->_buffer_reservation); - if (_server) { - _server->Stop(100); - _server->Join(); - SAFE_DELETE(_server); - } } - private: - ExecEnv* _env = nullptr; - brpc::Server* _server = nullptr; + ExecEnv* _env; }; TDataSink get_data_sink(TDescriptorTable* desc_tbl) { @@ -111,42 +106,24 @@ TDataSink get_data_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(TYPE_INT) - .column_name("c1") - .column_pos(1) - .build()); - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(TYPE_BIGINT) - .column_name("c2") - .column_pos(2) - .build()); - tuple_builder.add_slot(TSlotDescriptorBuilder() - .string_type(10) - .column_name("c3") - .column_pos(3) - .build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().string_type(10).column_name("c3").column_pos(3).build()); tuple_builder.build(&dtb); } { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(TYPE_INT) - .column_name("c1") - .column_pos(1) - .build()); - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(TYPE_BIGINT) - .column_name("c2") - .column_pos(2) - .build()); - tuple_builder.add_slot(TSlotDescriptorBuilder() - .string_type(20) - .column_name("c3") - .column_pos(3) - .build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().string_type(20).column_name("c3").column_pos(3).build()); tuple_builder.build(&dtb); } @@ -235,16 +212,10 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(TYPE_INT) - .column_name("c1") - .column_pos(1) - .build()); - tuple_builder.add_slot(TSlotDescriptorBuilder() - .decimal_type(5, 2) - .column_name("c2") - .column_pos(2) - .build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(5, 2).column_name("c2").column_pos(2).build()); tuple_builder.build(&dtb); } @@ -300,8 +271,8 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { class TestInternalService : public palo::PInternalService { public: - TestInternalService() {} - virtual ~TestInternalService() {} + TestInternalService() { } + virtual ~TestInternalService() { } void transmit_data(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, @@ -310,6 +281,7 @@ class TestInternalService : public palo::PInternalService { done->Run(); } + void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, @@ -334,7 +306,7 @@ class TestInternalService : public palo::PInternalService { if (request->has_row_batch() && _row_desc != nullptr) { MemTracker tracker; RowBatch batch(*_row_desc, request->row_batch(), &tracker); - for (int i = 0; i < batch.num_rows(); ++i) { + for (int i = 0; i < batch.num_rows(); ++i){ LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); } @@ -358,13 +330,13 @@ class TestInternalService : public palo::PInternalService { TEST_F(OlapTableSinkTest, normal) { // start brpc service first - _server = new brpc::Server(); + auto server = new brpc::Server(); auto service = new TestInternalService(); - ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + server->AddService(service, brpc::SERVER_OWNS_SERVICE); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - _server->Start(4356, &options); + server->Start(4356, &options); } TUniqueId fragment_id; @@ -389,7 +361,7 @@ TEST_F(OlapTableSinkTest, normal) { LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); RowDescriptor row_desc(*desc_tbl, {0}, {false}); - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -461,17 +433,21 @@ TEST_F(OlapTableSinkTest, normal) { // 2node * 2 ASSERT_EQ(1, state.num_rows_load_filtered()); + + server->Stop(100); + server->Join(); + delete server; } TEST_F(OlapTableSinkTest, convert) { // start brpc service first - _server = new brpc::Server(); + auto server = new brpc::Server(); auto service = new TestInternalService(); - ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + server->AddService(service, brpc::SERVER_OWNS_SERVICE); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - _server->Start(4356, &options); + server->Start(4356, &options); } TUniqueId fragment_id; @@ -494,7 +470,7 @@ TEST_F(OlapTableSinkTest, convert) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -594,6 +570,10 @@ TEST_F(OlapTableSinkTest, convert) { // 2node * 2 ASSERT_EQ(0, state.num_rows_load_filtered()); + + server->Stop(100); + server->Join(); + delete server; } TEST_F(OlapTableSinkTest, init_fail1) { @@ -615,7 +595,7 @@ TEST_F(OlapTableSinkTest, init_fail1) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(1); exprs[0].nodes.resize(1); @@ -673,7 +653,7 @@ TEST_F(OlapTableSinkTest, init_fail3) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -732,7 +712,7 @@ TEST_F(OlapTableSinkTest, init_fail4) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -775,13 +755,13 @@ TEST_F(OlapTableSinkTest, init_fail4) { TEST_F(OlapTableSinkTest, add_batch_failed) { // start brpc service first - _server = new brpc::Server(); + auto server = new brpc::Server(); auto service = new TestInternalService(); - ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + server->AddService(service, brpc::SERVER_OWNS_SERVICE); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - _server->Start(4356, &options); + server->Start(4356, &options); } TUniqueId fragment_id; @@ -802,7 +782,7 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -865,17 +845,21 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { // close st = sink.close(&state, Status::OK()); ASSERT_FALSE(st.ok()); + + server->Stop(100); + server->Join(); + delete server; } TEST_F(OlapTableSinkTest, decimal) { // start brpc service first - _server = new brpc::Server(); + auto server = new brpc::Server(); auto service = new TestInternalService(); - ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); + server->AddService(service, brpc::SERVER_OWNS_SERVICE); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - _server->Start(4356, &options); + server->Start(4356, &options); } TUniqueId fragment_id; @@ -901,7 +885,7 @@ TEST_F(OlapTableSinkTest, decimal) { service->_row_desc = &row_desc; std::set output_set; service->_output_set = &output_set; - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -962,10 +946,14 @@ TEST_F(OlapTableSinkTest, decimal) { ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0); ASSERT_TRUE(output_set.count("[(13 123.12)]") > 0); // ASSERT_TRUE(output_set.count("[(14 999.99)]") > 0); + + server->Stop(100); + server->Join(); + delete server; } -} // namespace stream_load -} // namespace doris +} +} int main(int argc, char* argv[]) { doris::CpuInfo::init();