-
Notifications
You must be signed in to change notification settings - Fork 1
/
EventLoop.cc
192 lines (168 loc) · 5.07 KB
/
EventLoop.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>
// 防止一个线程创建多个EventLoop thread_local
__thread EventLoop *t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;
// 创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_FATAL("eventfd error:%d \n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if (t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
// 设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
// 开启事件循环
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_)
{
activeChannels_.clear();
// 监听两类fd 一种是client的fd,一种wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
// 执行当前EventLoop事件循环需要处理的回调操作
/**
* IO线程 mainLoop accept fd《=channel subloop
* mainLoop 事先注册一个回调cb(需要subloop来执行) wakeup subloop后,执行下面的方法,执行之前mainloop注册的cb操作
*/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping. \n", this);
looping_ = false;
}
// 退出事件循环 1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit
/**
* mainLoop
*
* no ==================== 生产者-消费者的线程安全的队列
*
* subLoop1 subLoop2 subLoop3
*/
void EventLoop::quit()
{
quit_ = true;
// 如果是在其它线程中,调用的quit 在一个subloop(woker)中,调用了mainLoop(IO)的quit
if (!isInLoopThread())
{
wakeup();
}
}
// 在当前loop中执行cb
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) // 在当前的loop线程中,执行cb
{
cb();
}
else // 在非当前loop线程中执行cb , 就需要唤醒loop所在线程,执行cb
{
queueInLoop(cb);
}
}
// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
// 唤醒相应的,需要执行上面回调操作的loop的线程了
// || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 唤醒loop所在线程
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n);
}
}
// 用来唤醒loop所在的线程的 向wakeupfd_写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
// EventLoop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{
return poller_->hasChannel(channel);
}
void EventLoop::doPendingFunctors() // 执行回调
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor &functor : functors)
{
functor(); // 执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}