Skip to content

Commit ec579ce

Browse files
committed
feature: only one event loop for all TcpTransport.
1 parent c5ef32e commit ec579ce

File tree

6 files changed

+1190
-1089
lines changed

6 files changed

+1190
-1089
lines changed

src/transport/EventLoop.cpp

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#include "EventLoop.h"
18+
#include <event2/thread.h>
19+
20+
#include "Logging.h"
21+
22+
using namespace rocketmq;
23+
24+
EventLoop *EventLoop::GetDefaultEventLoop() {
25+
static EventLoop defaultEventLoop;
26+
return &defaultEventLoop;
27+
}
28+
29+
EventLoop::EventLoop(const struct event_config *config, bool run_immediately)
30+
: m_eventBase(nullptr), m_loopThread(nullptr) {
31+
32+
// tell libevent support multi-threads
33+
#ifdef WIN32
34+
evthread_use_windows_threads();
35+
#else
36+
evthread_use_pthreads();
37+
#endif
38+
39+
if (config == nullptr) {
40+
m_eventBase = event_base_new();
41+
} else {
42+
m_eventBase = event_base_new_with_config(config);
43+
}
44+
45+
if (m_eventBase == nullptr) {
46+
// failure...
47+
LOG_ERROR("create event base failed!");
48+
return;
49+
}
50+
51+
evthread_make_base_notifiable(m_eventBase);
52+
53+
if (run_immediately) {
54+
start();
55+
}
56+
}
57+
58+
EventLoop::~EventLoop() {
59+
stop();
60+
61+
if (m_eventBase != nullptr) {
62+
event_base_free(m_eventBase);
63+
m_eventBase = nullptr;
64+
}
65+
}
66+
67+
void EventLoop::start() {
68+
if (m_loopThread == nullptr) {
69+
// start event loop
70+
m_loopThread = new boost::thread(boost::bind(&EventLoop::runLoop, this));
71+
}
72+
}
73+
74+
void EventLoop::stop() {
75+
if (m_loopThread != nullptr) {
76+
m_loopThread->interrupt();
77+
m_loopThread->join();
78+
79+
delete m_loopThread;
80+
m_loopThread = nullptr;
81+
}
82+
}
83+
84+
void EventLoop::runLoop() {
85+
for (;;) {
86+
int ret;
87+
88+
{
89+
boost::this_thread::disable_interruption di;
90+
ret = event_base_loop(m_eventBase, EVLOOP_NONBLOCK);
91+
}
92+
93+
if (ret == 1) {
94+
// no event
95+
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
96+
}
97+
}
98+
}
99+
100+
#define OPT_UNLOCK_CALLBACKS (BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS)
101+
102+
BufferEvent *EventLoop::createBufferEvent(socket_t fd, int options) {
103+
struct bufferevent *event = bufferevent_socket_new(m_eventBase, fd, options);
104+
if (event == nullptr) {
105+
return nullptr;
106+
}
107+
108+
bool unlock = (options & OPT_UNLOCK_CALLBACKS) == OPT_UNLOCK_CALLBACKS;
109+
110+
return new BufferEvent(event, unlock);
111+
}
112+
113+
BufferEvent::BufferEvent(struct bufferevent *event, bool unlockCallbacks)
114+
: m_bufferEvent(event),
115+
m_unlockCallbacks(unlockCallbacks),
116+
m_readCallback(nullptr),
117+
m_writeCallback(nullptr),
118+
m_eventCallback(nullptr),
119+
m_callbackTransport() {
120+
121+
if (m_bufferEvent != nullptr) {
122+
bufferevent_setcb(m_bufferEvent, read_callback, write_callback, event_callback, this);
123+
}
124+
}
125+
126+
BufferEvent::~BufferEvent() {
127+
if (m_bufferEvent != nullptr) {
128+
// free function will set all callbacks to NULL first.
129+
bufferevent_free(m_bufferEvent);
130+
m_bufferEvent = nullptr;
131+
}
132+
}
133+
134+
void BufferEvent::setCallback(BufferEventDataCallback readCallback, BufferEventDataCallback writeCallback,
135+
BufferEventEventCallback eventCallback, boost::shared_ptr<TcpTransport> transport) {
136+
137+
// use lock in bufferevent
138+
bufferevent_lock(m_bufferEvent);
139+
140+
// wrap callback
141+
m_readCallback = readCallback;
142+
m_writeCallback = writeCallback;
143+
m_eventCallback = eventCallback;
144+
m_callbackTransport = transport;
145+
146+
bufferevent_unlock(m_bufferEvent);
147+
}
148+
149+
void BufferEvent::read_callback(struct bufferevent *bev, void *ctx) {
150+
auto event = (BufferEvent *) ctx;
151+
152+
if (event->m_unlockCallbacks)
153+
bufferevent_lock(event->m_bufferEvent);
154+
155+
BufferEventDataCallback callback = event->m_readCallback;
156+
boost::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
157+
158+
if (event->m_unlockCallbacks)
159+
bufferevent_unlock(event->m_bufferEvent);
160+
161+
if (callback) {
162+
callback(event, transport.get());
163+
}
164+
}
165+
166+
void BufferEvent::write_callback(struct bufferevent *bev, void *ctx) {
167+
auto event = (BufferEvent *) ctx;
168+
169+
if (event->m_unlockCallbacks)
170+
bufferevent_lock(event->m_bufferEvent);
171+
172+
BufferEventDataCallback callback = event->m_writeCallback;
173+
boost::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
174+
175+
if (event->m_unlockCallbacks)
176+
bufferevent_unlock(event->m_bufferEvent);
177+
178+
if (callback) {
179+
callback(event, transport.get());
180+
}
181+
}
182+
183+
void BufferEvent::event_callback(struct bufferevent *bev, short what, void *ctx) {
184+
auto event = (BufferEvent *) ctx;
185+
186+
if (event->m_unlockCallbacks)
187+
bufferevent_lock(event->m_bufferEvent);
188+
189+
BufferEventEventCallback callback = event->m_eventCallback;
190+
boost::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
191+
192+
if (event->m_unlockCallbacks)
193+
bufferevent_unlock(event->m_bufferEvent);
194+
195+
if (callback) {
196+
callback(event, what, transport.get());
197+
}
198+
}

