Skip to content

Commit

Permalink
PTP: BlockingDrain for Threadpool and TaskQueue
Browse files Browse the repository at this point in the history
This completes the NodePlatform rewiring begun in a previous commit.

This BlockingDrain will wait on both V8 Tasks and libuv Tasks.
It waits on all Tasks in the Threadpool, even though NodePlatform
only cares about BlockingDrain'ing the V8 Tasks.
  • Loading branch information
davisjam committed Sep 6, 2018
1 parent 3a2bbb6 commit 8f6df02
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
5 changes: 1 addition & 4 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
}

void WorkerThreadsTaskRunner::BlockingDrain() {
// TODO(davisjam): No support for this in threadpool::Threadpool
// at the moment.
// I believe this is the cause of the segfaults at the end of running 'node'.
// pending_worker_tasks_.BlockingDrain();
tp_->BlockingDrain();
}

void WorkerThreadsTaskRunner::Shutdown() {
Expand Down
39 changes: 35 additions & 4 deletions src/node_threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "env-inl.h"
#include "debug_utils.h"
#include "util.h"

#include <algorithm>

// TODO(davisjam): DO NOT MERGE. Only for debugging.
Expand Down Expand Up @@ -42,6 +43,8 @@ void Worker::_Run(void* data) {
task->UpdateState(Task::ASSIGNED);
task->Run();
task->UpdateState(Task::COMPLETED);

queue->NotifyOfCompletion();
}
}

Expand Down Expand Up @@ -139,7 +142,9 @@ void LibuvTask::Run() {
***************/

TaskQueue::TaskQueue()
: queue_(), stopped_(false), lock_(), tasks_available_() {
: queue_(), outstanding_tasks_(0), stopped_(false)
, lock_()
, task_available_(), tasks_drained_() {
}

bool TaskQueue::Push(std::unique_ptr<Task> task) {
Expand All @@ -151,7 +156,8 @@ bool TaskQueue::Push(std::unique_ptr<Task> task) {

task->UpdateState(Task::QUEUED);
queue_.push(std::move(task));
tasks_available_.Signal(scoped_lock);
outstanding_tasks_++;
task_available_.Signal(scoped_lock);

return true;
}
Expand All @@ -172,7 +178,7 @@ std::unique_ptr<Task> TaskQueue::BlockingPop(void) {
Mutex::ScopedLock scoped_lock(lock_);

while (queue_.empty() && !stopped_) {
tasks_available_.Wait(scoped_lock);
task_available_.Wait(scoped_lock);
}

if (queue_.empty()) {
Expand All @@ -184,10 +190,27 @@ std::unique_ptr<Task> TaskQueue::BlockingPop(void) {
return result;
}

void TaskQueue::NotifyOfCompletion(void) {
Mutex::ScopedLock scoped_lock(lock_);
outstanding_tasks_--;
CHECK_GE(outstanding_tasks_, 0);
if (!outstanding_tasks_) {
tasks_drained_.Broadcast(scoped_lock);
}
}

void TaskQueue::BlockingDrain(void) {
Mutex::ScopedLock scoped_lock(lock_);
while (outstanding_tasks_) {
tasks_drained_.Wait(scoped_lock);
}
LOG("TaskQueue::BlockingDrain: Fully drained\n");
}

void TaskQueue::Stop(void) {
Mutex::ScopedLock scoped_lock(lock_);
stopped_ = true;
tasks_available_.Broadcast(scoped_lock);
task_available_.Broadcast(scoped_lock);
}

int TaskQueue::Length(void) const {
Expand Down Expand Up @@ -234,5 +257,13 @@ int Threadpool::QueueLength(void) const {
return queue_.Length();
}

void Threadpool::BlockingDrain(void) {
queue_.BlockingDrain();
}

int Threadpool::NWorkers(void) const {
return workers_.size();
}

} // namespace threadpool
} // namespace node
22 changes: 18 additions & 4 deletions src/node_threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,18 @@ class TaskQueue {

// Return true if Push succeeds, else false.
bool Push(std::unique_ptr<Task> task);
std::unique_ptr<Task> Pop(void);

// Returns nullptr when we're done.
// Non-blocking Pop. Returns nullptr if queue is empty.
std::unique_ptr<Task> Pop(void);
// Blocking Pop. Returns nullptr if queue is empty or Stop'd.
std::unique_ptr<Task> BlockingPop(void);

// Workers should call this after completing a Task.
void NotifyOfCompletion(void);

// Block until there are no Tasks pending or scheduled.
void BlockingDrain(void);

// Subsequent Push() will fail.
// Pop calls will return nullptr once queue is drained.
void Stop();
Expand All @@ -160,11 +167,15 @@ class TaskQueue {
private:
// Structures.
std::queue<std::unique_ptr<Task>> queue_;
int outstanding_tasks_; // Number of Tasks in non-COMPLETED states.
bool stopped_;

// Synchronization.
Mutex lock_;
ConditionVariable tasks_available_;
// Signal'd when there is at least one task in the queue.
ConditionVariable task_available_;
// Signal'd when all Push'd Tasks are in COMPLETED state.
ConditionVariable tasks_drained_;
};

// A threadpool works on asynchronous Tasks.
Expand All @@ -187,7 +198,10 @@ class Threadpool {

void Post(std::unique_ptr<Task> task);
int QueueLength(void) const;
int NWorkers(void) const { return workers_.size(); }
// Block until there are no tasks pending or scheduled in the TP.
void BlockingDrain(void);

int NWorkers(void) const;

private:
TaskQueue queue_;
Expand Down

0 comments on commit 8f6df02

Please sign in to comment.