Skip to content

Commit

Permalink
Merge pull request #110 from jhawthorn/better_timeout_handling
Browse files Browse the repository at this point in the history
Improve wait callback and timeout handling
  • Loading branch information
jhawthorn authored Aug 4, 2023
2 parents 4090a79 + fe9a7ae commit a1f46bb
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 27 deletions.
98 changes: 72 additions & 26 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

#include "trilogy-ruby.h"

#define TRILOGY_RB_TIMEOUT 1

VALUE Trilogy_CastError;
static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError,
Trilogy_ConnectionClosedError, Trilogy_ConnectionRefusedError, Trilogy_ConnectionResetError,
Expand Down Expand Up @@ -114,6 +112,12 @@ static void handle_trilogy_error(struct trilogy_ctx *ctx, int rc, const char *ms
case TRILOGY_SYSERR:
trilogy_syserr_fail_str(errno, rbmsg);

case TRILOGY_TIMEOUT:
if (ctx->conn.socket != NULL) {
trilogy_sock_shutdown(ctx->conn.socket);
}
rb_raise(Trilogy_TimeoutError, "%" PRIsVALUE, rbmsg);

case TRILOGY_ERR: {
VALUE message = rb_str_new(ctx->conn.error_message, ctx->conn.error_message_len);
VALUE exc = rb_funcall(Trilogy_ProtocolError, id_from_code, 2, message, INT2NUM(ctx->conn.error_code));
Expand Down Expand Up @@ -169,8 +173,9 @@ static int flush_writes(struct trilogy_ctx *ctx)
return rc;
}

if (trilogy_sock_wait_write(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_flush_writes");
rc = trilogy_sock_wait_write(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return rc;
}
}
}
Expand All @@ -190,6 +195,21 @@ static double timeval_to_double(struct timeval tv)
return (double)tv.tv_sec + ((double)tv.tv_usec) / 1000000.0;
}

struct rb_trilogy_wait_args {
struct timeval *timeout;
int wait_flag;
int fd;
int rc;
};

static VALUE rb_trilogy_wait_protected(VALUE vargs) {
struct rb_trilogy_wait_args *args = (void *)vargs;

args->rc = rb_wait_for_single_fd(args->fd, args->wait_flag, args->timeout);

return Qnil;
}

static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
{
struct timeval *timeout = NULL;
Expand Down Expand Up @@ -219,11 +239,27 @@ static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
timeout = NULL;
}

int fd = trilogy_sock_fd(sock);
if (rb_wait_for_single_fd(fd, wait_flag, timeout) <= 0)
struct rb_trilogy_wait_args args;
args.fd = trilogy_sock_fd(sock);
args.wait_flag = wait_flag;
args.timeout = timeout;
args.rc = 0;

int state = 0;
rb_protect(rb_trilogy_wait_protected, (VALUE)&args, &state);
if (state) {
trilogy_sock_shutdown(sock);
rb_jump_tag(state);
}

// rc here comes from rb_wait_for_single_fd which (similar to poll(3)) returns 0 to indicate that the call timed out
// or -1 to indicate a system error with errno set.
if (args.rc < 0)
return TRILOGY_SYSERR;
if (args.rc == 0)
return TRILOGY_TIMEOUT;

return 0;
return TRILOGY_OK;
}

struct nogvl_sock_args {
Expand Down Expand Up @@ -275,8 +311,10 @@ escape the GVL on each wait operation without going through call_without_gvl */
return rc;
}

if (trilogy_sock_wait(ctx->conn.socket, TRILOGY_WAIT_HANDSHAKE) < 0)
return TRILOGY_RB_TIMEOUT;
rc = trilogy_sock_wait(ctx->conn.socket, TRILOGY_WAIT_HANDSHAKE);
if (rc != TRILOGY_OK) {
return rc;
}
}
}

Expand All @@ -303,8 +341,9 @@ static void auth_switch(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake)
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_auth_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}
}
}
Expand Down Expand Up @@ -356,8 +395,9 @@ static void authenticate(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_auth_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}
}

Expand Down Expand Up @@ -519,7 +559,7 @@ static VALUE rb_trilogy_initialize(VALUE self, VALUE encoding, VALUE charset, VA
}

int rc = try_connect(ctx, &handshake, &connopt);
if (rc == TRILOGY_RB_TIMEOUT) {
if (rc == TRILOGY_TIMEOUT) {
rb_raise(Trilogy_TimeoutError, "trilogy_connect_recv");
}
if (rc != TRILOGY_OK) {
Expand Down Expand Up @@ -566,8 +606,9 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database)
handle_trilogy_error(ctx, rc, "trilogy_change_db_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_change_db_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_change_db_recv");
}
}

Expand Down Expand Up @@ -599,8 +640,9 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option)
handle_trilogy_error(ctx, rc, "trilogy_set_option_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_set_option_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_set_option_recv");
}
}

Expand Down Expand Up @@ -672,8 +714,9 @@ static VALUE read_query_response(VALUE vargs)
return read_query_error(args, rc, "trilogy_query_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_query_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_query_recv");
}
}

Expand Down Expand Up @@ -721,8 +764,9 @@ static VALUE read_query_response(VALUE vargs)
return read_query_error(args, rc, "trilogy_read_column");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_read_column");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return read_query_error(args, rc, "trilogy_read_column");
}
}

Expand All @@ -749,8 +793,9 @@ static VALUE read_query_response(VALUE vargs)
int rc = trilogy_read_row(&ctx->conn, row_values);

if (rc == TRILOGY_AGAIN) {
if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_read_row");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return read_query_error(args, rc, "trilogy_read_row");
}
continue;
}
Expand Down Expand Up @@ -876,8 +921,9 @@ static VALUE rb_trilogy_ping(VALUE self)
handle_trilogy_error(ctx, rc, "trilogy_ping_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_ping_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_ping_recv");
}
}

Expand Down
3 changes: 2 additions & 1 deletion inc/trilogy/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
XX(TRILOGY_DNS_ERR, -18) \
XX(TRILOGY_AUTH_SWITCH, -19) \
XX(TRILOGY_MAX_PACKET_EXCEEDED, -20) \
XX(TRILOGY_UNKNOWN_TYPE, -21)
XX(TRILOGY_UNKNOWN_TYPE, -21) \
XX(TRILOGY_TIMEOUT, -22)

enum {
#define XX(name, code) name = code,
Expand Down

0 comments on commit a1f46bb

Please sign in to comment.