Skip to content

Commit

Permalink
Replace libev with libevent in GenericWorker (vesoft-inc#26)
Browse files Browse the repository at this point in the history
Turned on `-Wshadow`.
  • Loading branch information
dutor authored Oct 4, 2018
1 parent d9d0970 commit 4c78a44
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 73 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ add_compile_options(-Winvalid-pch)
add_compile_options(-Wall)
add_compile_options(-Werror)
add_compile_options(-Wno-error=sign-compare)
add_compile_options(-Wshadow)

include_directories(SYSTEM ${VGRAPH_HOME}/third-party/bzip2/_install/include)
include_directories(SYSTEM ${VGRAPH_HOME}/third-party/double-conversion/_install/include)
Expand Down
22 changes: 15 additions & 7 deletions common/concurrent/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
add_executable(
concurrent_test
BarrierTest.cpp
LatchTest.cpp
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
concurrent_test
BarrierTest.cpp
LatchTest.cpp
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
)
target_link_libraries(
concurrent_test
event
gtest
gtest_main
glog
gflags
pthread
)
target_link_libraries(concurrent_test ev gtest gtest_main pthread)
add_test(NAME concurrent_test COMMAND concurrent_test)
4 changes: 2 additions & 2 deletions common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ std::vector<std::string> NetworkUtils::getLocalIPs(bool ipv6) {
struct sockaddr_in *ipv = (struct sockaddr_in*)p->ai_addr;
addr = &(ipv->sin_addr);
} else {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6*)p->ai_addr;
addr = (struct in_addr*) &(ipv6->sin6_addr);
struct sockaddr_in6 *ipv6_addr = (struct sockaddr_in6*)p->ai_addr;
addr = (struct in_addr*) &(ipv6_addr->sin6_addr);
}
inet_ntop(p->ai_family, addr, ipStr, addrStrLen);
if (strcmp(ipStr, "127.0.0.1") != 0) {
Expand Down
84 changes: 51 additions & 33 deletions common/thread/GenericWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

#include "base/Base.h"
#include "thread/GenericWorker.h"

#ifndef EV_MULTIPLICITY
#define EV_MULTIPLICITY 1
#endif
#include <ev.h>
#include <sys/eventfd.h>
#include <event2/event.h>

namespace vesoft {
namespace thread {
Expand All @@ -22,31 +19,47 @@ GenericWorker::~GenericWorker() {
stop();
wait();
if (notifier_ != nullptr) {
event_free(notifier_);
notifier_ = nullptr;
}
if (evloop_ != nullptr) {
ev_loop_destroy(evloop_);
evloop_ = nullptr;
if (evbase_ != nullptr) {
event_base_free(evbase_);
evbase_ = nullptr;
}
}

bool GenericWorker::start(std::string name) {
name_ = std::move(name);
if (!stopped_.load(std::memory_order_acquire)) {
LOG(WARNING) << "GenericWroker already started";
return false;
}
evloop_ = ev_loop_new(0);
ev_set_userdata(evloop_, this);
name_ = std::move(name);

auto cb = [] (struct ev_loop *loop, ev_async *, int) {
reinterpret_cast<GenericWorker*>(ev_userdata(loop))->onNotify();
// Create an event base
evbase_ = event_base_new();
DCHECK(evbase_ != nullptr);

// Create an eventfd for async notification
evfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evfd_ == -1) {
LOG(ERROR) << "Create eventfd failed: " << ::strerror(errno);
return false;
}
auto cb = [] (int fd, int16_t, void *arg) {
auto val = 0UL;
auto len = ::read(fd, &val, sizeof(val));
DCHECK(len == sizeof(val));
reinterpret_cast<GenericWorker*>(arg)->onNotify();
};
notifier_ = std::make_unique<ev_async>();
ev_async_init(notifier_.get(), cb);
ev_async_start(evloop_, notifier_.get());
auto events = EV_READ | EV_PERSIST;
notifier_ = event_new(evbase_, evfd_, events, cb, this);
DCHECK(notifier_ != nullptr);
event_add(notifier_, nullptr);

// Launch a new thread to run the event loop
thread_ = std::make_unique<NamedThread>(name_, &GenericWorker::loop, this);

// Mark this worker as started
stopped_.store(false, std::memory_order_release);

return true;
Expand All @@ -71,19 +84,22 @@ bool GenericWorker::wait() {
}

void GenericWorker::loop() {
ev_run(evloop_, 0);
event_base_dispatch(evbase_);
}

void GenericWorker::notify() {
if (notifier_ == nullptr) {
return;
}
ev_async_send(evloop_, notifier_.get());
DCHECK(evfd_ != -1);
auto one = 1UL;
auto len = ::write(evfd_, &one, sizeof(one));
DCHECK(len == sizeof(one));
}

void GenericWorker::onNotify() {
if (stopped_.load(std::memory_order_acquire)) {
ev_break(evloop_, EVBREAK_ALL);
event_base_loopexit(evbase_, NULL);
// Even been broken, we still fall through to finish the current loop.
}
{
Expand All @@ -102,24 +118,23 @@ void GenericWorker::onNotify() {
std::lock_guard<std::mutex> guard(lock_);
newcomings.swap(pendingTimers_);
}
auto cb = [] (struct ev_loop *loop, ev_timer *w, int) {
auto timer = reinterpret_cast<Timer*>(w->data);
auto worker = reinterpret_cast<GenericWorker*>(ev_userdata(loop));
auto cb = [] (int fd, int16_t, void *arg) {
auto timer = reinterpret_cast<Timer*>(arg);
auto worker = timer->owner_;
timer->callback_();
if (timer->intervalSec_ == 0.0) {
if (timer->intervalMSec_ == 0.0) {
worker->purgeTimerInternal(timer->id_);
} else {
w->repeat = timer->intervalSec_;
ev_timer_again(loop, w);
}
};
for (auto &timer : newcomings) {
timer->ev_ = std::make_unique<ev_timer>();
auto delay = timer->delaySec_;
auto interval = timer->intervalSec_;
ev_timer_init(timer->ev_.get(), cb, delay, interval);
timer->ev_->data = timer.get();
ev_timer_start(evloop_, timer->ev_.get());
timer->ev_ = event_new(evbase_, -1, EV_PERSIST, cb, timer.get());

auto delay = timer->delayMSec_;
struct timeval tv;
tv.tv_sec = delay / 1000;
tv.tv_usec = delay % 1000 * 1000;
evtimer_add(timer->ev_, &tv);

auto id = timer->id_;
activeTimers_[id] = std::move(timer);
}
Expand All @@ -141,6 +156,9 @@ GenericWorker::Timer::Timer(std::function<void(void)> cb) {
}

GenericWorker::Timer::~Timer() {
if (ev_ != nullptr) {
event_free(ev_);
}
}

void GenericWorker::purgeTimerTask(uint64_t id) {
Expand All @@ -154,7 +172,7 @@ void GenericWorker::purgeTimerTask(uint64_t id) {
void GenericWorker::purgeTimerInternal(uint64_t id) {
auto iter = activeTimers_.find(id);
if (iter != activeTimers_.end()) {
ev_timer_stop(evloop_, iter->second->ev_.get());
evtimer_del(iter->second->ev_);
activeTimers_.erase(iter);
}
}
Expand Down
48 changes: 25 additions & 23 deletions common/thread/GenericWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
* but not for the performance critical situation.
*/

struct ev_loop;
struct ev_timer;
struct ev_async;
struct event;
struct event_base;

namespace vesoft {
namespace thread {
Expand Down Expand Up @@ -113,11 +112,12 @@ class GenericWorker final : public vesoft::cpp::NonCopyable, public vesoft::cpp:
struct Timer {
explicit Timer(std::function<void(void)> cb);
~Timer();
uint64_t id_;
double delaySec_;
double intervalSec_;
std::function<void(void)> callback_;
std::unique_ptr<struct ev_timer> ev_;
uint64_t id_;
uint64_t delayMSec_;
uint64_t intervalMSec_;
std::function<void(void)> callback_;
struct event *ev_{nullptr};
GenericWorker *owner_{nullptr};
};

private:
Expand All @@ -130,20 +130,21 @@ class GenericWorker final : public vesoft::cpp::NonCopyable, public vesoft::cpp:
}

private:
static constexpr uint64_t TIMER_ID_BITS = 6 * 8;
static constexpr uint64_t TIMER_ID_MASK = ((~0x0UL) >> (64 - TIMER_ID_BITS));
std::string name_;
std::atomic<bool> stopped_{true};
volatile uint64_t nextTimerId_{0};
std::unique_ptr<ev_async> notifier_;
struct ev_loop *evloop_ = nullptr;
std::mutex lock_;
std::vector<std::function<void()>> pendingTasks_;
static constexpr uint64_t TIMER_ID_BITS = 6 * 8;
static constexpr uint64_t TIMER_ID_MASK = ((~0x0UL) >> (64 - TIMER_ID_BITS));
std::string name_;
std::atomic<bool> stopped_{true};
volatile uint64_t nextTimerId_{0};
struct event_base *evbase_ = nullptr;
int evfd_ = -1;
struct event *notifier_ = nullptr;
std::mutex lock_;
std::vector<std::function<void()>> pendingTasks_;
using TimerPtr = std::unique_ptr<Timer>;
std::vector<TimerPtr> pendingTimers_;
std::vector<uint64_t> purgingingTimers_;
std::unordered_map<uint64_t, TimerPtr> activeTimers_;
std::unique_ptr<NamedThread> thread_;
std::vector<TimerPtr> pendingTimers_;
std::vector<uint64_t> purgingingTimers_;
std::unordered_map<uint64_t, TimerPtr> activeTimers_;
std::unique_ptr<NamedThread> thread_;
};

template <typename F, typename...Args>
Expand Down Expand Up @@ -176,8 +177,9 @@ uint64_t GenericWorker::addRepeatTask(size_t ms, F &&f, Args &&...args) {
template <typename F, typename...Args>
uint64_t GenericWorker::addTimerTask(size_t delay, size_t interval, F &&f, Args &&...args) {
auto timer = std::make_unique<Timer>(std::bind(f, args...));
timer->delaySec_ = delay / 1000.;
timer->intervalSec_ = interval / 1000.;
timer->delayMSec_ = delay;
timer->intervalMSec_ = interval;
timer->owner_ = this;
auto id = 0UL;
{
std::lock_guard<std::mutex> guard(lock_);
Expand Down
2 changes: 1 addition & 1 deletion common/thread/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ add_executable(
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:time_obj>
)
target_link_libraries(thread_test ev gtest gtest_main pthread)
target_link_libraries(thread_test event gtest gtest_main glog gflags pthread)
add_test(NAME thread_test COMMAND thread_test)
3 changes: 2 additions & 1 deletion parser/Expressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ Expression::ReturnType ArithmeticExpression::eval() const {
}
return asInt(left) % asInt(right);
default:
assert(false);
DCHECK(false);
}
return false;
}

std::string RelationalExpression::toString() const {
Expand Down
3 changes: 2 additions & 1 deletion parser/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ class Expression {
case 4:
return asString(value).empty();
default:
assert(false);
DCHECK(false);
}
return false;
}

static const std::string& asString(const ReturnType &value) {
Expand Down
30 changes: 26 additions & 4 deletions parser/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
add_executable(parser_test ParserTest.cpp $<TARGET_OBJECTS:parser_obj>)
target_link_libraries(parser_test gtest gtest_main pthread)
add_executable(
parser_test
ParserTest.cpp
$<TARGET_OBJECTS:parser_obj>
)
target_link_libraries(
parser_test
gtest
gtest_main
glog
gflags
pthread
)
target_include_directories(parser_test SYSTEM BEFORE PUBLIC ${FLEX_INCLUDE_DIRS})
add_test(NAME parser_test COMMAND parser_test)

add_executable(scanner_test ScannerTest.cpp $<TARGET_OBJECTS:parser_obj>)
target_link_libraries(scanner_test gtest gtest_main pthread)
add_executable(
scanner_test
ScannerTest.cpp
$<TARGET_OBJECTS:parser_obj>
)
target_link_libraries(
scanner_test
gtest
gtest_main
glog
gflags
pthread
)
target_include_directories(scanner_test SYSTEM BEFORE PUBLIC ${FLEX_INCLUDE_DIRS})
add_test(NAME scanner_test COMMAND scanner_test)
2 changes: 1 addition & 1 deletion raftex/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ target_link_libraries(
event
double-conversion
dl
ev
event
-pthread
)
add_test(NAME file_based_wal_test COMMAND file_based_wal_test)
Expand Down

0 comments on commit 4c78a44

Please sign in to comment.