From 0cbacaf01d377d702068d2f04c97d4c74e3af388 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 29 Jun 2020 19:13:03 +0800 Subject: [PATCH] [Refactor] Replace some boost to std in OlapScanNode (#3934) Replace some boost to std in OlapScanNode. This refactor seems solve the problem describe in #3929. Because I found that BE will crash to calling `boost::condition_variable.notify_all()`. But after upgrade to this, BE does not crash any more. --- be/src/exec/olap_scan_node.cpp | 40 +++++++++---------- be/src/exec/olap_scan_node.h | 24 +++++------ .../java/org/apache/doris/system/Backend.java | 4 +- 3 files changed, 32 insertions(+), 36 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 06f64f8ddb416b..ffde6900dca9c1 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -57,7 +57,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _start(false), _scanner_done(false), _transfer_done(false), - _wait_duration(0, 0, 1, 0), _status(Status::OK()), _resource_info(nullptr), _buffered_bytes(0), @@ -186,9 +185,9 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo // check if Canceled. if (state->is_cancelled()) { - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); _transfer_done = true; - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); if (LIKELY(_status.ok())) { _status = Status::Cancelled("Cancelled"); } @@ -216,13 +215,14 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo // wait for batch from queue RowBatch* materialized_batch = NULL; { - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); while (_materialized_row_batches.empty() && !_transfer_done) { if (state->is_cancelled()) { _transfer_done = true; } - _row_batch_added_cv.timed_wait(l, _wait_duration); + // use wait_for, not wait, in case to capture the state->is_cancelled() + _row_batch_added_cv.wait_for(l, std::chrono::seconds(1)); } if (!_materialized_row_batches.empty()) { @@ -249,7 +249,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo COUNTER_SET(_rows_returned_counter, _num_rows_returned); { - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); _transfer_done = true; } @@ -276,7 +276,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo // all scanner done, change *eos to true *eos = true; - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); return _status; } @@ -295,7 +295,7 @@ Status OlapScanNode::close(RuntimeState* state) { // change done status { - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); _transfer_done = true; } // notify all scanner thread @@ -696,8 +696,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { _progress.set_logging_level(1); _transfer_thread.add_thread( - new boost::thread( - &OlapScanNode::transfer_thread, this, state)); + new boost::thread( + &OlapScanNode::transfer_thread, this, state)); return Status::OK(); } @@ -1080,7 +1080,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); if (!status.ok()) { - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); _status = status; break; } @@ -1117,7 +1117,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { int assigned_thread_num = 0; // copy to local { - boost::unique_lock l(_scan_batches_lock); + std::unique_lock l(_scan_batches_lock); assigned_thread_num = _running_thread; // int64_t buf_bytes = __sync_fetch_and_add(&_buffered_bytes, 0); // How many thread can apply to this query @@ -1169,7 +1169,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { RowBatchInterface* scan_batch = NULL; { // 1 scanner idle task not empty, assign new sanner task - boost::unique_lock l(_scan_batches_lock); + std::unique_lock l(_scan_batches_lock); // scanner_row_num = 16k // 16k * 10 * 12 * 8 = 15M(>2s) --> nice=10 @@ -1211,7 +1211,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { state->resource_pool()->release_thread_token(true); VLOG(1) << "TransferThread finish."; - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); _transfer_done = true; _row_batch_added_cv.notify_all(); } @@ -1224,7 +1224,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { if (!scanner->is_open()) { status = scanner->open(); if (!status.ok()) { - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); _status = status; eos = true; } @@ -1276,11 +1276,11 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { } { - boost::unique_lock l(_scan_batches_lock); + std::unique_lock l(_scan_batches_lock); // if we failed, check status. if (UNLIKELY(!status.ok())) { _transfer_done = true; - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); if (LIKELY(_status.ok())) { _status = status; } @@ -1288,7 +1288,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { bool global_status_ok = false; { - boost::lock_guard guard(_status_mutex); + std::lock_guard guard(_status_mutex); global_status_ok = _status.ok(); } if (UNLIKELY(!global_status_ok)) { @@ -1312,7 +1312,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { // that can assure this object can keep live before we finish. scanner->close(_runtime_state); - boost::unique_lock l(_scan_batches_lock); + std::unique_lock l(_scan_batches_lock); _progress.update(1); if (_progress.done()) { // this is the right out @@ -1324,7 +1324,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) { { - boost::unique_lock l(_row_batches_lock); + std::unique_lock l(_row_batches_lock); while (UNLIKELY(_materialized_row_batches.size() >= _max_materialized_row_batches diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 335a1005ab71f0..aa0127d830a156 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -18,12 +18,9 @@ #ifndef DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H #define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H -#include -#include -#include -#include -#include -#include +#include +#include +#include #include #include "exec/olap_common.h" @@ -192,10 +189,10 @@ class OlapScanNode : public ScanNode { // Pool for storing allocated scanner objects. We don't want to use the // runtime pool to ensure that the scanner objects are deleted before this // object is. - boost::scoped_ptr _scanner_pool; + std::unique_ptr _scanner_pool; - // Thread group for transfer thread boost::thread_group _transfer_thread; + // Keeps track of total splits and the number finished. ProgressUpdater _progress; @@ -205,14 +202,14 @@ class OlapScanNode : public ScanNode { // queued to avoid freeing attached resources prematurely (row batches will never depend // on resources attached to earlier batches in the queue). // This lock cannot be taken together with any other locks except _lock. - boost::mutex _row_batches_lock; - boost::condition_variable _row_batch_added_cv; - boost::condition_variable _row_batch_consumed_cv; + std::mutex _row_batches_lock; + std::condition_variable _row_batch_added_cv; + std::condition_variable _row_batch_consumed_cv; std::list _materialized_row_batches; - boost::mutex _scan_batches_lock; - boost::condition_variable _scan_batch_added_cv; + std::mutex _scan_batches_lock; + std::condition_variable _scan_batch_added_cv; int32_t _scanner_task_finish_count; std::list _scan_row_batches; @@ -225,7 +222,6 @@ class OlapScanNode : public ScanNode { bool _transfer_done; size_t _direct_conjunct_size; - boost::posix_time::time_duration _wait_duration; int _total_assign_num; int _nice; diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java index 2144c840b71b34..31d1c9c4aad978 100644 --- a/fe/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/src/main/java/org/apache/doris/system/Backend.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DiskInfo.DiskState; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -687,7 +686,8 @@ private int getDiskNum() { * status.newItem = xxx; */ public class BackendStatus { - public String lastSuccessReportTabletsTime = FeConstants.null_string; + // this will be output as json, so not using FeConstants.null_string; + public String lastSuccessReportTabletsTime = "N/A"; } }