From bfa03b0cb1b88636962f1481c118606fbb059888 Mon Sep 17 00:00:00 2001 From: machty Date: Thu, 13 Jan 2022 14:01:03 -0400 Subject: [PATCH] Add support for Ruby 3.0 Fiber Scheduler This implements support for the Ruby 3.0+ Fiber Scheduler interface. If a Fiber Scheduler (such as the one defined by the Async library) is present, then we invoke its hooks whenever we're about to perform an IO wait operation. This makes it possible for the scheduler to pass control to another Fiber/task while waiting for the socket to become readable/writable. --- ext/hiredis_ext/connection.c | 97 ++++++++++++++++++++++++++++++++++ lib/hiredis/ruby/connection.rb | 28 ++++++++-- 2 files changed, 120 insertions(+), 5 deletions(-) diff --git a/ext/hiredis_ext/connection.c b/ext/hiredis_ext/connection.c index 865c842..1da3b39 100644 --- a/ext/hiredis_ext/connection.c +++ b/ext/hiredis_ext/connection.c @@ -2,6 +2,63 @@ #include #include "hiredis_ext.h" +#ifdef HAVE_RUBY_FIBER_SCHEDULER_H +#include "ruby/fiber/scheduler.h" +#include "ruby/io.h" + +// TODO: I copied these from Ruby; are they supposed to be exposed as part of the C extension API? + +#define FMODE_PREP (1<<16) + +static int +io_check_tty(rb_io_t *fptr) +{ + int t = isatty(fptr->fd); + if (t) + fptr->mode |= FMODE_TTY|FMODE_DUPLEX; + return t; +} + +static VALUE +io_alloc(VALUE klass) +{ + NEWOBJ_OF(io, struct RFile, klass, T_FILE); + + io->fptr = 0; + + return (VALUE)io; +} + +static VALUE +prep_io(int fd, int fmode, VALUE klass, const char *path) +{ + rb_io_t *fp; + VALUE io = io_alloc(klass); + + MakeOpenFile(io, fp); + fp->self = io; + fp->fd = fd; + fp->mode = fmode; + if (!io_check_tty(fp)) { +#ifdef __CYGWIN__ + fp->mode |= FMODE_BINMODE; + setmode(fd, O_BINARY); +#endif + } + if (path) fp->pathv = rb_obj_freeze(rb_str_new_cstr(path)); + rb_update_max_fd(fd); + + return io; +} + +static VALUE +io_from_fd(int fd) +{ + return prep_io(fd, FMODE_PREP, rb_cIO, NULL); +} + +#endif + typedef struct redisParentContext { redisContext *context; struct timeval *timeout; @@ -107,6 +164,26 @@ typedef fd_set _fdset_t; #endif static int __wait_readable(int fd, const struct timeval *timeout, int *isset) { +#ifdef HAVE_RUBY_FIBER_SCHEDULER_H + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_wait(scheduler, + io_from_fd(fd), + RB_UINT2NUM(RUBY_IO_READABLE), + rb_fiber_scheduler_make_timeout((struct timeval *) timeout)); + + if (RTEST(result)) { + if (isset) { + *isset = 1; + } + return 0; + } else { + // timeout + return -1; + } + } +#endif + struct timeval to; struct timeval *toptr = NULL; @@ -137,6 +214,25 @@ static int __wait_readable(int fd, const struct timeval *timeout, int *isset) { } static int __wait_writable(int fd, const struct timeval *timeout, int *isset) { +#ifdef HAVE_RUBY_FIBER_SCHEDULER_H + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_wait(scheduler, + io_from_fd(fd), + RB_UINT2NUM(RUBY_IO_WRITABLE), + rb_fiber_scheduler_make_timeout((struct timeval *) timeout)); + if (RTEST(result)) { + if (isset) { + *isset = 1; + } + return 0; + } else { + // timeout + return -1; + } + } +#endif + struct timeval to; struct timeval *toptr = NULL; @@ -236,6 +332,7 @@ static VALUE connection_generic_connect(VALUE self, redisContext *c, VALUE arg_t rb_sys_fail(0); } +// entrypoint for Driver#connect static VALUE connection_connect(int argc, VALUE *argv, VALUE self) { redisContext *c; VALUE arg_host = Qnil; diff --git a/lib/hiredis/ruby/connection.rb b/lib/hiredis/ruby/connection.rb index bd07dc7..d41d1d5 100644 --- a/lib/hiredis/ruby/connection.rb +++ b/lib/hiredis/ruby/connection.rb @@ -5,6 +5,7 @@ module Hiredis module Ruby class Connection + EMPTY_ARRAY = [].freeze if defined?(RUBY_ENGINE) && RUBY_ENGINE == "rbx" @@ -123,14 +124,14 @@ def _write(sock, data, timeout) data.force_encoding("binary") if data.respond_to?(:force_encoding) begin - nwritten = @sock.write_nonblock(data) + nwritten = sock.write_nonblock(data) while nwritten < string_size(data) data = data[nwritten..-1] - nwritten = @sock.write_nonblock(data) + nwritten = sock.write_nonblock(data) end rescue Errno::EAGAIN - if IO.select([], [@sock], [], timeout) + if _wait_writable(sock, timeout) # Writable, try again retry else @@ -140,13 +141,29 @@ def _write(sock, data, timeout) end end + def _wait_readable(io, timeout) + if @fiber_scheduler_supported && Fiber.scheduler + Fiber.scheduler.io_wait(io, IO::READABLE, timeout) + else + IO.select([io], EMPTY_ARRAY, EMPTY_ARRAY, timeout) + end + end + + def _wait_writable(io, timeout) + if @fiber_scheduler_supported && Fiber.scheduler + Fiber.scheduler.io_wait(io, IO::WRITABLE, timeout) + else + IO.select(EMPTY_ARRAY, [io], EMPTY_ARRAY, timeout) + end + end + def _connect_sockaddr(af, sockaddr, timeout) sock = Socket.new(af, Socket::SOCK_STREAM, 0) begin sock.connect_nonblock(sockaddr) rescue Errno::EINPROGRESS - if IO.select(nil, [sock], nil, timeout) + if _wait_writable(sock, timeout) # Writable, check for errors optval = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR) errno = optval.unpack("i").first @@ -176,6 +193,7 @@ def _connect_sockaddr(af, sockaddr, timeout) def initialize @sock = nil @timeout = nil + @fiber_scheduler_supported = defined?(Fiber.scheduler) end def connected? @@ -267,7 +285,7 @@ def read begin @reader.feed @sock.read_nonblock(1024) rescue Errno::EAGAIN - if IO.select([@sock], [], [], @timeout) + if _wait_readable(@sock, @timeout) # Readable, try again retry else