Skip to content

Commit

Permalink
Fix large file sending with kqueue backend
Browse files Browse the repository at this point in the history
  • Loading branch information
trikko committed Dec 26, 2024
1 parent 8cb63f5 commit a4ad99a
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions source/serverino/communicator.d
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ package class Communicator
static if (serverino.common.Backend == BackendType.EPOLL)
Daemon.epollRemoveSocket(clientSktHandle);
else static if (serverino.common.Backend == BackendType.KQUEUE)
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ | EVFILT_WRITE, EV_DELETE | EV_DISABLE, null);
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_DELETE | EV_DISABLE, null);
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_DELETE | EV_DISABLE, null);
}

// Remove the communicator from the list of alives
if (prev !is null) prev.next = next;
Expand Down Expand Up @@ -190,7 +193,10 @@ package class Communicator
static if (serverino.common.Backend == BackendType.EPOLL)
Daemon.epollAddSocket(clientSktHandle, EPOLLIN, cast(void*) this);
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_DELETE | EV_DISABLE, cast(void*) this);
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_ADD | EV_ENABLE, cast(void*) this);
}
}
else assert(false);
}
Expand Down Expand Up @@ -261,7 +267,10 @@ package class Communicator
static if (serverino.common.Backend == BackendType.EPOLL)
Daemon.epollEditSocket(clientSktHandle, EPOLLIN, cast(void*) this);
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_DELETE | EV_DISABLE, cast(void*) this);
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_ADD | EV_ENABLE, cast(void*) this);
}
}

hasBuffer = false;
Expand Down Expand Up @@ -352,6 +361,9 @@ package class Communicator
reset();
return;
}

warning("BLOCKED buffer");
return;
}
else
{
Expand Down Expand Up @@ -408,7 +420,10 @@ package class Communicator
static if (serverino.common.Backend == BackendType.EPOLL)
Daemon.epollEditSocket(clientSktHandle, EPOLLIN, cast(void*) this);
else static if (serverino.common.Backend == BackendType.KQUEUE)
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_DELETE | EV_DISABLE, cast(void*) this);
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_ADD | EV_ENABLE, cast(void*) this);
}
}

bufferSent = 0;
Expand Down Expand Up @@ -485,7 +500,10 @@ package class Communicator
static if(serverino.common.Backend == BackendType.EPOLL)
Daemon.epollEditSocket(clientSktHandle, EPOLLIN | EPOLLOUT, cast(void*) this);
else static if(serverino.common.Backend == BackendType.KQUEUE)
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ | EVFILT_WRITE, EV_ADD | EV_ENABLE, cast(void*) this);
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_DELETE | EV_DISABLE, cast(void*) this);
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_ADD | EV_ENABLE, cast(void*) this);
}

onWriteAvailable();
}
Expand Down Expand Up @@ -514,7 +532,10 @@ package class Communicator
static if(serverino.common.Backend == BackendType.EPOLL)
Daemon.epollEditSocket(clientSktHandle, EPOLLIN | EPOLLOUT, cast(void*) this);
else static if(serverino.common.Backend == BackendType.KQUEUE)
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ | EVFILT_WRITE, EV_ADD | EV_ENABLE, cast(void*) this);
{
Daemon.addKqueueChange(clientSktHandle, EVFILT_READ, EV_DELETE | EV_DISABLE, cast(void*) this);
Daemon.addKqueueChange(clientSktHandle, EVFILT_WRITE, EV_ADD | EV_ENABLE, cast(void*) this);
}
}

// If the response is completed, unset the worker
Expand Down

0 comments on commit a4ad99a

Please sign in to comment.