Skip to content

Commit

Permalink
Make epoll event loop use EPOLLET and not reregister file descriptors.
Browse files Browse the repository at this point in the history
This results in fewer system calls and presumably more effcient code. It
also brings the epoll (and kqueue) code more in line with how the
windows IOCP code works, incidentally.
  • Loading branch information
bakpakin committed Oct 5, 2023
1 parent 8c0d65c commit 5f2e287
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 125 deletions.
179 changes: 58 additions & 121 deletions src/core/ev.c
Original file line number Diff line number Diff line change
Expand Up @@ -1482,11 +1482,11 @@ void janet_ev_deinit(void) {
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(stream->flags & JANET_STREAM_IOCP)) {
if (!(stream->flags & JANET_STREAM_REGISTERED)) {
if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) {
janet_panicf("failed to listen for events: %V", janet_ev_lasterr());
}
stream->flags |= JANET_STREAM_IOCP;
stream->flags |= JANET_STREAM_REGISTERED;
}
return state;
}
Expand Down Expand Up @@ -1556,81 +1556,59 @@ static JanetTimestamp ts_now(void) {

static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
if (state->stream) {
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
}
}
}

/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !stream->read_state && !stream->write_state;
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev;
ev.events = 0;
if (stream->read_state) ev.events |= EPOLLIN;
if (stream->write_state) ev.events |= EPOLLOUT;
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
if (errno == EPERM) {
/* Couldn't add to event loop, so assume that it completes
* synchronously. In that case, fire the completion
* event manually, since this should be a read or write
* event to a file. So we just post a custom event to do the read/write
* asap. */
/* Use flag to indicate state is not registered in epoll */
state->flags = 1;
JanetEVGenericMessage msg = {0};
msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
if (!(stream->flags & JANET_STREAM_REGISTERED)) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
if (errno == EPERM) {
/* Couldn't add to event loop, so assume that it completes
* synchronously. In that case, fire the completion
* event manually, since this should be a read or write
* event to a file. So we just post a custom event to do the read/write
* asap. */
/* Use flag to indicate state is not registered in epoll */
state->flags = 1;
JanetEVGenericMessage msg = {0};
msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
}
stream->flags |= JANET_STREAM_REGISTERED;
}
return state;
}

/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */
if (!state->flags) {
int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = 0;
if (is_read) ev.events |= EPOLLIN;
if (is_write) ev.events |= EPOLLOUT;
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
janet_panicv(janet_ev_lasterr());
}
}
}
/* Destroy state machine and free memory */
janet_unlisten_impl(state);
}

Expand Down Expand Up @@ -1762,67 +1740,24 @@ static void timestamp2timespec(struct timespec *t, JanetTimestamp ts) {
t->tv_nsec = ts == 0 ? 0 : (ts % 1000) * 1000000;
}

void add_kqueue_events(const struct kevent *events, int length) {
/* NOTE: Status should be equal to the amount of events added, which isn't
* always known since deletions or modifications occur. Can't use the
* eventlist argument for it to report to us what failed otherwise we may
* poll in events to handle! This code assumes atomicity, that kqueue can
* either succeed or fail, but never partially (which is seemingly how it
* works in practice). When encountering an "inbetween" state we currently
* just panic!
*
* The FreeBSD man page kqueue(2) shows a check through the change list to
* check if kqueue had an error with any of the events being pushed to
* change. Maybe we should do this, even tho the man page also doesn't
* note that kqueue actually does this. We do not do this at this time. */
int status;
status = kevent(janet_vm.kq, events, length, NULL, 0, NULL);
if (status == -1 && errno != EINTR)
janet_panicv(janet_ev_lasterr());
}

JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct kevent kev[2];

int length = 0;
if (mask & JANET_ASYNC_LISTEN_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
}
if (mask & JANET_ASYNC_LISTEN_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
if (!(stream->flags & JANET_STREAM_REGISTERED)) {
struct kevent kev;
EV_SETx(&kev, stream->handle, EVFILT_READ | EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, stream);
int status;
do {
status = kevent(janet_vm.kq, events, length, NULL, 0, NULL);
} while (status == -1 errno != EINTR);
if (status == -1) {
janet_panicv(janet_ev_lasterr());
}
stream->flags |= JANET_STREAM_REGISTERED;
}

janet_assert(length, "expected to add kqueue events");
add_kqueue_events(kev, length);

return state;
}

static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (stream && (stream->handle != -1)) {
int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);

int length = 0;
if (stream->read_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++;
}
if (stream->write_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}

add_kqueue_events(kev, length);
}
janet_unlisten_impl(state);
}

Expand Down Expand Up @@ -2375,6 +2310,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
return JANET_ASYNC_STATUS_DONE;
}
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_READ: {
JanetBuffer *buffer = state->buf;
int32_t bytes_left = state->bytes_left;
Expand Down Expand Up @@ -2457,11 +2393,11 @@ static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t
state->bytes_read = 0;
state->mode = mode;
#ifdef JANET_WINDOWS
ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
state->flags = (DWORD) flags;
#else
state->flags = flags;
#endif
ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
}

void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
Expand Down Expand Up @@ -2605,6 +2541,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
case JANET_ASYNC_EVENT_HUP:
janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len;
const uint8_t *bytes;
Expand Down Expand Up @@ -2676,11 +2613,11 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab
state->mode = mode;
#ifdef JANET_WINDOWS
state->flags = (DWORD) flags;
ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
#else
state->start = 0;
state->flags = flags;
#endif
state->start = 0;
ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
}

void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {
Expand Down
7 changes: 4 additions & 3 deletions src/core/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ static void net_sched_connect(JanetStream *stream) {
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}

/* State machine for accepting connections. */
Expand Down Expand Up @@ -280,6 +278,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_READ: {
#if defined(JANET_LINUX)
JSock connfd = accept4(s->stream->handle, NULL, NULL, SOCK_CLOEXEC);
Expand Down Expand Up @@ -307,8 +306,10 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
}

JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
NetStateAccept *state = (NetStateAccept *) janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
NetStateAccept *state = (NetStateAccept *) s;
state->function = fun;
net_machine_accept(s, JANET_ASYNC_EVENT_USER);
janet_await();
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/janet.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ typedef void *JanetAbstract;

#define JANET_STREAM_CLOSED 0x1
#define JANET_STREAM_SOCKET 0x2
#define JANET_STREAM_IOCP 0x4
#define JANET_STREAM_REGISTERED 0x4
#define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
Expand Down

0 comments on commit 5f2e287

Please sign in to comment.