From e5f06dc366c6e706bc15e959d27158ae63d11ba2 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Wed, 27 Sep 2023 15:55:20 +0800 Subject: [PATCH] local resume_rq each task group --- src/bthread/task_control.cpp | 6 +++++- src/bthread/task_group.cpp | 6 +++--- src/bthread/task_group.h | 23 ++--------------------- src/bthread/task_group_inl.h | 12 ++++++------ 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 78001618b8..042b9799b9 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -362,6 +362,10 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { TaskGroup* g = _groups[s % ngroup]; // g is possibly NULL because of concurrent _destroy_group if (g) { + if (g->pop_resume_task(tid)) { + stolen = true; + break; + } if (g->_rq.steal(tid)) { stolen = true; break; @@ -432,7 +436,7 @@ void TaskControl::print_resume_q_sizes(std::ostream &os) { // ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0 // ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1] for (size_t i = 0; i < ngroup; ++i) { - nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt->load(std::memory_order_relaxed) : 0); + nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt.load(std::memory_order_relaxed) : 0); } } for (size_t i = 0; i < ngroup; ++i) { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index ebe45991aa..1f563efcf9 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -197,9 +197,9 @@ TaskGroup::TaskGroup(TaskControl* c) #ifndef NDEBUG , _sched_recursive_guard(0) #endif - , _resume_rq_cnt(ResumeRunQueue::Instance().first) - , _resume_rq(ResumeRunQueue::Instance().second) - , _resume_consumer_token(*_resume_rq) + , _resume_rq_cnt(0) + , _resume_rq(1000) + , _resume_consumer_token(_resume_rq) { _steal_seed = butil::fast_rand(); _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)]; diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index cc8c4f1398..f4b0c9db17 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -52,25 +52,6 @@ class ExitException : public std::exception { void* _value; }; -// Global resumed tasks. -class ResumeRunQueue { -public: - static std::pair>, - std::shared_ptr>> Instance() { - static ResumeRunQueue instance; - return {instance.queue_size_, instance.concurrent_queue_}; - } - -private: - ResumeRunQueue() { - queue_size_ = std::make_shared>(0); - concurrent_queue_ = std::make_shared>(10000); - } - - std::shared_ptr> queue_size_; - std::shared_ptr> concurrent_queue_; -}; - // Thread-local group of tasks. // Notice that most methods involving context switching are static otherwise // pointer `this' may change after wakeup. The **pg parameters in following @@ -277,8 +258,8 @@ friend class TaskControl; int _sched_recursive_guard; - std::shared_ptr> _resume_rq_cnt; - std::shared_ptr> _resume_rq; + std::atomic _resume_rq_cnt; + moodycamel::ConcurrentQueue _resume_rq; moodycamel::ConsumerToken _resume_consumer_token; }; diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h index f2041e147c..300cccd40d 100644 --- a/src/bthread/task_group_inl.h +++ b/src/bthread/task_group_inl.h @@ -98,21 +98,21 @@ inline void TaskGroup::push_rq(bthread_t tid) { } inline bool TaskGroup::pop_resume_task(bthread_t* tid) { - int tmp_cnt = _resume_rq_cnt->load(std::memory_order_relaxed); - if (tmp_cnt>0 && _resume_rq_cnt->compare_exchange_strong(tmp_cnt, tmp_cnt-1)){ - if(_resume_rq->try_dequeue(_resume_consumer_token, *tid)){ + int tmp_cnt = _resume_rq_cnt.load(std::memory_order_relaxed); + if (tmp_cnt > 0 && _resume_rq_cnt.compare_exchange_strong(tmp_cnt, tmp_cnt-1)){ + if(_resume_rq.try_dequeue(_resume_consumer_token, *tid)){ return true; } else { - (*_resume_rq_cnt) ++; + _resume_rq_cnt++; } } return false; } inline bool TaskGroup::push_resume_task(bthread_t tid){ - if(_resume_rq->enqueue(tid)){ - (*_resume_rq_cnt) ++; + if(_resume_rq.enqueue(tid)){ + _resume_rq_cnt++; return true; } return false;