src/transport/EventLoop.h

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef __EVENTLOOP_H__
18+
#define __EVENTLOOP_H__
19+
20+
#include <boost/noncopyable.hpp>
21+
#include <boost/thread/thread.hpp>
22+
23+
#include <event2/buffer.h>
24+
#include <event2/event.h>
25+
#include <event2/bufferevent.h>
26+
27+
#define socket_t evutil_socket_t
28+
29+
namespace rocketmq {
30+
31+
class BufferEvent;
32+
33+
class EventLoop : public boost::noncopyable {
34+
public:
35+
static EventLoop *GetDefaultEventLoop();
36+
37+
public:
38+
explicit EventLoop(const struct event_config *config = nullptr, bool run_immediately = true);
39+
virtual ~EventLoop();
40+
41+
void start();
42+
void stop();
43+
44+
BufferEvent *createBufferEvent(socket_t fd, int options);
45+
46+
private:
47+
void runLoop();
48+
49+
private:
50+
struct event_base *m_eventBase;
51+
boost::thread *m_loopThread;
52+
};
53+
54+
class TcpTransport;
55+
56+
typedef void (*BufferEventDataCallback)(BufferEvent *event, TcpTransport *transport);
57+
typedef void (*BufferEventEventCallback)(BufferEvent *event, short what, TcpTransport *transport);
58+
59+
class BufferEvent : public boost::noncopyable {
60+
public:
61+
virtual ~BufferEvent();
62+
63+
void setCallback(BufferEventDataCallback readCallback, BufferEventDataCallback writeCallback,
64+
BufferEventEventCallback eventCallback, boost::shared_ptr<TcpTransport> transport);
65+
66+
void setWatermark(short events, size_t lowmark, size_t highmark) {
67+
bufferevent_setwatermark(m_bufferEvent, events, lowmark, highmark);
68+
}
69+
70+
int enable(short event) {
71+
return bufferevent_enable(m_bufferEvent, event);
72+
}
73+
74+
int connect(const struct sockaddr *addr, int socklen) {
75+
return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr *) addr, socklen);
76+
}
77+
78+
int write(const void *data, size_t size) {
79+
return bufferevent_write(m_bufferEvent, data, size);
80+
}
81+
82+
size_t read(void *data, size_t size) {
83+
return bufferevent_read(m_bufferEvent, data, size);
84+
}
85+
86+
struct evbuffer *getInput() {
87+
return bufferevent_get_input(m_bufferEvent);
88+
}
89+
90+
socket_t getfd() {
91+
return bufferevent_getfd(m_bufferEvent);
92+
}
93+
94+
private:
95+
BufferEvent(struct bufferevent *event, bool unlockCallbacks);
96+
friend EventLoop;
97+
98+
static void read_callback(struct bufferevent *bev, void *ctx);
99+
static void write_callback(struct bufferevent *bev, void *ctx);
100+
static void event_callback(struct bufferevent *bev, short what, void *ctx);
101+
102+
private:
103+
struct bufferevent *m_bufferEvent;
104+
const bool m_unlockCallbacks;
105+
106+
BufferEventDataCallback m_readCallback;
107+
BufferEventDataCallback m_writeCallback;
108+
BufferEventEventCallback m_eventCallback;
109+
boost::weak_ptr<TcpTransport> m_callbackTransport; // avoid reference cycle
110+
};
111+
112+
}
113+
114+
#endif //__EVENTLOOP_H__

0 commit comments

Comments
 (0)