From 7de2d5be770dd11e669b8cef549b7c00210962b5 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sun, 7 Apr 2024 22:22:22 +0800 Subject: [PATCH] Support IOEvent --- src/brpc/event_dispatcher.cpp | 36 +---- src/brpc/event_dispatcher.h | 175 ++++++++++++++++++++---- src/brpc/event_dispatcher_epoll.cpp | 13 +- src/brpc/event_dispatcher_kqueue.cpp | 10 +- src/brpc/socket.cpp | 54 ++++---- src/brpc/socket.h | 17 +-- src/butil/type_traits.h | 13 +- test/brpc_event_dispatcher_unittest.cpp | 38 ++--- 8 files changed, 214 insertions(+), 142 deletions(-) diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 067bfb4da6..53495ea6d7 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -71,11 +71,7 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) { return g_edisp[tag * FLAGS_event_dispatcher_num + index]; } -int EventData::OnCreated(const EventDataOptions& options) { - if (options.user_id == INVALID_VREF_ID) { - LOG(ERROR) << "Invalid user_id=-1"; - return -1; - } +int IOEventData::OnCreated(const IOEventDataOptions& options) { if (!options.input_cb) { LOG(ERROR) << "Invalid input_cb=NULL"; return -1; @@ -89,34 +85,8 @@ int EventData::OnCreated(const EventDataOptions& options) { return 0; } -void EventData::BeforeRecycled() { - _options = {INVALID_EVENT_DATA_ID, NULL, NULL}; -} - -void MakeEventDataIdInvalid(EventDataId& id) { - EventData::SetFailedById(id); - id = INVALID_EVENT_DATA_ID; -} - -int EventDispatcher::CallInputEventCallback(EventDataId event_data_id, - uint32_t events, - const bthread_attr_t& thread_attr) { - EventDataUniquePtr data; - if (EventData::Address(event_data_id, &data) != 0) { - return -1; - } - data->CallInputEventCallback(events, thread_attr); - return 0; -} - -int EventDispatcher::CallOutputEventCallback(EventDataId event_data_id, - uint32_t events, - const bthread_attr_t& thread_attr) { - EventDataUniquePtr data; - if (EventData::Address(event_data_id, &data) != 0) { - return -1; - } - return data->CallOutputEventCallback(events, thread_attr); +void IOEventData::BeforeRecycled() { + _options = { NULL, NULL, NULL }; } } // namespace brpc diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index ad4f13df0c..aa9b4d12a8 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -26,65 +26,63 @@ namespace brpc { -// Unique identifier of a EventData. +// Unique identifier of a IOEventData. // Users shall store EventDataId instead of EventData and call EventData::Address() // to convert the identifier to an unique_ptr at each access. Whenever a // unique_ptr is not destructed, the enclosed EventData will not be recycled. -typedef VRefId EventDataId; +typedef VRefId IOEventDataId; -const VRefId INVALID_EVENT_DATA_ID = INVALID_VREF_ID; +const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID; -class EventData; +class IOEventData; -typedef VersionedRefWithIdUniquePtr EventDataUniquePtr; +typedef VersionedRefWithIdUniquePtr EventDataUniquePtr; // User callback type of input event and output event. -typedef int (*InputEventCallback) (VRefId id, uint32_t events, +typedef int (*InputEventCallback) (void* id, uint32_t events, const bthread_attr_t& thread_attr); typedef InputEventCallback OutputEventCallback; -struct EventDataOptions { - // Find user object to handle event by `user_id'. - uint64_t user_id; +struct IOEventDataOptions { // Callback for input event. InputEventCallback input_cb; // Callback for output event. OutputEventCallback output_cb; + // User data. + void* user_data; }; -// EventDispatcher finds EventData by EventDataId which is +// EventDispatcher finds IOEventData by IOEventDataId which is // stored in epoll/kqueue data, and calls its callback, so // EventDispatcher supports various IO types, such as socket, // pipe, eventfd, timerfd, etc. -class EventData : public VersionedRefWithId { +class IOEventData : public VersionedRefWithId { public: - explicit EventData(Forbidden f) - : VersionedRefWithId(f) - , _options{INVALID_EVENT_DATA_ID, NULL, NULL} {} + explicit IOEventData(Forbidden f) + : VersionedRefWithId(f) + , _options{ NULL, NULL, NULL } {} - DISALLOW_COPY_AND_ASSIGN(EventData); + DISALLOW_COPY_AND_ASSIGN(IOEventData); int CallInputEventCallback(uint32_t events, const bthread_attr_t& thread_attr) { - return _options.input_cb(_options.user_id, events, thread_attr); + return _options.input_cb(_options.user_data, events, thread_attr); } int CallOutputEventCallback(uint32_t events, const bthread_attr_t& thread_attr) { - return _options.output_cb(_options.user_id, events, thread_attr); + return _options.output_cb(_options.user_data, events, thread_attr); } private: -friend class VersionedRefWithId; +friend class VersionedRefWithId; - int OnCreated(const EventDataOptions& options); + int OnCreated(const IOEventDataOptions& options); void BeforeRecycled(); - EventDataOptions _options; + IOEventDataOptions _options; }; -void MakeEventDataIdInvalid(EventDataId& id); - namespace rdma { class RdmaEndpoint; } @@ -94,6 +92,7 @@ class RdmaEndpoint; class EventDispatcher { friend class Socket; friend class rdma::RdmaEndpoint; +template friend class IOEvent; public: EventDispatcher(); @@ -120,19 +119,19 @@ friend class rdma::RdmaEndpoint; // When the file descriptor is removed from internal epoll, the Socket // will be dereferenced once additionally. // Returns 0 on success, -1 otherwise. - int AddConsumer(EventDataId event_data_id, int fd); + int AddConsumer(IOEventDataId event_data_id, int fd); // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will // be used instead of EPOLL_CTL_ADD. When event arrives, // `Socket::HandleEpollOut' will be called with `socket_id' // Returns 0 on success, -1 otherwise and errno is set - int RegisterEvent(EventDataId event_data_id, int fd, bool pollin); + int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin); // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL // Returns 0 on success, -1 otherwise and errno is set - int UnregisterEvent(EventDataId event_data_id, int fd, bool pollin); + int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin); private: DISALLOW_COPY_AND_ASSIGN(EventDispatcher); @@ -147,12 +146,29 @@ friend class rdma::RdmaEndpoint; int RemoveConsumer(int fd); // Call user callback of input event and output event. - static int CallInputEventCallback(EventDataId event_data_id, + template + static int OnEvent(IOEventDataId event_data_id, uint32_t events, + const bthread_attr_t& thread_attr) { + EventDataUniquePtr data; + if (IOEventData::Address(event_data_id, &data) != 0) { + return -1; + } + return IsInputEvent ? + data->CallInputEventCallback(events, thread_attr) : + data->CallOutputEventCallback(events, thread_attr); + } + + static int CallInputEventCallback(IOEventDataId event_data_id, uint32_t events, - const bthread_attr_t& thread_attr); - static int CallOutputEventCallback(EventDataId event_data_id, + const bthread_attr_t& thread_attr) { + return OnEvent(event_data_id, events, thread_attr); + } + + static int CallOutputEventCallback(IOEventDataId event_data_id, uint32_t events, - const bthread_attr_t& thread_attr); + const bthread_attr_t& thread_attr) { + return OnEvent(event_data_id, events, thread_attr); + } // The epoll/kqueue fd to watch events. int _event_dispatcher_fd; @@ -172,6 +188,107 @@ friend class rdma::RdmaEndpoint; EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag); +// IOEvent is used to manage the IO events of a file descriptor conveniently. +template +class IOEvent { +public: + IOEvent() + : _init(false) + , _event_data_id(INVALID_IO_EVENT_DATA_ID) + , _bthread_tag(bthread_self_tag()) {} + + int Init(void* user_data) { + if (_init) { + LOG(WARNING) << "IOEvent has been initialized"; + return 0; + } + IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data }; + if (IOEventData::Create(&_event_data_id, options) != 0) { + LOG(ERROR) << "Fail to create EventData"; + return -1; + } + _init = true; + return 0; + } + + void Reset() { + if (_init) { + IOEventData::SetFailedById(_event_data_id); + _init = false; + } + } + + int AddConsumer(int fd) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .AddConsumer(_event_data_id, fd); + } + + + int RemoveConsumer(int fd) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd); + } + + + int RegisterEvent(int fd, bool pollin) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .RegisterEvent(_event_data_id, fd, pollin); + } + + int UnregisterEvent(int fd, bool pollin) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .UnregisterEvent(_event_data_id, fd, pollin); + } + + void set_bthread_tag(bthread_tag_t bthread_tag) { + _bthread_tag = bthread_tag; + } + bthread_tag_t bthread_tag() const { + return _bthread_tag; + } + +private: + + static int OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + static_assert( + butil::is_callable_return_int::value, + "T::OnInputEvent signature mismatch"); + return T::OnInputEvent(user_data, events, thread_attr); + } + + static int OnOutputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + static_assert( + butil::is_callable_return_int::value, + "T::OnInputEvent signature mismatch"); + return T::OnOutputEvent(user_data, events, thread_attr); + } + + bool _init; + IOEventDataId _event_data_id; + bthread_tag_t _bthread_tag; +}; + } // namespace brpc diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index ad5eb6eecb..64717b1623 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -69,8 +69,8 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // Set _thread_attr before creating epoll thread to make sure // everyting seems sane to the thread. - _thread_attr = (consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL); + _thread_attr = consumer_thread_attr ? + *consumer_thread_attr : BTHREAD_ATTR_NORMAL; //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by epoll_wait() never to quit. @@ -81,8 +81,7 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this // is also a potential issue for consumer threads, using the same attr // should be a reasonable solution. - int rc = bthread_start_background( - &_tid, &epoll_thread_attr, RunThis, this); + int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, this); if (rc) { LOG(FATAL) << "Fail to create epoll thread: " << berror(rc); return -1; @@ -110,7 +109,7 @@ void EventDispatcher::Join() { } } -int EventDispatcher::RegisterEvent(EventDataId event_data_id, +int EventDispatcher::RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin) { if (_event_dispatcher_fd < 0) { errno = EINVAL; @@ -138,7 +137,7 @@ int EventDispatcher::RegisterEvent(EventDataId event_data_id, return 0; } -int EventDispatcher::UnregisterEvent(EventDataId event_data_id, +int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin) { if (pollin) { epoll_event evt; @@ -154,7 +153,7 @@ int EventDispatcher::UnregisterEvent(EventDataId event_data_id, return -1; } -int EventDispatcher::AddConsumer(EventDataId event_data_id, int fd) { +int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) { if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; diff --git a/src/brpc/event_dispatcher_kqueue.cpp b/src/brpc/event_dispatcher_kqueue.cpp index 7c0d3daeea..97ad29bba6 100644 --- a/src/brpc/event_dispatcher_kqueue.cpp +++ b/src/brpc/event_dispatcher_kqueue.cpp @@ -111,7 +111,7 @@ void EventDispatcher::Join() { } } -int EventDispatcher::RegisterEvent(EventDataId event_data_id, +int EventDispatcher::RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin) { if (_event_dispatcher_fd < 0) { errno = EINVAL; @@ -135,7 +135,7 @@ int EventDispatcher::RegisterEvent(EventDataId event_data_id, return 0; } -int EventDispatcher::UnregisterEvent(EventDataId event_data_id, +int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin) { struct kevent evt; EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); @@ -150,7 +150,7 @@ int EventDispatcher::UnregisterEvent(EventDataId event_data_id, return 0; } -int EventDispatcher::AddConsumer(EventDataId event_data_id, int fd) { +int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) { if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; @@ -206,14 +206,14 @@ void EventDispatcher::Run() { for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { // We don't care about the return value. - CallInputEventCallback((EventDataId)e[i].udata, + CallInputEventCallback((IOEventDataId)e[i].udata, e[i].filter, _thread_attr); } } for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { // We don't care about the return value. - CallOutputEventCallback((EventDataId)e[i].udata, + CallOutputEventCallback((IOEventDataId)e[i].udata, e[i].filter, _thread_attr); } } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 7a0d3ac332..74bf132bd9 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,7 +434,6 @@ Socket::Socket(Forbidden f) , _on_edge_triggered_events(NULL) , _user(NULL) , _conn(NULL) - , _event_data_id(0) , _preferred_index(-1) , _hc_count(0) , _last_msg_size(0) @@ -467,8 +466,7 @@ Socket::Socket(Forbidden f) , _stream_set(NULL) , _total_streams_unconsumed_size(0) , _ninflight_app_health_check(0) - , _http_request_method(HTTP_METHOD_GET) -{ + , _http_request_method(HTTP_METHOD_GET) { CreateVarsOnce(); pthread_mutex_init(&_id_wait_list_mutex, NULL); _epollout_butex = bthread::butex_create_checked >(); @@ -583,8 +581,7 @@ int Socket::ResetFileDescriptor(int fd) { EnableKeepaliveIfNeeded(fd); if (_on_edge_triggered_events) { - if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer( - _event_data_id, fd) != 0) { + if (_io_event.AddConsumer(fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; _fd.store(-1, butil::memory_order_release); @@ -665,15 +662,14 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { } int Socket::OnCreated(const SocketOptions& options) { - EventDataOptions event_data_options{ - id(), StartInputEvent, HandleEpollOut - }; - if (EventData::Create(&_event_data_id, event_data_options) !=0) { - LOG(ERROR) << "Fail to create EventDispatcherData"; + if (_io_event.Init((void*)id()) != 0) { + LOG(ERROR) << "Fail to init IOEvent"; + SetFailed(ENOMEM, "%s", "Fail to init IOEvent"); return -1; } + _io_event.set_bthread_tag(options.bthread_tag); auto guard = butil::MakeScopeGuard([this] { - MakeEventDataIdInvalid(_event_data_id); + _io_event.Reset(); }); g_vars->nsocket << 1; @@ -745,7 +741,6 @@ int Socket::OnCreated(const SocketOptions& options) { _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); _unwritten_bytes.store(0, butil::memory_order_relaxed); _keepalive_options = options.keepalive_options; - _bthread_tag = options.bthread_tag; CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); // Must be the last one! Internal fields of this Socket may be accessed // just after calling ResetFileDescriptor. @@ -783,19 +778,17 @@ void Socket::BeforeRecycled() { sp->RemoveRefManually(); } - // Recycle `_event_data_id'. - MakeEventDataIdInvalid(_event_data_id); - const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); + _io_event.RemoveConsumer(prev_fd); } close(prev_fd); if (create_by_connect) { g_vars->channel_conn << -1; } } + _io_event.Reset(); #if BRPC_WITH_RDMA if (_rdma_ep) { @@ -947,7 +940,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); + _io_event.RemoveConsumer(prev_fd); } close(prev_fd); if (CreatedByConnect()) { @@ -1191,8 +1184,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { // Do not need to check addressable since it will be called by // health checker which called `SetFailed' before const int expected_val = _epollout_butex->load(butil::memory_order_relaxed); - EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _bthread_tag); - if (edisp.RegisterEvent(_event_data_id, fd, pollin) != 0) { + if (_io_event.RegisterEvent(fd, pollin) != 0) { return -1; } @@ -1204,7 +1196,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { } // Ignore return value since `fd' might have been removed // by `RemoveConsumer' in `SetFailed' - butil::ignore_result(edisp.UnregisterEvent(id(), fd, pollin)); + butil::ignore_result(_io_event.UnregisterEvent(fd, pollin)); errno = saved_errno; // Could be writable or spurious wakeup (by former epollout) return rc; @@ -1252,7 +1244,7 @@ int Socket::Connect(const timespec* abstime, // be added into epoll device soon SocketId connect_id; SocketOptions options; - options.bthread_tag = _bthread_tag; + options.bthread_tag = _io_event.bthread_tag(); options.user = req; if (Socket::Create(options, &connect_id) != 0) { LOG(FATAL) << "Fail to create Socket"; @@ -1267,8 +1259,7 @@ int Socket::Connect(const timespec* abstime, // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches - if (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent( - s->event_data_id(), sockfd, false) != 0) { + if (s->_io_event.RegisterEvent(sockfd, false) != 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll"; s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s", @@ -1336,7 +1327,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { return 0; } // Set tag for client side socket - _bthread_tag = bthread_self_tag(); + _io_event.set_bthread_tag(bthread_self_tag()); // Have to hold a reference for `req' SocketUniquePtr s; ReAddress(&s); @@ -1359,8 +1350,9 @@ void Socket::WakeAsEpollOut() { bthread::butex_wake_except(_epollout_butex, 0); } -int Socket::HandleEpollOut(SocketId id, uint32_t, - const bthread_attr_t&) { +int Socket::OnOutputEvent(void* user_data, uint32_t, + const bthread_attr_t&) { + auto id = reinterpret_cast(user_data); SocketUniquePtr s; // Since Sockets might have been `SetFailed' before they were // added into epoll, these sockets miss the signal inside @@ -1406,8 +1398,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) { } // We've got the right to call user callback // The timer will be removed inside destructor of EpollOutRequest - GetGlobalEventDispatcher(req->fd, _bthread_tag) - .UnregisterEvent(_event_data_id, req->fd, false); + butil::ignore_result(_io_event.UnregisterEvent(req->fd, false)); return req->on_epollout_event(req->fd, error_code, req->data); } @@ -2119,8 +2110,9 @@ AuthContext* Socket::mutable_auth_context() { return _auth_context; } -int Socket::StartInputEvent(SocketId id, uint32_t events, - const bthread_attr_t& thread_attr) { +int Socket::OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + auto id = reinterpret_cast(user_data); SocketUniquePtr s; if (Address(id, &s) < 0) { return -1; @@ -2458,7 +2450,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ptr->_rdma_ep->DebugInfo(os); } #endif - { os << "\nbthread_tag=" << ptr->_bthread_tag; } + { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); } } int Socket::CheckHealth() { diff --git a/src/brpc/socket.h b/src/brpc/socket.h index e2de5419ba..1c431f533a 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -246,6 +246,7 @@ friend class OnAppHealthCheckDone; friend class HealthCheckManager; friend class policy::H2GlobalStreamCreator; friend class VersionedRefWithId; +friend class IOEvent; friend void DereferenceSocket(Socket*); class SharedPart; struct WriteRequest; @@ -413,8 +414,8 @@ friend void DereferenceSocket(Socket*); // Start to process edge-triggered events from the fd. // This function does not block caller. - static int StartInputEvent(SocketId id, uint32_t events, - const bthread_attr_t& thread_attr); + static int OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr); static const int PROGRESS_INIT = 1; bool MoreReadEvents(int* progress); @@ -653,14 +654,12 @@ friend void DereferenceSocket(Socket*); WriteRequest*, int error_code, const std::string& error_text); void ReleaseAllFailedWriteRequests(WriteRequest*); - EventDataId event_data_id() const { return _event_data_id; } - // Try to wake socket just like epollout has arrived void WakeAsEpollOut(); - // Generic callback for Socket to handle epollout event - static int HandleEpollOut(SocketId socket_id, uint32_t events, - const bthread_attr_t& thread_attr); + // Generic callback for Socket to handle output event. + static int OnOutputEvent(void* user_data, uint32_t, + const bthread_attr_t&); class EpollOutRequest; // Callback to handle epollout event whose request data @@ -732,7 +731,6 @@ friend void DereferenceSocket(Socket*); // [ Set in ResetFileDescriptor ] butil::atomic _fd; // -1 when not connected. - bthread_tag_t _bthread_tag; // bthread tag of this socket int _tos; // Type of service which is actually only 8bits. int64_t _reset_fd_real_us; // When _fd was reset, in microseconds. @@ -758,8 +756,7 @@ friend void DereferenceSocket(Socket*); // Initialized by SocketOptions.app_connect. std::shared_ptr _app_connect; - // Identifier of EventData in ResourcePool. - EventDataId _event_data_id; + IOEvent _io_event; // last chosen index of the protocol as a heuristic value to avoid // iterating all protocol handlers each time. diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h index 67bf029a7a..20d01589f8 100644 --- a/src/butil/type_traits.h +++ b/src/butil/type_traits.h @@ -352,15 +352,20 @@ template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; // Whether a callable returns type which is same as ReturnType. -template +template struct is_callable_return_same : public butil::is_same()(std::declval()...))> {}; + std::declval()(std::declval()...))> {}; // Whether a callable returns void. -template +template struct is_callable_return_void - : public is_callable_return_same {}; + : public is_callable_return_same {}; + +// Whether a callable returns int. +template +struct is_callable_return_int + : public is_callable_return_same {}; // Whether a function of class returns type which is same as ReturnType. template diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp index 0335e1d0a0..185e9f2dc8 100644 --- a/test/brpc_event_dispatcher_unittest.cpp +++ b/test/brpc_event_dispatcher_unittest.cpp @@ -407,7 +407,6 @@ class EventPipe : public brpc::VersionedRefWithId { public: explicit EventPipe(Forbidden f) : brpc::VersionedRefWithId(f) - , _event_data_id(INVALID_EVENT_PIPE_ID) , _pipe_fds{-1, -1} , _input_event_count(0) {} @@ -423,41 +422,33 @@ class EventPipe : public brpc::VersionedRefWithId { private: friend class VersionedRefWithId; +friend class brpc::IOEvent; int OnCreated() { - brpc::EventDataOptions event_data_options { - id(), - HandleEpollIn, - HandleEpollOut - }; - if (brpc::EventData::Create(&_event_data_id, event_data_options) != 0) { - LOG(ERROR) << "Fail to create EventData"; - return -1; - } - auto guard = butil::MakeScopeGuard([this] { - brpc::MakeEventDataIdInvalid(_event_data_id); - }); - if (pipe(_pipe_fds)) { PLOG(FATAL) << "Fail to create _pipe_fds"; return -1; } - if (brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag()) - .AddConsumer(_event_data_id, _pipe_fds[0]) != 0) { + if (_io_event.Init((void*)id()) != 0) { + LOG(ERROR) << "Fail to init IOEvent"; + return -1; + } + _io_event.set_bthread_tag(bthread_self_tag()); + if (_io_event.AddConsumer(_pipe_fds[0]) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; return -1; } - guard.dismiss(); + _input_event_count = 0; return 0; } void BeforeRecycled() { - brpc::MakeEventDataIdInvalid(_event_data_id); brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag()) .RemoveConsumer(_pipe_fds[0]); + _io_event.Reset(); if (_pipe_fds[0] >= 0) { close(_pipe_fds[0]); } @@ -466,8 +457,9 @@ friend class VersionedRefWithId; } } - static int HandleEpollIn(EventPipeId id, uint32_t events, - const bthread_attr_t& thread_attr) { + static int OnInputEvent(void* user_data, uint32_t, + const bthread_attr_t&) { + auto id = reinterpret_cast(user_data); EventPipeUniquePtr ptr; if (EventPipe::Address(id, &ptr) != 0) { LOG(WARNING) << "Fail to address EventPipe"; @@ -490,13 +482,13 @@ friend class VersionedRefWithId; return 0; } - static int HandleEpollOut(EventPipeId id, uint32_t events, - const bthread_attr_t& thread_attr) { + static int OnOutputEvent(void*, uint32_t, + const bthread_attr_t&) { EXPECT_TRUE(false) << "Should not be called"; return 0; } - brpc::EventDataId _event_data_id; + brpc::IOEvent _io_event; int _pipe_fds[2]; size_t _input_event_count;