Skip to content

Commit

Permalink
First try, adding kqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
trikko committed Dec 24, 2024
1 parent c2cb42c commit 919518a
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 3 deletions.
70 changes: 68 additions & 2 deletions source/serverino/common.d
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ import std.datetime : MonoTimeImpl, ClockType;
// Serverino can be built using two different backends: select or epoll
public enum BackendType
{
SELECT,
EPOLL
SELECT = "select",
EPOLL = "epoll",
KQUEUE = "kqueue"
}

// The backend is selected using the version directive or by checking the OS
version(use_select) { enum Backend = BackendType.SELECT; }
else version(use_epoll) { enum Backend = BackendType.EPOLL; }
else version(use_kqueue) { enum Backend = BackendType.KQUEUE; }
else {
version(linux) enum Backend = BackendType.EPOLL;
else version(BSD) enum Backend = BackendType.EPOLL;
else version(OSX) enum Backend = BackendType.EPOLL;
else version(Windows) enum Backend = BackendType.SELECT;
else enum Backend = BackendType.SELECT;
}

Expand All @@ -48,6 +53,67 @@ static if(Backend == BackendType.EPOLL)
else static assert(false, "epoll backend is only available on Linux");
}

static if (Backend == BackendType.KQUEUE)
{
version(linux) enum IS_KQUEUE_AVAILABLE = true;
else version(BSD) enum IS_KQUEUE_AVAILABLE = true;
else version(OSX) enum IS_KQUEUE_AVAILABLE = true;
else enum IS_KQUEUE_AVAILABLE = false;

static if (IS_KQUEUE_AVAILABLE)
{
struct timespec {
long tv_sec; // seconds
long tv_nsec; // nanoseconds
}

extern(C)
{
alias uintptr_t = size_t;
alias intptr_t = ptrdiff_t;

struct kevent {
uintptr_t ident;
short filter;
ushort flags;
uint fflags;
intptr_t data;
void* udata;
}

enum EVFILT_READ = -1;
enum EVFILT_WRITE = -2;

enum EV_ADD = 0x0001;
enum EV_DELETE = 0x0002;
enum EV_ENABLE = 0x0004;
enum EV_DISABLE = 0x0008;

void EV_SET(ref kevent kevp, uintptr_t ident, short filter, ushort flags, uint fflags, intptr_t data, void* udata) {
kevp.ident = ident;
kevp.filter = filter;
kevp.flags = flags;
kevp.fflags = fflags;
kevp.data = data;
kevp.udata = udata;
}

int kqueue();

pragma(mangle, "kevent")
int kevent_f(
int kq,
const kevent* changelist,
int nchanges,
kevent* eventlist,
int nevents,
const timespec* timeout
);
}
}
else static assert(false, "kqueue backend is only available on Linux and BSD");
}

// The time type used in the serverino library
alias CoarseTime = MonoTimeImpl!(ClockType.coarse);

Expand Down
7 changes: 7 additions & 0 deletions source/serverino/communicator.d
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ package class Communicator
import serverino.daemon : Daemon;
Daemon.epollRemoveSocket(clientSkt);
}
else static if (serverino.common.Backend == BackendType.KQUEUE) {
import serverino.daemon : Daemon;
Daemon.changeList ~= kevent(clientSkt.handle, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) this);
}

// Remove the communicator from the list of alives
if (prev !is null) prev.next = next;
Expand Down Expand Up @@ -185,6 +189,9 @@ package class Communicator
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollAddSocket(s, EPOLLIN, cast(void*) this);
} else static if (serverino.common.Backend == BackendType.KQUEUE) {
import serverino.daemon : Daemon;
Daemon.changeList ~= kevent(s.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this);
}
}
else assert(false);
Expand Down
74 changes: 73 additions & 1 deletion source/serverino/daemon.d
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ package class WorkerInfo
import core.sys.linux.epoll : EPOLLIN;
Daemon.epollAddSocket(accepted, EPOLLIN, cast(void*) this);
}
else static if (serverino.common.Backend == BackendType.KQUEUE) {
import serverino.daemon : Daemon;
Daemon.changeList ~= kevent(accepted.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*) this);
}

