diff --git a/src/node_platform.cc b/src/node_platform.cc
index 3d1cc522db380b..3aab5196a2bccc 100644
--- a/src/node_platform.cc
+++ b/src/node_platform.cc
@@ -40,9 +40,10 @@ static void PlatformWorkerThread(void* data) {
     worker_data->platform_workers_ready->Signal(lock);
   }
 
-  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
+  while (std::unique_ptr<Task> task =
+             pending_worker_tasks->Lock().BlockingPop()) {
     task->Run();
-    pending_worker_tasks->NotifyOfCompletion();
+    pending_worker_tasks->Lock().NotifyOfCompletion();
   }
 }
 
@@ -73,13 +74,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   }
 
   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
-    tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
-                                               delay_in_seconds));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<ScheduleTask>(
+        this, std::move(task), delay_in_seconds));
     uv_async_send(&flush_tasks_);
   }
 
   void Stop() {
-    tasks_.Push(std::make_unique<StopTask>(this));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<StopTask>(this));
     uv_async_send(&flush_tasks_);
   }
 
@@ -100,8 +103,14 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void FlushTasks(uv_async_t* flush_tasks) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
-    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
+
+    std::queue<std::unique_ptr<Task>> tasks_to_run =
+        scheduler->tasks_.Lock().PopAll();
+    while (!tasks_to_run.empty()) {
+      std::unique_ptr<Task> task = std::move(tasks_to_run.front());
+      tasks_to_run.pop();
       task->Run();
+    }
   }
 
   class StopTask : public Task {
@@ -149,7 +158,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void RunTask(uv_timer_t* timer) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
-    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
+    scheduler->pending_worker_tasks_->Lock().Push(
+        scheduler->TakeTimerTask(timer));
   }
 
   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
@@ -203,7 +213,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
 }
 
 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
-  pending_worker_tasks_.Push(std::move(task));
+  pending_worker_tasks_.Lock().Push(std::move(task));
 }
 
 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
@@ -212,11 +222,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
 }
 
 void WorkerThreadsTaskRunner::BlockingDrain() {
-  pending_worker_tasks_.BlockingDrain();
+  pending_worker_tasks_.Lock().BlockingDrain();
 }
 
 void WorkerThreadsTaskRunner::Shutdown() {
-  pending_worker_tasks_.Stop();
+  pending_worker_tasks_.Lock().Stop();
   delayed_task_scheduler_->Stop();
   for (size_t i = 0; i < threads_.size(); i++) {
     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
@@ -253,12 +263,13 @@ void PerIsolatePlatformData::PostIdleTaskImpl(
 
 void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
                                           const v8::SourceLocation& location) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
-  foreground_tasks_.Push(std::move(task));
+  // The task can be posted from any V8 background worker thread, even when
+  // the foreground task runner is being cleaned up by Shutdown(). In that
+  // case, make sure we wait until the shutdown is completed (which leads
+  // to flush_tasks_ == nullptr, and the task will be discarded).
+  auto locked = foreground_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
+  locked.Push(std::move(task));
   uv_async_send(flush_tasks_);
 }
 
@@ -266,16 +277,13 @@ void PerIsolatePlatformData::PostDelayedTaskImpl(
     std::unique_ptr<Task> task,
     double delay_in_seconds,
     const v8::SourceLocation& location) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
+  auto locked = foreground_delayed_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
   delayed->task = std::move(task);
   delayed->platform_data = shared_from_this();
   delayed->timeout = delay_in_seconds;
-  foreground_delayed_tasks_.Push(std::move(delayed));
+  locked.Push(std::move(delayed));
   uv_async_send(flush_tasks_);
 }
 
@@ -301,32 +309,30 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
 }
 
 void PerIsolatePlatformData::Shutdown() {
-  if (flush_tasks_ == nullptr)
-    return;
+  auto foreground_tasks_locked = foreground_tasks_.Lock();
+  auto foreground_delayed_tasks_locked = foreground_delayed_tasks_.Lock();
 
-  // While there should be no V8 tasks in the queues at this point, it is
-  // possible that Node.js-internal tasks from e.g. the inspector are still
-  // lying around. We clear these queues and ignore the return value,
-  // effectively deleting the tasks instead of running them.
-  foreground_delayed_tasks_.PopAll();
-  foreground_tasks_.PopAll();
+  foreground_delayed_tasks_locked.PopAll();
+  foreground_tasks_locked.PopAll();
   scheduled_delayed_tasks_.clear();
 
-  // Both destroying the scheduled_delayed_tasks_ lists and closing
-  // flush_tasks_ handle add tasks to the event loop. We keep a count of all
-  // non-closed handles, and when that reaches zero, we inform any shutdown
-  // callbacks that the platform is done as far as this Isolate is concerned.
-  self_reference_ = shared_from_this();
-  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
-           [](uv_handle_t* handle) {
-    std::unique_ptr<uv_async_t> flush_tasks {
-        reinterpret_cast<uv_async_t*>(handle) };
-    PerIsolatePlatformData* platform_data =
-        static_cast<PerIsolatePlatformData*>(flush_tasks->data);
-    platform_data->DecreaseHandleCount();
-    platform_data->self_reference_.reset();
-  });
-  flush_tasks_ = nullptr;
+  if (flush_tasks_ != nullptr) {
+    // Both destroying the scheduled_delayed_tasks_ lists and closing
+    // flush_tasks_ handle add tasks to the event loop. We keep a count of all
+    // non-closed handles, and when that reaches zero, we inform any shutdown
+    // callbacks that the platform is done as far as this Isolate is concerned.
+    self_reference_ = shared_from_this();
+    uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
+             [](uv_handle_t* handle) {
+               std::unique_ptr<uv_async_t> flush_tasks{
+                   reinterpret_cast<uv_async_t*>(handle)};
+               PerIsolatePlatformData* platform_data =
+                   static_cast<PerIsolatePlatformData*>(flush_tasks->data);
+               platform_data->DecreaseHandleCount();
+               platform_data->self_reference_.reset();
+             });
+    flush_tasks_ = nullptr;
+  }
 }
 
 void PerIsolatePlatformData::DecreaseHandleCount() {
@@ -472,39 +478,48 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
 
 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
   bool did_work = false;
 
-  while (std::unique_ptr<DelayedTask> delayed =
-      foreground_delayed_tasks_.Pop()) {
+  std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
+      foreground_delayed_tasks_.Lock().PopAll();
+  while (!delayed_tasks_to_schedule.empty()) {
+    std::unique_ptr<DelayedTask> delayed =
+        std::move(delayed_tasks_to_schedule.front());
+    delayed_tasks_to_schedule.pop();
+
    did_work = true;
     uint64_t delay_millis = llround(delayed->timeout * 1000);
 
     delayed->timer.data = static_cast<void*>(delayed.get());
     uv_timer_init(loop_, &delayed->timer);
-    // Timers may not guarantee queue ordering of events with the same delay if
-    // the delay is non-zero. This should not be a problem in practice.
+    // Timers may not guarantee queue ordering of events with the same delay
+    // if the delay is non-zero. This should not be a problem in practice.
     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
     uv_handle_count_++;
 
-    scheduled_delayed_tasks_.emplace_back(delayed.release(),
-                                          [](DelayedTask* delayed) {
-      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
-               [](uv_handle_t* handle) {
-        std::unique_ptr<DelayedTask> task {
-            static_cast<DelayedTask*>(handle->data) };
-        task->platform_data->DecreaseHandleCount();
-      });
-    });
+    scheduled_delayed_tasks_.emplace_back(
+        delayed.release(), [](DelayedTask* delayed) {
+          uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
+                   [](uv_handle_t* handle) {
+                     std::unique_ptr<DelayedTask> task{
+                         static_cast<DelayedTask*>(handle->data)};
+                     task->platform_data->DecreaseHandleCount();
+                   });
+        });
   }
-  // Move all foreground tasks into a separate queue and flush that queue.
-  // This way tasks that are posted while flushing the queue will be run on the
-  // next call of FlushForegroundTasksInternal.
-  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
+
+  std::queue<std::unique_ptr<Task>> tasks;
+  {
+    auto locked = foreground_tasks_.Lock();
+    tasks = locked.PopAll();
+  }
+
   while (!tasks.empty()) {
     std::unique_ptr<Task> task = std::move(tasks.front());
     tasks.pop();
     did_work = true;
     RunForegroundTask(std::move(task));
   }
+
   return did_work;
 }
 
@@ -594,66 +609,63 @@ TaskQueue<T>::TaskQueue()
       outstanding_tasks_(0), stopped_(false), task_queue_() { }
 
 template <class T>
-void TaskQueue<T>::Push(std::unique_ptr<T> task) {
-  Mutex::ScopedLock scoped_lock(lock_);
-  outstanding_tasks_++;
-  task_queue_.push(std::move(task));
-  tasks_available_.Signal(scoped_lock);
+TaskQueue<T>::Locked::Locked(TaskQueue* queue)
+    : queue_(queue), lock_(queue->lock_) {}
+
+template <class T>
+void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
+  queue_->outstanding_tasks_++;
+  queue_->task_queue_.push(std::move(task));
+  queue_->tasks_available_.Signal(lock_);
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::Pop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (task_queue_.empty()) {
+std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
+  if (queue_->task_queue_.empty()) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (task_queue_.empty() && !stopped_) {
-    tasks_available_.Wait(scoped_lock);
+std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
+  while (queue_->task_queue_.empty() && !queue_->stopped_) {
+    queue_->tasks_available_.Wait(lock_);
   }
-  if (stopped_) {
+  if (queue_->stopped_) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
  return result;
 }
 
 template <class T>
-void TaskQueue<T>::NotifyOfCompletion() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (--outstanding_tasks_ == 0) {
-    tasks_drained_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::NotifyOfCompletion() {
+  if (--queue_->outstanding_tasks_ == 0) {
+    queue_->tasks_drained_.Broadcast(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::BlockingDrain() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (outstanding_tasks_ > 0) {
-    tasks_drained_.Wait(scoped_lock);
+void TaskQueue<T>::Locked::BlockingDrain() {
+  while (queue_->outstanding_tasks_ > 0) {
+    queue_->tasks_drained_.Wait(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::Stop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  stopped_ = true;
-  tasks_available_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::Stop() {
+  queue_->stopped_ = true;
+  queue_->tasks_available_.Broadcast(lock_);
 }
 
 template <class T>
-std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
-  Mutex::ScopedLock scoped_lock(lock_);
+std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
   std::queue<std::unique_ptr<T>> result;
-  result.swap(task_queue_);
+  result.swap(queue_->task_queue_);
   return result;
 }
diff --git a/src/node_platform.h b/src/node_platform.h
index d6f445ce9cd93d..6462f06f6983b2 100644
--- a/src/node_platform.h
+++ b/src/node_platform.h
@@ -22,16 +22,28 @@ class PerIsolatePlatformData;
 template <class T>
 class TaskQueue {
  public:
+  class Locked {
+   public:
+    void Push(std::unique_ptr<T> task);
+    std::unique_ptr<T> Pop();
+    std::unique_ptr<T> BlockingPop();
+    void NotifyOfCompletion();
+    void BlockingDrain();
+    void Stop();
+    std::queue<std::unique_ptr<T>> PopAll();
+
+   private:
+    friend class TaskQueue;
+    explicit Locked(TaskQueue* queue);
+
+    TaskQueue* queue_;
+    Mutex::ScopedLock lock_;
+  };
+
   TaskQueue();
   ~TaskQueue() = default;
 
-  void Push(std::unique_ptr<T> task);
-  std::unique_ptr<T> Pop();
-  std::unique_ptr<T> BlockingPop();
-  std::queue<std::unique_ptr<T>> PopAll();
-  void NotifyOfCompletion();
-  void BlockingDrain();
-  void Stop();
+  Locked Lock() { return Locked(this); }
 
  private:
   Mutex lock_;
@@ -98,6 +110,8 @@ class PerIsolatePlatformData
   void RunForegroundTask(std::unique_ptr<v8::Task> task);
   static void RunForegroundTask(uv_timer_t* timer);
 
+  uv_async_t* flush_tasks_ = nullptr;
+
   struct ShutdownCallback {
     void (*cb)(void*);
     void* data;
@@ -110,7 +124,9 @@ class PerIsolatePlatformData
 
   v8::Isolate* const isolate_;
   uv_loop_t* const loop_;
-  uv_async_t* flush_tasks_ = nullptr;
+
+  // When acquiring locks for both task queues, lock foreground_tasks_
+  // first then foreground_delayed_tasks_ to avoid deadlocks.
   TaskQueue<v8::Task> foreground_tasks_;
   TaskQueue<DelayedTask> foreground_delayed_tasks_;
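
For readers outside the Node.js codebase, the core idea of this patch is a lock-guard accessor: TaskQueue no longer exposes Push()/Pop() directly, and every operation instead goes through a Locked object that holds the queue's mutex for its entire lifetime, so a caller can combine a state check and a mutation into one critical section. Below is a minimal, self-contained sketch of the same pattern built only on the C++ standard library (std::mutex and std::condition_variable stand in for Node's Mutex/ConditionVariable wrappers); PostTask(), Shutdown(), shutting_down and main() are illustrative stand-ins for this sketch, not Node.js APIs.

// Sketch of the Locked-accessor pattern from the diff; illustrative only.
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>

template <class T>
class TaskQueue {
 public:
  class Locked {
   public:
    // Every operation runs while lock_ is held, so a caller holding a
    // Locked can chain several calls into one atomic critical section.
    void Push(std::unique_ptr<T> task) {
      queue_->task_queue_.push(std::move(task));
      queue_->tasks_available_.notify_one();
    }

    std::unique_ptr<T> BlockingPop() {
      queue_->tasks_available_.wait(lock_, [this] {
        return !queue_->task_queue_.empty() || queue_->stopped_;
      });
      if (queue_->stopped_) return nullptr;
      std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
      queue_->task_queue_.pop();
      return result;
    }

    std::queue<std::unique_ptr<T>> PopAll() {
      std::queue<std::unique_ptr<T>> result;
      result.swap(queue_->task_queue_);
      return result;
    }

    void Stop() {
      queue_->stopped_ = true;
      queue_->tasks_available_.notify_all();
    }

   private:
    friend class TaskQueue;
    explicit Locked(TaskQueue* queue) : queue_(queue), lock_(queue->lock_) {}

    TaskQueue* queue_;
    std::unique_lock<std::mutex> lock_;
  };

  // The queue's state is reachable only through Lock(), so taking the
  // mutex cannot be forgotten at a call site.
  Locked Lock() { return Locked(this); }

 private:
  std::mutex lock_;
  std::condition_variable tasks_available_;
  bool stopped_ = false;
  std::queue<std::unique_ptr<T>> task_queue_;
};

// Usage mirroring the PostTaskImpl() fix: the shutdown check and the push
// happen under one lock, so a concurrent Shutdown() cannot interleave
// between them. `shutting_down` stands in for `flush_tasks_ == nullptr`.
struct Task {
  virtual ~Task() = default;
  virtual void Run() { std::cout << "task ran\n"; }
};

TaskQueue<Task> foreground_tasks;
bool shutting_down = false;  // read and written under foreground_tasks' lock

void PostTask(std::unique_ptr<Task> task) {
  auto locked = foreground_tasks.Lock();
  if (shutting_down) return;  // checked while the lock is held
  locked.Push(std::move(task));
}

void Shutdown() {
  auto locked = foreground_tasks.Lock();
  shutting_down = true;  // flipped under the same lock PostTask() takes
  locked.PopAll();       // discard anything still queued
}

int main() {
  PostTask(std::make_unique<Task>());
  if (std::unique_ptr<Task> task = foreground_tasks.Lock().BlockingPop())
    task->Run();
  Shutdown();
  PostTask(std::make_unique<Task>());  // silently discarded after shutdown
}

This also explains the ordering comment the patch adds to node_platform.h: PerIsolatePlatformData::Shutdown() now holds both queue locks at once, so any other path that ever takes both must acquire foreground_tasks_ before foreground_delayed_tasks_, otherwise two threads could each hold one lock while waiting on the other.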