Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idle timeout support for logreader based sources #355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/cfg-grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ main_location_print (FILE *yyo, YYLTYPE const * const yylocp)
%token KW_BATCH_LINES 10087
%token KW_BATCH_TIMEOUT 10088
%token KW_TRIM_LARGE_MESSAGES 10089

%token KW_STATS 10400
%token KW_FREQ 10401
%token KW_LEVEL 10402
Expand All @@ -273,6 +274,7 @@ main_location_print (FILE *yyo, YYLTYPE const * const yylocp)
%token KW_CHECK_HOSTNAME 10093
%token KW_BAD_HOSTNAME 10094
%token KW_LOG_LEVEL 10095
%token KW_IDLE_TIMEOUT 10096

%token KW_KEEP_TIMESTAMP 10100

Expand Down Expand Up @@ -1507,6 +1509,7 @@ source_proto_option
free($3);
}
| KW_LOG_MSG_SIZE '(' positive_integer ')' { last_proto_server_options->max_msg_size = $3; }
| KW_IDLE_TIMEOUT '(' positive_integer ')' { last_proto_server_options->idle_timeout = $3; }
| KW_TRIM_LARGE_MESSAGES '(' yesno ')' { last_proto_server_options->trim_large_messages = $3; }
;

Expand Down
1 change: 1 addition & 0 deletions lib/cfg-parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ static CfgLexerKeyword main_keywords[] =
{ "log_iw_size", KW_LOG_IW_SIZE },
{ "log_msg_size", KW_LOG_MSG_SIZE },
{ "trim_large_messages", KW_TRIM_LARGE_MESSAGES },
{ "idle_timeout", KW_IDLE_TIMEOUT },
{ "log_prefix", KW_LOG_PREFIX, KWS_OBSOLETE, "program_override" },
{ "program_override", KW_PROGRAM_OVERRIDE },
{ "host_override", KW_HOST_OVERRIDE },
Expand Down
6 changes: 3 additions & 3 deletions lib/logproto/logproto-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ log_proto_client_options_set_drop_input(LogProtoClientOptions *options, gboolean
void
log_proto_client_options_set_timeout(LogProtoClientOptions *options, gint timeout)
{
options->timeout = timeout;
options->idle_timeout = timeout;
}

gint
log_proto_client_options_get_timeout(LogProtoClientOptions *options)
{
return options->timeout;
return options->idle_timeout;
}

void
log_proto_client_options_defaults(LogProtoClientOptions *options)
{
options->drop_input = FALSE;
options->timeout = 0;
options->idle_timeout = 0;
}

void
Expand Down
8 changes: 6 additions & 2 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ typedef struct _LogProtoClient LogProtoClient;
typedef struct _LogProtoClientOptions
{
gboolean drop_input;
gint timeout;
gint idle_timeout;
} LogProtoClientOptions;

typedef union _LogProtoClientOptionsStorage
Expand Down Expand Up @@ -126,7 +126,11 @@ log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished)
static inline gboolean
log_proto_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
{
return s->prepare(s, fd, cond, timeout);
gboolean result = s->prepare(s, fd, cond, timeout);

if (!result && *timeout < 0)
*timeout = s->options->idle_timeout;
return result;
}

static inline LogProtoStatus
Expand Down
1 change: 1 addition & 0 deletions lib/logproto/logproto-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ log_proto_server_options_defaults(LogProtoServerOptions *options)
options->trim_large_messages = -1;
options->init_buffer_size = -1;
options->max_buffer_size = -1;
options->idle_timeout = -1;
options->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
}

Expand Down
9 changes: 7 additions & 2 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct _LogProtoServerOptions
gboolean trim_large_messages;
gint max_buffer_size;
gint init_buffer_size;
gint idle_timeout;
AckTrackerFactory *ack_tracker_factory;
};

Expand Down Expand Up @@ -115,10 +116,14 @@ log_proto_server_set_options(LogProtoServer *self, const LogProtoServerOptions *
self->options = options;
}

static inline gboolean
static inline LogProtoPrepareAction
log_proto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout)
{
return s->prepare(s, cond, timeout);
LogProtoPrepareAction result = s->prepare(s, cond, timeout);

if (result == LPPA_POLL_IO && *timeout < 0)
*timeout = s->options->idle_timeout;
return result;
}

static inline gboolean
Expand Down
7 changes: 1 addition & 6 deletions lib/logproto/logproto-text-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ log_proto_text_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, g
if (*cond == 0)
*cond = G_IO_OUT;

const gboolean pending_write = self->partial != NULL;

if (!pending_write && s->options->timeout > 0)
*timeout = s->options->timeout;

return pending_write;
return self->partial != NULL;
}

static LogProtoStatus
Expand Down
7 changes: 1 addition & 6 deletions modules/affile/logproto-file-writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,7 @@ log_proto_file_writer_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, g
/* if there's no pending I/O in the transport layer, then we want to do a write */
if (*cond == 0)
*cond = G_IO_OUT;
const gboolean pending_write = self->buf_count > 0 || self->partial;

if (!pending_write && s->options->timeout > 0)
*timeout = s->options->timeout;

return pending_write;
return self->buf_count > 0 || self->partial;
}

LogProtoClient *
Expand Down
Loading