Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
EventDispatcher supports various IO types
Browse files Browse the repository at this point in the history
chenBright committed Mar 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 24fc31e commit 9b51e09
Showing 10 changed files with 970 additions and 170 deletions.
54 changes: 54 additions & 0 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -71,6 +71,60 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

int EventData::OnCreate(uint64_t user_id,
brpc::InputEventCallback input_cb,
brpc::OutputEventCallback output_cb) {
if (user_id == INVALID_VREF_ID) {
LOG(ERROR) << "Invalid user_id=-1";
return -1;
}
if (!input_cb) {
LOG(ERROR) << "Invalid input_cb=NULL";
return -1;
}
if (!output_cb) {
LOG(ERROR) << "Invalid output_cb=NULL";
return -1;
}

_user_id = user_id;
_input_cb = input_cb;
_output_cb = output_cb;
return 0;
}

void EventData::OnRecycle() {
_user_id = INVALID_VREF_ID;
_input_cb = NULL;
_output_cb = NULL;
}

void MakeEventDataIdInvalid(EventDataId& id) {
EventData::SetFailed(id);
id = INVALID_EVENT_DATA_ID;
}

int EventDispatcher::CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
data->CallInputEventCallback(events, thread_attr);
return 0;
}

int EventDispatcher::CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
return data->CallOutputEventCallback(events, thread_attr);
}

} // namespace brpc

#if defined(OS_LINUX)
84 changes: 76 additions & 8 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
@@ -21,11 +21,71 @@

#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bthread/types.h" // bthread_t, bthread_attr_t
#include "brpc/socket.h" // Socket, SocketId
#include "brpc/versioned_ref_with_id.h"


namespace brpc {

// Unique identifier of a EventData.
// Users shall store EventDataId instead of EventData and call EventData::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed EventData will not be recycled.
typedef VRefId EventDataId;

const VRefId INVALID_EVENT_DATA_ID = INVALID_VREF_ID;

class EventData;

typedef VersionedRefWithIdUniquePtr<EventData> EventDataUniquePtr;

// User callback type of input event and output event.
typedef int (*InputEventCallback) (VRefId id, uint32_t events,
const bthread_attr_t& thread_attr);
typedef InputEventCallback OutputEventCallback;

// EventDispatcher finds EventData by EventDataId which is
// stored in epoll/kqueue data, and calls its callback, so
// EventDispatcher supports various IO types, such as socket,
// pipe, eventfd, timerfd, etc.
class EventData : public VersionedRefWithId<EventData> {
public:
explicit EventData(Forbidden f)
:VersionedRefWithId<EventData>(f)
, _user_id(INVALID_VREF_ID)
, _input_cb(NULL)
, _output_cb(NULL)
{}

int CallInputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _input_cb(_user_id, events, thread_attr);
}

int CallOutputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _output_cb(_user_id, events, thread_attr);
}

private:
friend class VersionedRefWithId<EventData>;

int OnCreate(uint64_t user_id,
InputEventCallback input_cb,
OutputEventCallback output_cb);
void OnFailed() {}
void OnRecycle();

uint64_t _user_id;
InputEventCallback _input_cb;
OutputEventCallback _output_cb;
};

void MakeEventDataIdInvalid(EventDataId& id);

namespace rdma {
class RdmaEndpoint;
}

// Dispatch edge-triggered events of file descriptors to consumers
// running in separate bthreads.
class EventDispatcher {
@@ -40,7 +100,7 @@ friend class rdma::RdmaEndpoint;
// Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
// create bthreads running user callbacks.
// Returns 0 on success, -1 otherwise.
virtual int Start(const bthread_attr_t* consumer_thread_attr);
virtual int Start(const bthread_attr_t* thread_attr);

// True iff this dispatcher is running in a bthread
bool Running() const;
@@ -57,19 +117,19 @@ friend class rdma::RdmaEndpoint;
// When the file descriptor is removed from internal epoll, the Socket
// will be dereferenced once additionally.
// Returns 0 on success, -1 otherwise.
int AddConsumer(SocketId socket_id, int fd);
int AddConsumer(EventDataId event_data_id, int fd);

// Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. When event arrives,
// `Socket::HandleEpollOut' will be called with `socket_id'
// Returns 0 on success, -1 otherwise and errno is set
int RegisterEvent(SocketId socket_id, int fd, bool pollin);
int RegisterEvent(EventDataId event_data_id, int fd, bool pollin);

// Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
// will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
// Returns 0 on success, -1 otherwise and errno is set
int UnregisterEvent(SocketId socket_id, int fd, bool pollin);
int UnregisterEvent(EventDataId event_data_id, int fd, bool pollin);

private:
DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
@@ -83,8 +143,16 @@ friend class rdma::RdmaEndpoint;
// Remove the file descriptor `fd' from epoll.
int RemoveConsumer(int fd);

// The epoll to watch events.
int _epfd;
// Call user callback of input event and output event.
static int CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
static int CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);

// The epoll/kqueue fd to watch events.
int _event_dispatcher_fd;

// false unless Stop() is called.
volatile bool _stop;
@@ -93,7 +161,7 @@ friend class rdma::RdmaEndpoint;
bthread_t _tid;

// The attribute of bthreads calling user callbacks.
bthread_attr_t _consumer_thread_attr;
bthread_attr_t _thread_attr;

// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
Loading

0 comments on commit 9b51e09

Please sign in to comment.