Skip to content

Commit

Permalink
[Refactor] Replace some boost to std in OlapScanNode (#3934)
Browse files Browse the repository at this point in the history
Replace some boost to std in OlapScanNode.

This refactor seems solve the problem describe in #3929.
Because I found that BE will crash to calling `boost::condition_variable.notify_all()`.
But after upgrade to this, BE does not crash any more.
  • Loading branch information
morningman authored Jun 29, 2020
1 parent 2c8fdb6 commit 0cbacaf
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 36 deletions.
40 changes: 20 additions & 20 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_start(false),
_scanner_done(false),
_transfer_done(false),
_wait_duration(0, 0, 1, 0),
_status(Status::OK()),
_resource_info(nullptr),
_buffered_bytes(0),
Expand Down Expand Up @@ -186,9 +185,9 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo

// check if Canceled.
if (state->is_cancelled()) {
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);
_transfer_done = true;
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
if (LIKELY(_status.ok())) {
_status = Status::Cancelled("Cancelled");
}
Expand Down Expand Up @@ -216,13 +215,14 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
// wait for batch from queue
RowBatch* materialized_batch = NULL;
{
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);
while (_materialized_row_batches.empty() && !_transfer_done) {
if (state->is_cancelled()) {
_transfer_done = true;
}

_row_batch_added_cv.timed_wait(l, _wait_duration);
// use wait_for, not wait, in case to capture the state->is_cancelled()
_row_batch_added_cv.wait_for(l, std::chrono::seconds(1));
}

if (!_materialized_row_batches.empty()) {
Expand All @@ -249,7 +249,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
COUNTER_SET(_rows_returned_counter, _num_rows_returned);

{
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);
_transfer_done = true;
}

Expand All @@ -276,7 +276,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo

// all scanner done, change *eos to true
*eos = true;
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
return _status;
}

Expand All @@ -295,7 +295,7 @@ Status OlapScanNode::close(RuntimeState* state) {

// change done status
{
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);
_transfer_done = true;
}
// notify all scanner thread
Expand Down Expand Up @@ -696,8 +696,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
_progress.set_logging_level(1);

_transfer_thread.add_thread(
new boost::thread(
&OlapScanNode::transfer_thread, this, state));
new boost::thread(
&OlapScanNode::transfer_thread, this, state));

return Status::OK();
}
Expand Down Expand Up @@ -1080,7 +1080,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
if (!status.ok()) {
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
_status = status;
break;
}
Expand Down Expand Up @@ -1117,7 +1117,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
int assigned_thread_num = 0;
// copy to local
{
boost::unique_lock<boost::mutex> l(_scan_batches_lock);
std::unique_lock<std::mutex> l(_scan_batches_lock);
assigned_thread_num = _running_thread;
// int64_t buf_bytes = __sync_fetch_and_add(&_buffered_bytes, 0);
// How many thread can apply to this query
Expand Down Expand Up @@ -1169,7 +1169,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
RowBatchInterface* scan_batch = NULL;
{
// 1 scanner idle task not empty, assign new sanner task
boost::unique_lock<boost::mutex> l(_scan_batches_lock);
std::unique_lock<std::mutex> l(_scan_batches_lock);

// scanner_row_num = 16k
// 16k * 10 * 12 * 8 = 15M(>2s) --> nice=10
Expand Down Expand Up @@ -1211,7 +1211,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {

state->resource_pool()->release_thread_token(true);
VLOG(1) << "TransferThread finish.";
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);
_transfer_done = true;
_row_batch_added_cv.notify_all();
}
Expand All @@ -1224,7 +1224,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
if (!scanner->is_open()) {
status = scanner->open();
if (!status.ok()) {
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
_status = status;
eos = true;
}
Expand Down Expand Up @@ -1276,19 +1276,19 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
}

{
boost::unique_lock<boost::mutex> l(_scan_batches_lock);
std::unique_lock<std::mutex> l(_scan_batches_lock);
// if we failed, check status.
if (UNLIKELY(!status.ok())) {
_transfer_done = true;
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
if (LIKELY(_status.ok())) {
_status = status;
}
}

bool global_status_ok = false;
{
boost::lock_guard<SpinLock> guard(_status_mutex);
std::lock_guard<SpinLock> guard(_status_mutex);
global_status_ok = _status.ok();
}
if (UNLIKELY(!global_status_ok)) {
Expand All @@ -1312,7 +1312,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
// that can assure this object can keep live before we finish.
scanner->close(_runtime_state);

boost::unique_lock<boost::mutex> l(_scan_batches_lock);
std::unique_lock<std::mutex> l(_scan_batches_lock);
_progress.update(1);
if (_progress.done()) {
// this is the right out
Expand All @@ -1324,7 +1324,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {

Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) {
{
boost::unique_lock<boost::mutex> l(_row_batches_lock);
std::unique_lock<std::mutex> l(_row_batches_lock);

while (UNLIKELY(_materialized_row_batches.size()
>= _max_materialized_row_batches
Expand Down
24 changes: 10 additions & 14 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
#ifndef DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H
#define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/variant/static_visitor.hpp>
#include <boost/thread.hpp>
#include <condition_variable>
#include <queue>

#include "exec/olap_common.h"
Expand Down Expand Up @@ -192,10 +189,10 @@ class OlapScanNode : public ScanNode {
// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
// object is.
boost::scoped_ptr<ObjectPool> _scanner_pool;
std::unique_ptr<ObjectPool> _scanner_pool;

// Thread group for transfer thread
boost::thread_group _transfer_thread;

// Keeps track of total splits and the number finished.
ProgressUpdater _progress;

Expand All @@ -205,14 +202,14 @@ class OlapScanNode : public ScanNode {
// queued to avoid freeing attached resources prematurely (row batches will never depend
// on resources attached to earlier batches in the queue).
// This lock cannot be taken together with any other locks except _lock.
boost::mutex _row_batches_lock;
boost::condition_variable _row_batch_added_cv;
boost::condition_variable _row_batch_consumed_cv;
std::mutex _row_batches_lock;
std::condition_variable _row_batch_added_cv;
std::condition_variable _row_batch_consumed_cv;

std::list<RowBatchInterface*> _materialized_row_batches;

boost::mutex _scan_batches_lock;
boost::condition_variable _scan_batch_added_cv;
std::mutex _scan_batches_lock;
std::condition_variable _scan_batch_added_cv;
int32_t _scanner_task_finish_count;

std::list<RowBatchInterface*> _scan_row_batches;
Expand All @@ -225,7 +222,6 @@ class OlapScanNode : public ScanNode {
bool _transfer_done;
size_t _direct_conjunct_size;

boost::posix_time::time_duration _wait_duration;
int _total_assign_num;
int _nice;

Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
Expand Down Expand Up @@ -687,7 +686,8 @@ private int getDiskNum() {
* status.newItem = xxx;
*/
public class BackendStatus {
public String lastSuccessReportTabletsTime = FeConstants.null_string;
// this will be output as json, so not using FeConstants.null_string;
public String lastSuccessReportTabletsTime = "N/A";
}
}

0 comments on commit 0cbacaf

Please sign in to comment.