-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocesser.cpp
120 lines (100 loc) · 3.17 KB
/
processer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include "processer.h"
#include "scheduler.h"
#include "error.h"
#include <assert.h>
namespace co {
atomic_t<uint32_t> Processer::s_id_{0};
Processer::Processer()
: id_(++s_id_)
{
runnable_list_.check_ = (void*)&s_id_;
}
void Processer::AddTaskRunnable(Task *tk)
{
DebugPrint(dbg_scheduler, "task(%s) add into proc(%u)", tk->DebugInfo(), id_);
tk->state_ = TaskState::runnable;
runnable_list_.push(tk);
}
uint32_t Processer::Run(uint32_t &done_count)
{
ContextScopedGuard guard;
(void)guard;
done_count = 0;
uint32_t c = 0;
DebugPrint(dbg_scheduler, "Run [Proc(%d) do_count:%u] --------------------------",
id_, (uint32_t)runnable_list_.size());
for (;;)
{
if (c >= runnable_list_.size()) break;
Task *tk = runnable_list_.pop();
if (!tk) break;
++c;
current_task_ = tk;
DebugPrint(dbg_switch, "enter task(%s)", tk->DebugInfo());
if (!tk->SwapIn()) {
fprintf(stderr, "swapcontext error:%s\n", strerror(errno));
current_task_ = nullptr;
runnable_list_.erase(tk);
tk->DecrementRef();
ThrowError(eCoErrorCode::ec_swapcontext_failed);
}
DebugPrint(dbg_switch, "leave task(%s) state=%d", tk->DebugInfo(), (int)tk->state_);
current_task_ = nullptr;
switch (tk->state_) {
case TaskState::runnable:
runnable_list_.push(tk);
break;
case TaskState::io_block:
g_Scheduler.io_wait_.SchedulerSwitch(tk);
break;
case TaskState::sleep:
g_Scheduler.sleep_wait_.SchedulerSwitch(tk);
break;
case TaskState::sys_block:
assert(tk->block_);
if (!tk->block_->AddWaitTask(tk))
AddTaskRunnable(tk);
break;
case TaskState::done:
default:
++done_count;
DebugPrint(dbg_task, "task(%s) done.", tk->DebugInfo());
if (tk->eptr_) {
std::exception_ptr ep = tk->eptr_;
tk->DecrementRef();
std::rethrow_exception(ep);
} else
tk->DecrementRef();
break;
}
}
return c;
}
void Processer::CoYield()
{
Task *tk = GetCurrentTask();
assert(tk);
tk->proc_ = this;
DebugPrint(dbg_yield, "yield task(%s) state=%d", tk->DebugInfo(), (int)tk->state_);
++tk->yield_count_;
if (!tk->SwapOut()) {
fprintf(stderr, "swapcontext error:%s\n", strerror(errno));
ThrowError(eCoErrorCode::ec_yield_failed);
}
}
Task* Processer::GetCurrentTask()
{
return current_task_;
}
std::size_t Processer::StealHalf(Processer & other)
{
std::size_t runnable_task_count = runnable_list_.size();
SList<Task> tasks = runnable_list_.pop_back((runnable_task_count + 1) / 2);
std::size_t c = tasks.size();
DebugPrint(dbg_scheduler, "proc[%u] steal proc[%u] work returns %d.",
other.id_, id_, (int)c);
if (!c) return 0;
other.runnable_list_.push(std::move(tasks));
return c;
}
} //namespace co