Skip to content

Commit

Permalink
Support IOEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Apr 8, 2024
1 parent b24717b commit 7de2d5b
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 142 deletions.
36 changes: 3 additions & 33 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

int EventData::OnCreated(const EventDataOptions& options) {
if (options.user_id == INVALID_VREF_ID) {
LOG(ERROR) << "Invalid user_id=-1";
return -1;
}
int IOEventData::OnCreated(const IOEventDataOptions& options) {
if (!options.input_cb) {
LOG(ERROR) << "Invalid input_cb=NULL";
return -1;
Expand All @@ -89,34 +85,8 @@ int EventData::OnCreated(const EventDataOptions& options) {
return 0;
}

void EventData::BeforeRecycled() {
_options = {INVALID_EVENT_DATA_ID, NULL, NULL};
}

void MakeEventDataIdInvalid(EventDataId& id) {
EventData::SetFailedById(id);
id = INVALID_EVENT_DATA_ID;
}

int EventDispatcher::CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
data->CallInputEventCallback(events, thread_attr);
return 0;
}

int EventDispatcher::CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
return data->CallOutputEventCallback(events, thread_attr);
void IOEventData::BeforeRecycled() {
_options = { NULL, NULL, NULL };
}

} // namespace brpc
Expand Down
175 changes: 146 additions & 29 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,65 +26,63 @@

namespace brpc {

// Unique identifier of a EventData.
// Unique identifier of a IOEventData.
// Users shall store EventDataId instead of EventData and call EventData::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed EventData will not be recycled.
typedef VRefId EventDataId;
typedef VRefId IOEventDataId;

const VRefId INVALID_EVENT_DATA_ID = INVALID_VREF_ID;
const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID;

class EventData;
class IOEventData;

typedef VersionedRefWithIdUniquePtr<EventData> EventDataUniquePtr;
typedef VersionedRefWithIdUniquePtr<IOEventData> EventDataUniquePtr;

// User callback type of input event and output event.
typedef int (*InputEventCallback) (VRefId id, uint32_t events,
typedef int (*InputEventCallback) (void* id, uint32_t events,
const bthread_attr_t& thread_attr);
typedef InputEventCallback OutputEventCallback;

struct EventDataOptions {
// Find user object to handle event by `user_id'.
uint64_t user_id;
struct IOEventDataOptions {
// Callback for input event.
InputEventCallback input_cb;
// Callback for output event.
OutputEventCallback output_cb;
// User data.
void* user_data;
};

// EventDispatcher finds EventData by EventDataId which is
// EventDispatcher finds IOEventData by IOEventDataId which is
// stored in epoll/kqueue data, and calls its callback, so
// EventDispatcher supports various IO types, such as socket,
// pipe, eventfd, timerfd, etc.
class EventData : public VersionedRefWithId<EventData> {
class IOEventData : public VersionedRefWithId<IOEventData> {
public:
explicit EventData(Forbidden f)
: VersionedRefWithId<EventData>(f)
, _options{INVALID_EVENT_DATA_ID, NULL, NULL} {}
explicit IOEventData(Forbidden f)
: VersionedRefWithId<IOEventData>(f)
, _options{ NULL, NULL, NULL } {}

DISALLOW_COPY_AND_ASSIGN(EventData);
DISALLOW_COPY_AND_ASSIGN(IOEventData);

int CallInputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.input_cb(_options.user_id, events, thread_attr);
return _options.input_cb(_options.user_data, events, thread_attr);
}

int CallOutputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.output_cb(_options.user_id, events, thread_attr);
return _options.output_cb(_options.user_data, events, thread_attr);
}

private:
friend class VersionedRefWithId<EventData>;
friend class VersionedRefWithId<IOEventData>;

int OnCreated(const EventDataOptions& options);
int OnCreated(const IOEventDataOptions& options);
void BeforeRecycled();

EventDataOptions _options;
IOEventDataOptions _options;
};

void MakeEventDataIdInvalid(EventDataId& id);

namespace rdma {
class RdmaEndpoint;
}
Expand All @@ -94,6 +92,7 @@ class RdmaEndpoint;
class EventDispatcher {
friend class Socket;
friend class rdma::RdmaEndpoint;
template <typename T> friend class IOEvent;
public:
EventDispatcher();

Expand All @@ -120,19 +119,19 @@ friend class rdma::RdmaEndpoint;
// When the file descriptor is removed from internal epoll, the Socket
// will be dereferenced once additionally.
// Returns 0 on success, -1 otherwise.
int AddConsumer(EventDataId event_data_id, int fd);
int AddConsumer(IOEventDataId event_data_id, int fd);

// Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. When event arrives,
// `Socket::HandleEpollOut' will be called with `socket_id'
// Returns 0 on success, -1 otherwise and errno is set
int RegisterEvent(EventDataId event_data_id, int fd, bool pollin);
int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin);

// Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
// will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
// Returns 0 on success, -1 otherwise and errno is set
int UnregisterEvent(EventDataId event_data_id, int fd, bool pollin);
int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin);

private:
DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
Expand All @@ -147,12 +146,29 @@ friend class rdma::RdmaEndpoint;
int RemoveConsumer(int fd);

// Call user callback of input event and output event.
static int CallInputEventCallback(EventDataId event_data_id,
template<bool IsInputEvent>
static int OnEvent(IOEventDataId event_data_id, uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (IOEventData::Address(event_data_id, &data) != 0) {
return -1;
}
return IsInputEvent ?
data->CallInputEventCallback(events, thread_attr) :
data->CallOutputEventCallback(events, thread_attr);
}

static int CallInputEventCallback(IOEventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
static int CallOutputEventCallback(EventDataId event_data_id,
const bthread_attr_t& thread_attr) {
return OnEvent<true>(event_data_id, events, thread_attr);
}

static int CallOutputEventCallback(IOEventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
const bthread_attr_t& thread_attr) {
return OnEvent<false>(event_data_id, events, thread_attr);
}

// The epoll/kqueue fd to watch events.
int _event_dispatcher_fd;
Expand All @@ -172,6 +188,107 @@ friend class rdma::RdmaEndpoint;

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

// IOEvent is used to manage the IO events of a file descriptor conveniently.
template <typename T>
class IOEvent {
public:
IOEvent()
: _init(false)
, _event_data_id(INVALID_IO_EVENT_DATA_ID)
, _bthread_tag(bthread_self_tag()) {}

int Init(void* user_data) {
if (_init) {
LOG(WARNING) << "IOEvent has been initialized";
return 0;
}
IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data };
if (IOEventData::Create(&_event_data_id, options) != 0) {
LOG(ERROR) << "Fail to create EventData";
return -1;
}
_init = true;
return 0;
}

void Reset() {
if (_init) {
IOEventData::SetFailedById(_event_data_id);
_init = false;
}
}

int AddConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.AddConsumer(_event_data_id, fd);
}


int RemoveConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd);
}


int RegisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.RegisterEvent(_event_data_id, fd, pollin);
}

int UnregisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.UnregisterEvent(_event_data_id, fd, pollin);
}

void set_bthread_tag(bthread_tag_t bthread_tag) {
_bthread_tag = bthread_tag;
}
bthread_tag_t bthread_tag() const {
return _bthread_tag;
}

private:

static int OnInputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_callable_return_int<decltype(&T::OnInputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnInputEvent(user_data, events, thread_attr);
}

static int OnOutputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_callable_return_int<decltype(&T::OnOutputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnOutputEvent(user_data, events, thread_attr);
}

bool _init;
IOEventDataId _event_data_id;
bthread_tag_t _bthread_tag;
};

} // namespace brpc


Expand Down
13 changes: 6 additions & 7 deletions src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {

// Set _thread_attr before creating epoll thread to make sure
// everyting seems sane to the thread.
_thread_attr = (consumer_thread_attr ?
*consumer_thread_attr : BTHREAD_ATTR_NORMAL);
_thread_attr = consumer_thread_attr ?
*consumer_thread_attr : BTHREAD_ATTR_NORMAL;

//_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
// that created by epoll_wait() never to quit.
Expand All @@ -81,8 +81,7 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
// when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
// is also a potential issue for consumer threads, using the same attr
// should be a reasonable solution.
int rc = bthread_start_background(
&_tid, &epoll_thread_attr, RunThis, this);
int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, this);
if (rc) {
LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
return -1;
Expand Down Expand Up @@ -110,7 +109,7 @@ void EventDispatcher::Join() {
}
}

int EventDispatcher::RegisterEvent(EventDataId event_data_id,
int EventDispatcher::RegisterEvent(IOEventDataId event_data_id,
int fd, bool pollin) {
if (_event_dispatcher_fd < 0) {
errno = EINVAL;
Expand Down Expand Up @@ -138,7 +137,7 @@ int EventDispatcher::RegisterEvent(EventDataId event_data_id,
return 0;
}

int EventDispatcher::UnregisterEvent(EventDataId event_data_id,
int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id,
int fd, bool pollin) {
if (pollin) {
epoll_event evt;
Expand All @@ -154,7 +153,7 @@ int EventDispatcher::UnregisterEvent(EventDataId event_data_id,
return -1;
}

int EventDispatcher::AddConsumer(EventDataId event_data_id, int fd) {
int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
if (_event_dispatcher_fd < 0) {
errno = EINVAL;
return -1;
Expand Down
Loading

0 comments on commit 7de2d5b

Please sign in to comment.