setStatus(WorkerInfo.State.IDLING);
}
Expand Down Expand Up @@ -465,7 +469,7 @@ package:

daemonThread = Thread.getThis();

info("Daemon started. [backend=", Backend == Backend.EPOLL ? "epoll" : "select", ";thread=", daemonThread.isMainThread ? "main" : "secondary", "]");
info("Daemon started. [backend=", cast(string)(Backend), ";thread=", daemonThread.isMainThread ? "main" : "secondary", "]");
now = CoarseTime.currTime;

version(Posix)
Expand All @@ -482,6 +486,12 @@ package:
tryInit!Modules();

static if (serverino.common.Backend == BackendType.EPOLL) epoll = epoll_create1(0);
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
kq = kqueue();
if (kq == -1) throw new Exception("Failed to create kqueue");
eventList.length = 1024;
}

// Starting all the listeners.
foreach(ref listener; config.listeners)
Expand Down Expand Up @@ -546,6 +556,12 @@ package:
}

static if (serverino.common.Backend == BackendType.EPOLL) epollAddSocket(listener.socket, EPOLLIN, cast(void*)listener);
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
kevent evt;
EV_SET(evt, listener.socket.handle, EVFILT_READ, EV_ADD, 0, 0, cast(void*)listener);
kevent_f(kq, &evt, 1, null, 0, null);
}
}

ThreadBase mainThread;
Expand Down Expand Up @@ -625,6 +641,17 @@ package:
epoll_event[MAX_EPOLL_EVENTS] events = void;
long updates = epoll_wait(epoll, events.ptr, MAX_EPOLL_EVENTS, 1000);
}
else static if (serverino.common.Backend == BackendType.KQUEUE) {

import core.stdc.stdlib : exit;
debug import std.stdio : writeln;

auto timeout = timespec(1, 0);
int updates = kevent_f(kq, changeList.ptr, cast(int)changeList.length, eventList.ptr, cast(int)eventList.length, &timeout);
if (updates == -1) throw new Exception("kevent error");

changeList.length = 0;
}

now = CoarseTime.currTime;

Expand Down Expand Up @@ -815,6 +842,44 @@ package:
}
}

// ------------------------
// Kqueue version main loop
// ------------------------

else static if (serverino.common.Backend == BackendType.KQUEUE) {

foreach(ref kevent e; eventList[0..updates])
{
Object o = cast(Object)(cast(void*) e.udata);

Communicator communicator = cast(Communicator)(o);
if (communicator !is null)
{
if (communicator.clientSkt !is null && (e.filter == EVFILT_READ))
communicator.onReadAvailable();

if (communicator.clientSkt !is null && (e.filter == EVFILT_WRITE))
communicator.onWriteAvailable();

continue;
}

WorkerInfo worker = cast(WorkerInfo)(o);
if (worker !is null)
{
worker.onReadAvailable();
continue;
}

Listener listener = cast(Listener)(o);
if (listener !is null)
{
listener.onConnectionAvailable();
continue;
}
}
}

// Check if we have some free workers and some waiting communicators.
if (Communicator.execWaitingListFront !is null)
{
Expand Down Expand Up @@ -905,6 +970,8 @@ package:
// Delete the canary file.
removeCanary();

//static if (serverino.common.Backend == BackendType.KQUEUE) close(kq);

info("Daemon shutdown completed. Goodbye!");
}

Expand Down Expand Up @@ -939,6 +1006,11 @@ package:

int epoll;
}
else static if (serverino.common.Backend == BackendType.KQUEUE) {
int kq; // File descriptor per kqueue
kevent[] changeList;
kevent[] eventList;
}

private __gshared:

Expand Down

0 comments on commit 919518a

Please sign in to comment.