Skip to content

Commit

Permalink
local resume_rq each task group
Browse files Browse the repository at this point in the history
  • Loading branch information
MrGuin committed Sep 27, 2023
1 parent c9b7ad5 commit e5f06dc
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 31 deletions.
6 changes: 5 additions & 1 deletion src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->pop_resume_task(tid)) {
stolen = true;
break;
}
if (g->_rq.steal(tid)) {
stolen = true;
break;
Expand Down Expand Up @@ -432,7 +436,7 @@ void TaskControl::print_resume_q_sizes(std::ostream &os) {
// ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0
// ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1]
for (size_t i = 0; i < ngroup; ++i) {
nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt->load(std::memory_order_relaxed) : 0);
nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt.load(std::memory_order_relaxed) : 0);
}
}
for (size_t i = 0; i < ngroup; ++i) {
Expand Down
6 changes: 3 additions & 3 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ TaskGroup::TaskGroup(TaskControl* c)
#ifndef NDEBUG
, _sched_recursive_guard(0)
#endif
, _resume_rq_cnt(ResumeRunQueue::Instance().first)
, _resume_rq(ResumeRunQueue::Instance().second)
, _resume_consumer_token(*_resume_rq)
, _resume_rq_cnt(0)
, _resume_rq(1000)
, _resume_consumer_token(_resume_rq)
{
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
Expand Down
23 changes: 2 additions & 21 deletions src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ class ExitException : public std::exception {
void* _value;
};

// Global resumed tasks.
class ResumeRunQueue {
public:
static std::pair<std::shared_ptr<std::atomic<int>>,
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>>> Instance() {
static ResumeRunQueue instance;
return {instance.queue_size_, instance.concurrent_queue_};
}

private:
ResumeRunQueue() {
queue_size_ = std::make_shared<std::atomic<int>>(0);
concurrent_queue_ = std::make_shared<moodycamel::ConcurrentQueue<bthread_t>>(10000);
}

std::shared_ptr<std::atomic<int>> queue_size_;
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>> concurrent_queue_;
};

// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in following
Expand Down Expand Up @@ -277,8 +258,8 @@ friend class TaskControl;

int _sched_recursive_guard;

std::shared_ptr<std::atomic<int>> _resume_rq_cnt;
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>> _resume_rq;
std::atomic<int> _resume_rq_cnt;
moodycamel::ConcurrentQueue<bthread_t> _resume_rq;
moodycamel::ConsumerToken _resume_consumer_token;
};

Expand Down
12 changes: 6 additions & 6 deletions src/bthread/task_group_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,21 @@ inline void TaskGroup::push_rq(bthread_t tid) {
}

inline bool TaskGroup::pop_resume_task(bthread_t* tid) {
int tmp_cnt = _resume_rq_cnt->load(std::memory_order_relaxed);
if (tmp_cnt>0 && _resume_rq_cnt->compare_exchange_strong(tmp_cnt, tmp_cnt-1)){
if(_resume_rq->try_dequeue(_resume_consumer_token, *tid)){
int tmp_cnt = _resume_rq_cnt.load(std::memory_order_relaxed);
if (tmp_cnt > 0 && _resume_rq_cnt.compare_exchange_strong(tmp_cnt, tmp_cnt-1)){
if(_resume_rq.try_dequeue(_resume_consumer_token, *tid)){
return true;
}
else {
(*_resume_rq_cnt) ++;
_resume_rq_cnt++;
}
}
return false;
}

inline bool TaskGroup::push_resume_task(bthread_t tid){
if(_resume_rq->enqueue(tid)){
(*_resume_rq_cnt) ++;
if(_resume_rq.enqueue(tid)){
_resume_rq_cnt++;
return true;
}
return false;
Expand Down

0 comments on commit e5f06dc

Please sign in to comment.