Skip to content

Commit

Permalink
upstream: new property to limit the amount of active TCP connections (#…
Browse files Browse the repository at this point in the history
…7586)

By default, Fluent Bit tries to deliver data as faster as possible and create TCP connections on-demand and in keepalive mode for performance reasons. In high-scalable environments, the user might want to control how many connections are done in parallel by setting a limit.

This patch implements a new configuration property called 'net.max_connections' that can be used in the output plugins sections, so Fluent Bit won't open more than net.max_connections if it has been set. If the limit is reached, the output plugins will issue a retry.

Configuration example:

  [OUTPUT]
      name                splunk
      match               *
      net.max_worker_connections 10

Note that this feature works at upstream/plugin level, and the limit is applied for all the workers if they exists, e.g: if you have 50 workers and net.max_connections=10, only 10 connections will be allowed.

---------

Signed-off-by: Eduardo Silva <eduardo@calyptia.com>
  • Loading branch information
edsiper committed Jun 23, 2023
1 parent 41d3e17 commit bbd1688
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ struct flb_net_setup {

/* prioritize ipv4 results when trying to establish a connection*/
int dns_prefer_ipv4;

/* maximum number of allowed active TCP connections */
int max_worker_connections;
};

/* Defines a host service and it properties */
Expand Down
32 changes: 29 additions & 3 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ struct flb_config_map upstream_net[] = {
FLB_CONFIG_MAP_INT, "net.keepalive_max_recycle", "2000",
0, FLB_TRUE, offsetof(struct flb_net_setup, keepalive_max_recycle),
"Set maximum number of times a keepalive connection can be used "
"before it is retired."
"before it is retried."
},

{
FLB_CONFIG_MAP_INT, "net.max_worker_connections", "0",
0, FLB_TRUE, offsetof(struct flb_net_setup, max_worker_connections),
"Set the maximum number of active TCP connections that can be used per worker thread."
},

/* EOF */
Expand Down Expand Up @@ -606,6 +612,7 @@ int flb_upstream_conn_recycle(struct flb_connection *conn, int val)
struct flb_connection *flb_upstream_conn_get(struct flb_upstream *u)
{
int err;
int total_connections = 0;
struct mk_list *tmp;
struct mk_list *head;
struct flb_connection *conn;
Expand All @@ -617,12 +624,31 @@ struct flb_connection *flb_upstream_conn_get(struct flb_upstream *u)
"net.connect_timeout = %i seconds\n"
"net.source_address = %s\n"
"net.keepalive = %s\n"
"net.keepalive_idle_timeout = %i seconds",
"net.keepalive_idle_timeout = %i seconds\n"
"net.max_worker_connections = %i",
u->tcp_host, u->tcp_port,
u->base.net.connect_timeout,
u->base.net.source_address ? u->base.net.source_address: "any",
u->base.net.keepalive ? "enabled": "disabled",
u->base.net.keepalive_idle_timeout);
u->base.net.keepalive_idle_timeout,
u->base.net.max_worker_connections);


/* If the upstream is limited by max connections, check current state */
if (u->base.net.max_worker_connections > 0) {
flb_stream_acquire_lock(&u->base, FLB_TRUE);

total_connections = mk_list_size(&uq->av_queue);
total_connections += mk_list_size(&uq->busy_queue);

flb_stream_release_lock(&u->base);

if (total_connections >= u->base.net.max_worker_connections) {
flb_debug("[upstream] max worker connections=%i reached to: %s:%i, cannot connect",
u->base.net.max_worker_connections, u->tcp_host, u->tcp_port);
return NULL;
}
}

conn = NULL;

Expand Down

0 comments on commit bbd1688

Please sign in to comment.