Skip to content

Commit ca755a0

Browse files
committed
feature: only one event loop for all TcpTransport.
1 parent c5ef32e commit ca755a0

File tree

4 files changed

+245
-140
lines changed

4 files changed

+245
-140
lines changed

src/transport/EventLoop.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#include "EventLoop.h"
18+
#include <event2/thread.h>
19+
20+
using namespace rocketmq;
21+
22+
EventLoop *EventLoop::GetDefaultEventLoop() {
23+
static EventLoop defaultEventLoop;
24+
return &defaultEventLoop;
25+
}
26+
27+
EventLoop::EventLoop(const struct event_config *config, bool run_immediately)
28+
: m_eventBase(nullptr), m_loopThread(nullptr) {
29+
30+
#ifdef WIN32
31+
evthread_use_windows_threads();
32+
#else
33+
evthread_use_pthreads();
34+
#endif
35+
36+
if (config == nullptr) {
37+
m_eventBase = event_base_new();
38+
} else {
39+
m_eventBase = event_base_new_with_config(config);
40+
}
41+
42+
if (m_eventBase == nullptr) {
43+
// failure...
44+
}
45+
46+
evthread_make_base_notifiable(m_eventBase);
47+
48+
if (run_immediately) {
49+
start();
50+
}
51+
}
52+
53+
EventLoop::~EventLoop() {
54+
stop();
55+
56+
if (m_eventBase != nullptr) {
57+
event_base_free(m_eventBase);
58+
m_eventBase = nullptr;
59+
}
60+
}
61+
62+
void EventLoop::start() {
63+
if (m_loopThread == nullptr) {
64+
// start event loop
65+
m_loopThread = new boost::thread(boost::bind(&EventLoop::runLoop, this));
66+
}
67+
}
68+
69+
void EventLoop::stop() {
70+
if (m_loopThread != nullptr) {
71+
m_loopThread->interrupt();
72+
m_loopThread->join();
73+
74+
delete m_loopThread;
75+
m_loopThread = nullptr;
76+
}
77+
}
78+
79+
void EventLoop::runLoop() {
80+
for (;;) {
81+
int ret;
82+
83+
{
84+
boost::this_thread::disable_interruption di;
85+
ret = event_base_loop(m_eventBase, EVLOOP_NONBLOCK);
86+
}
87+
88+
if (ret == 1) {
89+
// no event
90+
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
91+
}
92+
}
93+
}
94+
95+
BufferEvent *EventLoop::createBufferEvent(socket_t fd, int options) {
96+
struct bufferevent *event = bufferevent_socket_new(m_eventBase, fd, options);
97+
if (event == nullptr) {
98+
return nullptr;
99+
}
100+
101+
return new BufferEvent(event);
102+
}
103+
104+
BufferEvent::~BufferEvent() {
105+
if (m_bufferEvent != nullptr) {
106+
bufferevent_free(m_bufferEvent);
107+
m_bufferEvent = nullptr;
108+
}
109+
}

src/transport/EventLoop.h

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef __EVENTLOOP_H__
18+
#define __EVENTLOOP_H__
19+
20+
#include <boost/noncopyable.hpp>
21+
#include <boost/thread/thread.hpp>
22+
23+
#include <event2/buffer.h>
24+
#include <event2/event.h>
25+
#include <event2/bufferevent.h>
26+
27+
#define socket_t evutil_socket_t
28+
29+
namespace rocketmq {
30+
31+
class BufferEvent;
32+
33+
class EventLoop : public boost::noncopyable {
34+
public:
35+
static EventLoop *GetDefaultEventLoop();
36+
37+
public:
38+
explicit EventLoop(const struct event_config *config = nullptr, bool run_immediately = true);
39+
virtual ~EventLoop();
40+
41+
void start();
42+
void stop();
43+
44+
BufferEvent *createBufferEvent(socket_t fd, int options);
45+
46+
private:
47+
void runLoop();
48+
49+
private:
50+
struct event_base *m_eventBase;
51+
boost::thread *m_loopThread;
52+
};
53+
54+
class BufferEvent : public boost::noncopyable {
55+
public:
56+
virtual ~BufferEvent();
57+
58+
void setCallback(bufferevent_data_cb readcb, bufferevent_data_cb writecb,
59+
bufferevent_event_cb eventcb, void *cbarg) {
60+
// TODO: wrap callback
61+
bufferevent_setcb(m_bufferEvent, readcb, writecb, eventcb, cbarg);
62+
}
63+
64+
void setWatermark(short events, size_t lowmark, size_t highmark) {
65+
bufferevent_setwatermark(m_bufferEvent, events, lowmark, highmark);
66+
}
67+
68+
int enable(short event) {
69+
return bufferevent_enable(m_bufferEvent, event);
70+
}
71+
72+
int connect(const struct sockaddr *addr, int socklen) {
73+
return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*) addr, socklen);
74+
}
75+
76+
int write(const void *data, size_t size) {
77+
return bufferevent_write(m_bufferEvent, data, size);
78+
}
79+
80+
socket_t getfd() {
81+
return bufferevent_getfd(m_bufferEvent);
82+
}
83+
84+
private:
85+
explicit BufferEvent(struct bufferevent *event) : m_bufferEvent(event) {};
86+
friend EventLoop;
87+
88+
private:
89+
struct bufferevent *m_bufferEvent;
90+
};
91+
92+
}
93+
94+
#endif //__EVENTLOOP_H__

0 commit comments

Comments
 (0)