From 4c78a4404e05139c3a0d79f2642d6fe65551528e Mon Sep 17 00:00:00 2001 From: dutor <440396+dutor@users.noreply.github.com> Date: Thu, 4 Oct 2018 23:51:16 +0800 Subject: [PATCH] Replace libev with libevent in GenericWorker (#26) Turned on `-Wshadow`. --- CMakeLists.txt | 1 + common/concurrent/test/CMakeLists.txt | 22 ++++--- common/network/NetworkUtils.cpp | 4 +- common/thread/GenericWorker.cpp | 84 ++++++++++++++++----------- common/thread/GenericWorker.h | 48 +++++++-------- common/thread/test/CMakeLists.txt | 2 +- parser/Expressions.cpp | 3 +- parser/Expressions.h | 3 +- parser/test/CMakeLists.txt | 30 ++++++++-- raftex/test/CMakeLists.txt | 2 +- 10 files changed, 126 insertions(+), 73 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8530f1f88e9..ffb807f5557 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -97,6 +97,7 @@ add_compile_options(-Winvalid-pch) add_compile_options(-Wall) add_compile_options(-Werror) add_compile_options(-Wno-error=sign-compare) +add_compile_options(-Wshadow) include_directories(SYSTEM ${VGRAPH_HOME}/third-party/bzip2/_install/include) include_directories(SYSTEM ${VGRAPH_HOME}/third-party/double-conversion/_install/include) diff --git a/common/concurrent/test/CMakeLists.txt b/common/concurrent/test/CMakeLists.txt index 01f5afd7616..dbe2943740e 100644 --- a/common/concurrent/test/CMakeLists.txt +++ b/common/concurrent/test/CMakeLists.txt @@ -1,10 +1,18 @@ add_executable( - concurrent_test - BarrierTest.cpp - LatchTest.cpp - $ - $ - $ + concurrent_test + BarrierTest.cpp + LatchTest.cpp + $ + $ + $ +) +target_link_libraries( + concurrent_test + event + gtest + gtest_main + glog + gflags + pthread ) -target_link_libraries(concurrent_test ev gtest gtest_main pthread) add_test(NAME concurrent_test COMMAND concurrent_test) diff --git a/common/network/NetworkUtils.cpp b/common/network/NetworkUtils.cpp index 704b48a5a9a..d4dfe31b64e 100644 --- a/common/network/NetworkUtils.cpp +++ b/common/network/NetworkUtils.cpp @@ -58,8 +58,8 @@ std::vector NetworkUtils::getLocalIPs(bool ipv6) { struct sockaddr_in *ipv = (struct sockaddr_in*)p->ai_addr; addr = &(ipv->sin_addr); } else { - struct sockaddr_in6 *ipv6 = (struct sockaddr_in6*)p->ai_addr; - addr = (struct in_addr*) &(ipv6->sin6_addr); + struct sockaddr_in6 *ipv6_addr = (struct sockaddr_in6*)p->ai_addr; + addr = (struct in_addr*) &(ipv6_addr->sin6_addr); } inet_ntop(p->ai_family, addr, ipStr, addrStrLen); if (strcmp(ipStr, "127.0.0.1") != 0) { diff --git a/common/thread/GenericWorker.cpp b/common/thread/GenericWorker.cpp index 8aedd8bfdb3..6259e168e3a 100644 --- a/common/thread/GenericWorker.cpp +++ b/common/thread/GenericWorker.cpp @@ -6,11 +6,8 @@ #include "base/Base.h" #include "thread/GenericWorker.h" - -#ifndef EV_MULTIPLICITY -#define EV_MULTIPLICITY 1 -#endif -#include +#include +#include namespace vesoft { namespace thread { @@ -22,31 +19,47 @@ GenericWorker::~GenericWorker() { stop(); wait(); if (notifier_ != nullptr) { + event_free(notifier_); notifier_ = nullptr; } - if (evloop_ != nullptr) { - ev_loop_destroy(evloop_); - evloop_ = nullptr; + if (evbase_ != nullptr) { + event_base_free(evbase_); + evbase_ = nullptr; } } bool GenericWorker::start(std::string name) { - name_ = std::move(name); if (!stopped_.load(std::memory_order_acquire)) { + LOG(WARNING) << "GenericWroker already started"; return false; } - evloop_ = ev_loop_new(0); - ev_set_userdata(evloop_, this); + name_ = std::move(name); - auto cb = [] (struct ev_loop *loop, ev_async *, int) { - reinterpret_cast(ev_userdata(loop))->onNotify(); + // Create an event base + evbase_ = event_base_new(); + DCHECK(evbase_ != nullptr); + + // Create an eventfd for async notification + evfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (evfd_ == -1) { + LOG(ERROR) << "Create eventfd failed: " << ::strerror(errno); + return false; + } + auto cb = [] (int fd, int16_t, void *arg) { + auto val = 0UL; + auto len = ::read(fd, &val, sizeof(val)); + DCHECK(len == sizeof(val)); + reinterpret_cast(arg)->onNotify(); }; - notifier_ = std::make_unique(); - ev_async_init(notifier_.get(), cb); - ev_async_start(evloop_, notifier_.get()); + auto events = EV_READ | EV_PERSIST; + notifier_ = event_new(evbase_, evfd_, events, cb, this); + DCHECK(notifier_ != nullptr); + event_add(notifier_, nullptr); + // Launch a new thread to run the event loop thread_ = std::make_unique(name_, &GenericWorker::loop, this); + // Mark this worker as started stopped_.store(false, std::memory_order_release); return true; @@ -71,19 +84,22 @@ bool GenericWorker::wait() { } void GenericWorker::loop() { - ev_run(evloop_, 0); + event_base_dispatch(evbase_); } void GenericWorker::notify() { if (notifier_ == nullptr) { return; } - ev_async_send(evloop_, notifier_.get()); + DCHECK(evfd_ != -1); + auto one = 1UL; + auto len = ::write(evfd_, &one, sizeof(one)); + DCHECK(len == sizeof(one)); } void GenericWorker::onNotify() { if (stopped_.load(std::memory_order_acquire)) { - ev_break(evloop_, EVBREAK_ALL); + event_base_loopexit(evbase_, NULL); // Even been broken, we still fall through to finish the current loop. } { @@ -102,24 +118,23 @@ void GenericWorker::onNotify() { std::lock_guard guard(lock_); newcomings.swap(pendingTimers_); } - auto cb = [] (struct ev_loop *loop, ev_timer *w, int) { - auto timer = reinterpret_cast(w->data); - auto worker = reinterpret_cast(ev_userdata(loop)); + auto cb = [] (int fd, int16_t, void *arg) { + auto timer = reinterpret_cast(arg); + auto worker = timer->owner_; timer->callback_(); - if (timer->intervalSec_ == 0.0) { + if (timer->intervalMSec_ == 0.0) { worker->purgeTimerInternal(timer->id_); - } else { - w->repeat = timer->intervalSec_; - ev_timer_again(loop, w); } }; for (auto &timer : newcomings) { - timer->ev_ = std::make_unique(); - auto delay = timer->delaySec_; - auto interval = timer->intervalSec_; - ev_timer_init(timer->ev_.get(), cb, delay, interval); - timer->ev_->data = timer.get(); - ev_timer_start(evloop_, timer->ev_.get()); + timer->ev_ = event_new(evbase_, -1, EV_PERSIST, cb, timer.get()); + + auto delay = timer->delayMSec_; + struct timeval tv; + tv.tv_sec = delay / 1000; + tv.tv_usec = delay % 1000 * 1000; + evtimer_add(timer->ev_, &tv); + auto id = timer->id_; activeTimers_[id] = std::move(timer); } @@ -141,6 +156,9 @@ GenericWorker::Timer::Timer(std::function cb) { } GenericWorker::Timer::~Timer() { + if (ev_ != nullptr) { + event_free(ev_); + } } void GenericWorker::purgeTimerTask(uint64_t id) { @@ -154,7 +172,7 @@ void GenericWorker::purgeTimerTask(uint64_t id) { void GenericWorker::purgeTimerInternal(uint64_t id) { auto iter = activeTimers_.find(id); if (iter != activeTimers_.end()) { - ev_timer_stop(evloop_, iter->second->ev_.get()); + evtimer_del(iter->second->ev_); activeTimers_.erase(iter); } } diff --git a/common/thread/GenericWorker.h b/common/thread/GenericWorker.h index b7cba4507ea..8c0a498fc5f 100644 --- a/common/thread/GenericWorker.h +++ b/common/thread/GenericWorker.h @@ -21,9 +21,8 @@ * but not for the performance critical situation. */ -struct ev_loop; -struct ev_timer; -struct ev_async; +struct event; +struct event_base; namespace vesoft { namespace thread { @@ -113,11 +112,12 @@ class GenericWorker final : public vesoft::cpp::NonCopyable, public vesoft::cpp: struct Timer { explicit Timer(std::function cb); ~Timer(); - uint64_t id_; - double delaySec_; - double intervalSec_; - std::function callback_; - std::unique_ptr ev_; + uint64_t id_; + uint64_t delayMSec_; + uint64_t intervalMSec_; + std::function callback_; + struct event *ev_{nullptr}; + GenericWorker *owner_{nullptr}; }; private: @@ -130,20 +130,21 @@ class GenericWorker final : public vesoft::cpp::NonCopyable, public vesoft::cpp: } private: - static constexpr uint64_t TIMER_ID_BITS = 6 * 8; - static constexpr uint64_t TIMER_ID_MASK = ((~0x0UL) >> (64 - TIMER_ID_BITS)); - std::string name_; - std::atomic stopped_{true}; - volatile uint64_t nextTimerId_{0}; - std::unique_ptr notifier_; - struct ev_loop *evloop_ = nullptr; - std::mutex lock_; - std::vector> pendingTasks_; + static constexpr uint64_t TIMER_ID_BITS = 6 * 8; + static constexpr uint64_t TIMER_ID_MASK = ((~0x0UL) >> (64 - TIMER_ID_BITS)); + std::string name_; + std::atomic stopped_{true}; + volatile uint64_t nextTimerId_{0}; + struct event_base *evbase_ = nullptr; + int evfd_ = -1; + struct event *notifier_ = nullptr; + std::mutex lock_; + std::vector> pendingTasks_; using TimerPtr = std::unique_ptr; - std::vector pendingTimers_; - std::vector purgingingTimers_; - std::unordered_map activeTimers_; - std::unique_ptr thread_; + std::vector pendingTimers_; + std::vector purgingingTimers_; + std::unordered_map activeTimers_; + std::unique_ptr thread_; }; template @@ -176,8 +177,9 @@ uint64_t GenericWorker::addRepeatTask(size_t ms, F &&f, Args &&...args) { template uint64_t GenericWorker::addTimerTask(size_t delay, size_t interval, F &&f, Args &&...args) { auto timer = std::make_unique(std::bind(f, args...)); - timer->delaySec_ = delay / 1000.; - timer->intervalSec_ = interval / 1000.; + timer->delayMSec_ = delay; + timer->intervalMSec_ = interval; + timer->owner_ = this; auto id = 0UL; { std::lock_guard guard(lock_); diff --git a/common/thread/test/CMakeLists.txt b/common/thread/test/CMakeLists.txt index 8f349a2942a..2e7719fe87d 100644 --- a/common/thread/test/CMakeLists.txt +++ b/common/thread/test/CMakeLists.txt @@ -7,5 +7,5 @@ add_executable( $ $ ) -target_link_libraries(thread_test ev gtest gtest_main pthread) +target_link_libraries(thread_test event gtest gtest_main glog gflags pthread) add_test(NAME thread_test COMMAND thread_test) diff --git a/parser/Expressions.cpp b/parser/Expressions.cpp index 70c826e12bf..10e16c01708 100644 --- a/parser/Expressions.cpp +++ b/parser/Expressions.cpp @@ -238,8 +238,9 @@ Expression::ReturnType ArithmeticExpression::eval() const { } return asInt(left) % asInt(right); default: - assert(false); + DCHECK(false); } + return false; } std::string RelationalExpression::toString() const { diff --git a/parser/Expressions.h b/parser/Expressions.h index 3601731703c..b91d3c1d789 100644 --- a/parser/Expressions.h +++ b/parser/Expressions.h @@ -59,8 +59,9 @@ class Expression { case 4: return asString(value).empty(); default: - assert(false); + DCHECK(false); } + return false; } static const std::string& asString(const ReturnType &value) { diff --git a/parser/test/CMakeLists.txt b/parser/test/CMakeLists.txt index 829712ef78d..917569a6679 100644 --- a/parser/test/CMakeLists.txt +++ b/parser/test/CMakeLists.txt @@ -1,9 +1,31 @@ -add_executable(parser_test ParserTest.cpp $) -target_link_libraries(parser_test gtest gtest_main pthread) +add_executable( + parser_test + ParserTest.cpp + $ +) +target_link_libraries( + parser_test + gtest + gtest_main + glog + gflags + pthread +) target_include_directories(parser_test SYSTEM BEFORE PUBLIC ${FLEX_INCLUDE_DIRS}) add_test(NAME parser_test COMMAND parser_test) -add_executable(scanner_test ScannerTest.cpp $) -target_link_libraries(scanner_test gtest gtest_main pthread) +add_executable( + scanner_test + ScannerTest.cpp + $ +) +target_link_libraries( + scanner_test + gtest + gtest_main + glog + gflags + pthread +) target_include_directories(scanner_test SYSTEM BEFORE PUBLIC ${FLEX_INCLUDE_DIRS}) add_test(NAME scanner_test COMMAND scanner_test) diff --git a/raftex/test/CMakeLists.txt b/raftex/test/CMakeLists.txt index 599a5cd9287..5a166f76a53 100644 --- a/raftex/test/CMakeLists.txt +++ b/raftex/test/CMakeLists.txt @@ -16,7 +16,7 @@ target_link_libraries( event double-conversion dl - ev + event -pthread ) add_test(NAME file_based_wal_test COMMAND file_based_wal_test)