Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve wait callback and timeout handling #110

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 72 additions & 26 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

#include "trilogy-ruby.h"

#define TRILOGY_RB_TIMEOUT 1

VALUE Trilogy_CastError;
static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError,
Trilogy_ConnectionClosedError, Trilogy_ConnectionRefusedError, Trilogy_ConnectionResetError,
Expand Down Expand Up @@ -114,6 +112,12 @@ static void handle_trilogy_error(struct trilogy_ctx *ctx, int rc, const char *ms
case TRILOGY_SYSERR:
trilogy_syserr_fail_str(errno, rbmsg);

case TRILOGY_TIMEOUT:
if (ctx->conn.socket != NULL) {
trilogy_sock_shutdown(ctx->conn.socket);
}
rb_raise(Trilogy_TimeoutError, "%" PRIsVALUE, rbmsg);

case TRILOGY_ERR: {
VALUE message = rb_str_new(ctx->conn.error_message, ctx->conn.error_message_len);
VALUE exc = rb_funcall(Trilogy_ProtocolError, id_from_code, 2, message, INT2NUM(ctx->conn.error_code));
Expand Down Expand Up @@ -169,8 +173,9 @@ static int flush_writes(struct trilogy_ctx *ctx)
return rc;
}

if (trilogy_sock_wait_write(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_flush_writes");
rc = trilogy_sock_wait_write(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return rc;
}
}
}
Expand All @@ -190,6 +195,21 @@ static double timeval_to_double(struct timeval tv)
return (double)tv.tv_sec + ((double)tv.tv_usec) / 1000000.0;
}

struct rb_trilogy_wait_args {
struct timeval *timeout;
int wait_flag;
int fd;
int rc;
};

static VALUE rb_trilogy_wait_protected(VALUE vargs) {
struct rb_trilogy_wait_args *args = (void *)vargs;

args->rc = rb_wait_for_single_fd(args->fd, args->wait_flag, args->timeout);

return Qnil;
}

static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
{
struct timeval *timeout = NULL;
Expand Down Expand Up @@ -219,11 +239,27 @@ static int _cb_ruby_wait(trilogy_sock_t *sock, trilogy_wait_t wait)
timeout = NULL;
}

int fd = trilogy_sock_fd(sock);
if (rb_wait_for_single_fd(fd, wait_flag, timeout) <= 0)
struct rb_trilogy_wait_args args;
args.fd = trilogy_sock_fd(sock);
args.wait_flag = wait_flag;
args.timeout = timeout;
args.rc = 0;

int state = 0;
rb_protect(rb_trilogy_wait_protected, (VALUE)&args, &state);
if (state) {
trilogy_sock_shutdown(sock);
rb_jump_tag(state);
}

// rc here comes from rb_wait_for_single_fd which (similar to poll(3)) returns 0 to indicate that the call timed out
// or -1 to indicate a system error with errno set.
if (args.rc < 0)
return TRILOGY_SYSERR;
if (args.rc == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got confused for a second because in most other places rc == 0 means TRILOGY_OK. I understand now that this is the return value of rb_wait_for_single_fd, where 0 means a timeout: https://github.com/ruby/ruby/blob/1642e0c39220e95ddb16b4cbbbe78f24507dfd48/include/ruby/io.h#L902. Possibly worth an inline comment in addition to what you have in the commit message?

return TRILOGY_TIMEOUT;

return 0;
return TRILOGY_OK;
}

struct nogvl_sock_args {
Expand Down Expand Up @@ -275,8 +311,10 @@ escape the GVL on each wait operation without going through call_without_gvl */
return rc;
}

if (trilogy_sock_wait(ctx->conn.socket, TRILOGY_WAIT_HANDSHAKE) < 0)
return TRILOGY_RB_TIMEOUT;
rc = trilogy_sock_wait(ctx->conn.socket, TRILOGY_WAIT_HANDSHAKE);
if (rc != TRILOGY_OK) {
return rc;
}
}
}

Expand All @@ -303,8 +341,9 @@ static void auth_switch(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake)
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_auth_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}
}
}
Expand Down Expand Up @@ -356,8 +395,9 @@ static void authenticate(struct trilogy_ctx *ctx, trilogy_handshake_t *handshake
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_auth_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_auth_recv");
}
}

Expand Down Expand Up @@ -519,7 +559,7 @@ static VALUE rb_trilogy_initialize(VALUE self, VALUE encoding, VALUE charset, VA
}

int rc = try_connect(ctx, &handshake, &connopt);
if (rc == TRILOGY_RB_TIMEOUT) {
if (rc == TRILOGY_TIMEOUT) {
rb_raise(Trilogy_TimeoutError, "trilogy_connect_recv");
}
if (rc != TRILOGY_OK) {
Expand Down Expand Up @@ -566,8 +606,9 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database)
handle_trilogy_error(ctx, rc, "trilogy_change_db_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_change_db_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_change_db_recv");
}
}

Expand Down Expand Up @@ -599,8 +640,9 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option)
handle_trilogy_error(ctx, rc, "trilogy_set_option_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_set_option_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_set_option_recv");
}
}

Expand Down Expand Up @@ -672,8 +714,9 @@ static VALUE read_query_response(VALUE vargs)
return read_query_error(args, rc, "trilogy_query_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_query_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_query_recv");
}
}

Expand Down Expand Up @@ -721,8 +764,9 @@ static VALUE read_query_response(VALUE vargs)
return read_query_error(args, rc, "trilogy_read_column");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_read_column");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return read_query_error(args, rc, "trilogy_read_column");
}
}

Expand All @@ -749,8 +793,9 @@ static VALUE read_query_response(VALUE vargs)
int rc = trilogy_read_row(&ctx->conn, row_values);

if (rc == TRILOGY_AGAIN) {
if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_read_row");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
return read_query_error(args, rc, "trilogy_read_row");
}
continue;
}
Expand Down Expand Up @@ -876,8 +921,9 @@ static VALUE rb_trilogy_ping(VALUE self)
handle_trilogy_error(ctx, rc, "trilogy_ping_recv");
}

if (trilogy_sock_wait_read(ctx->conn.socket) < 0) {
rb_raise(Trilogy_TimeoutError, "trilogy_ping_recv");
rc = trilogy_sock_wait_read(ctx->conn.socket);
if (rc != TRILOGY_OK) {
handle_trilogy_error(ctx, rc, "trilogy_ping_recv");
}
}

Expand Down
3 changes: 2 additions & 1 deletion inc/trilogy/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
XX(TRILOGY_DNS_ERR, -18) \
XX(TRILOGY_AUTH_SWITCH, -19) \
XX(TRILOGY_MAX_PACKET_EXCEEDED, -20) \
XX(TRILOGY_UNKNOWN_TYPE, -21)
XX(TRILOGY_UNKNOWN_TYPE, -21) \
XX(TRILOGY_TIMEOUT, -22)

enum {
#define XX(name, code) name = code,
Expand Down