Skip to content

Commit

Permalink
Add connections.max.idle.ms
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 9, 2021
1 parent 51c49f6 commit c132608
Show file tree
Hide file tree
Showing 19 changed files with 395 additions and 11 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## Fixes

## Enhancements

* Added `connections.max.idle.ms` to automatically close idle broker
  connections.
  This feature is disabled by default, unless `bootstrap.servers` contains
  the string `azure`, in which case the default is set to 9 minutes to improve
  connection reliability and circumvent limitations with the Azure load
  balancers (see #3109 for more information).


### General fixes

* Fix accesses to freed metadata cache mutexes on client termination (#3279)
Expand Down
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ socket.nagle.disable | * | true, false | false
socket.max.fails | * | 0 .. 1000000 | 1 | low | Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. WARNING: It is highly recommended to leave this setting at its default value of 1 to avoid the client and broker to become desynchronized in case of request timeouts. NOTE: The connection is automatically re-established. <br>*Type: integer*
broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds). <br>*Type: integer*
broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6 <br>*Type: enum value*
connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). <br>*Type: integer*
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. <br>*Type: integer*
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. <br>*Type: integer*
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed. <br>*Type: integer*
Expand Down
2 changes: 2 additions & 0 deletions STATISTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ tx | int | | Total number of requests sent
txbytes | int | | Total number of bytes sent
txerrs | int | | Total number of transmission errors
txretries | int | | Total number of request retries
txidle | int | | Microseconds since last socket send (or -1 if no sends yet for current connection).
req_timeouts | int | | Total number of requests timed out
rx | int | | Total number of responses received
rxbytes | int | | Total number of bytes received
rxerrs | int | | Total number of receive errors
rxcorriderrs | int | | Total number of unmatched correlation ids in response (typically for timed out requests)
rxpartial | int | | Total number of partial MessageSets received. The broker may return partial responses if the full MessageSet could not fit in the remaining Fetch response size.
rxidle | int | | Microseconds since last socket receive (or -1 if no receives yet for current connection).
req | object | | Request type counters. Object key is the request name, value is the number of requests sent.
zbuf_grow | int | | Total number of decompression buffer size increases
buf_grow | int | | Total number of buffer size increases (deprecated, unused)
Expand Down
9 changes: 9 additions & 0 deletions configure.self
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ const char *foo (void) {
return buf;
}"

# Check if strcasestr() is available.
mkl_compile_check "strcasestr" "HAVE_STRCASESTR" disable CC "" \
"
#define _GNU_SOURCE
#include <string.h>
char *foo (const char *needle) {
return strcasestr(\"the hay\", needle);
}"


# See if GNU's pthread_setname_np() is available, and in what form.
mkl_compile_check "pthread_setname_gnu" "HAVE_PTHREAD_SETNAME_GNU" disable CC "-D_GNU_SOURCE -lpthread" \
Expand Down
22 changes: 22 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1649,8 +1649,26 @@ static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {

TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_kafka_toppar_t *rktp;
rd_ts_t txidle = -1, rxidle = -1;

rd_kafka_broker_lock(rkb);

if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
/* Calculate tx and rx idle time in usecs */
txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);

if (txidle)
txidle = RD_MAX(now - txidle, 0);
else
txidle = -1;

if (rxidle)
rxidle = RD_MAX(now - rxidle, 0);
else
rxidle = -1;
}

_st_printf("%s\"%s\": { "/*open broker*/
"\"name\":\"%s\", "
"\"nodeid\":%"PRId32", "
Expand All @@ -1666,12 +1684,14 @@ static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
"\"txbytes\":%"PRIu64", "
"\"txerrs\":%"PRIu64", "
"\"txretries\":%"PRIu64", "
"\"txidle\":%"PRIu64", "
"\"req_timeouts\":%"PRIu64", "
"\"rx\":%"PRIu64", "
"\"rxbytes\":%"PRIu64", "
"\"rxerrs\":%"PRIu64", "
"\"rxcorriderrs\":%"PRIu64", "
"\"rxpartial\":%"PRIu64", "
"\"rxidle\":%"PRIu64", "
"\"zbuf_grow\":%"PRIu64", "
"\"buf_grow\":%"PRIu64", "
"\"wakeups\":%"PRIu64", "
Expand All @@ -1693,12 +1713,14 @@ static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
rd_atomic64_get(&rkb->rkb_c.tx_bytes),
rd_atomic64_get(&rkb->rkb_c.tx_err),
rd_atomic64_get(&rkb->rkb_c.tx_retries),
txidle,
rd_atomic64_get(&rkb->rkb_c.req_timeouts),
rd_atomic64_get(&rkb->rkb_c.rx),
rd_atomic64_get(&rkb->rkb_c.rx_bytes),
rd_atomic64_get(&rkb->rkb_c.rx_err),
rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
rd_atomic64_get(&rkb->rkb_c.rx_partial),
rxidle,
rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
rd_atomic64_get(&rkb->rkb_c.buf_grow),
rd_atomic64_get(&rkb->rkb_c.wakeups),
Expand Down
73 changes: 65 additions & 8 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ const char *rd_kafka_secproto_names[] = {



/**
 * @brief Check whether this broker needs a persistent connection kept open,
 *        either for internal per-broker needs (queued output buffers) or
 *        because it serves as a coordinator (coord is an atomic counter;
 *        presumably the number of coordinator roles — confirm at call sites).
 *
 * @returns true if the broker needs a persistent connection
 * @locality broker thread
 */
static RD_INLINE rd_bool_t
rd_kafka_broker_needs_persistent_connection (rd_kafka_broker_t *rkb) {
        return rkb->rkb_persistconn.internal ||
                rd_atomic32_get(&rkb->rkb_persistconn.coord);
}


/**
* @returns > 0 if a connection to this broker is needed, else 0.
* @locality broker thread
Expand All @@ -111,8 +122,7 @@ rd_kafka_broker_needs_connection (rd_kafka_broker_t *rkb) {
!rd_kafka_terminating(rkb->rkb_rk) &&
!rd_kafka_fatal_error_code(rkb->rkb_rk) &&
(!rkb->rkb_rk->rk_conf.sparse_connections ||
rkb->rkb_persistconn.internal ||
rd_atomic32_get(&rkb->rkb_persistconn.coord));
rd_kafka_broker_needs_persistent_connection(rkb));
}


Expand Down Expand Up @@ -560,6 +570,9 @@ void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
/* Unlock broker since a requeue will try to lock it. */
rd_kafka_broker_unlock(rkb);

rd_atomic64_set(&rkb->rkb_c.ts_send, 0);
rd_atomic64_set(&rkb->rkb_c.ts_recv, 0);

/*
* Purge all buffers
* (put bufs on a temporary queue since bufs may be requeued,
Expand Down Expand Up @@ -653,7 +666,7 @@ void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
int inqueue = rd_kafka_bufq_cnt(&rkb->rkb_outbufs);

if (rkb->rkb_ts_state + minidle < now &&
rd_atomic64_get(&rkb->rkb_ts_tx_last) + minidle < now &&
rd_atomic64_get(&rkb->rkb_c.ts_send) + minidle < now &&
inflight + inqueue == 0)
log_level = LOG_DEBUG;
else if (inflight > 1)
Expand Down Expand Up @@ -1410,7 +1423,7 @@ static int rd_kafka_broker_weight_usable (rd_kafka_broker_t *rkb) {
weight += 10 * !RD_KAFKA_BROKER_IS_LOGICAL(rkb);

if (likely(!rd_atomic32_get(&rkb->rkb_blocking_request_cnt))) {
rd_ts_t tx_last = rd_atomic64_get(&rkb->rkb_ts_tx_last);
rd_ts_t tx_last = rd_atomic64_get(&rkb->rkb_c.ts_send);
int idle = (int)((rd_clock() -
(tx_last > 0 ? tx_last : rkb->rkb_ts_state))
/ 1000000);
Expand Down Expand Up @@ -1839,6 +1852,8 @@ int rd_kafka_recv (rd_kafka_broker_t *rkb) {
goto err;
}

rd_atomic64_set(&rkb->rkb_c.ts_recv, rd_clock());

if (rkbuf->rkbuf_totlen == 0) {
/* Packet length not known yet. */

Expand Down Expand Up @@ -2612,7 +2627,7 @@ int rd_kafka_send (rd_kafka_broker_t *rkb) {
return -1;

now = rd_clock();
rd_atomic64_set(&rkb->rkb_ts_tx_last, now);
rd_atomic64_set(&rkb->rkb_c.ts_send, now);

/* Partial send? Continue next time. */
if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) {
Expand Down Expand Up @@ -4981,6 +4996,40 @@ static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb,
}



/**
 * @brief Close the broker connection if it has been idle longer than
 *        connections.max.idle.ms.
 *
 * @remark Must only be called if connections.max.idle.ms > 0 and
 *         the current broker state is UP (or UPDATE).
 *
 * @locality broker thread
 */
static RD_INLINE void rd_kafka_broker_idle_check (rd_kafka_broker_t *rkb) {
        rd_ts_t last_send = rd_atomic64_get(&rkb->rkb_c.ts_send);
        rd_ts_t last_recv = rd_atomic64_get(&rkb->rkb_c.ts_recv);
        rd_ts_t last_activity = RD_MAX(last_send, last_recv);
        int idle_ms;

        if (unlikely(!last_activity)) {
                /* No sends or receives on this connection yet:
                 * fall back to the last state-change time (when the
                 * connection came up) as the activity baseline. */
                last_activity = rkb->rkb_ts_state;
        }

        idle_ms = (int)((rd_clock() - last_activity) / 1000);

        if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))
                return;

        rd_kafka_broker_fail(rkb, LOG_DEBUG,
                             RD_KAFKA_RESP_ERR__TRANSPORT,
                             "Connection max idle time exceeded "
                             "(%dms since last activity)",
                             idle_ms);
}


/**
* @brief Serve broker thread according to client type.
* May be called in any broker state.
Expand Down Expand Up @@ -5072,12 +5121,19 @@ static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
rkb->rkb_persistconn.internal =
rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt) > 0;

if (rkb->rkb_source == RD_KAFKA_INTERNAL)
if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
rd_kafka_broker_internal_serve(rkb, abs_timeout);
else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
return;
}

if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
rd_kafka_broker_producer_serve(rkb, abs_timeout);
else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
rd_kafka_broker_consumer_serve(rkb, abs_timeout);

if (rkb->rkb_rk->rk_conf.connections_max_idle_ms &&
rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP)
rd_kafka_broker_idle_check(rkb);
}


Expand Down Expand Up @@ -5451,7 +5507,8 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rkb->rkb_reconnect_backoff_ms = rk->rk_conf.reconnect_backoff_ms;
rd_atomic32_init(&rkb->rkb_persistconn.coord, 0);

rd_atomic64_init(&rkb->rkb_ts_tx_last, 0);
rd_atomic64_init(&rkb->rkb_c.ts_send, 0);
rd_atomic64_init(&rkb->rkb_c.ts_recv, 0);

/* ApiVersion fallback interval */
if (rkb->rkb_rk->rk_conf.api_version_request) {
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */

rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
* counter */

rd_atomic64_t ts_send; /**< Timestamp of last send */
rd_atomic64_t ts_recv; /**< Timestamp of last receive */
} rkb_c;

int rkb_req_timeouts; /* Current value */

rd_atomic64_t rkb_ts_tx_last; /**< Timestamp of last
* transmitted requested */

thrd_t rkb_thread;

rd_refcnt_t rkb_refcnt;
Expand Down
19 changes: 19 additions & 0 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,17 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ AF_INET, "v4" },
{ AF_INET6, "v6" },
} },
{ _RK_GLOBAL|_RK_MED, "connections.max.idle.ms",
_RK_C_INT,
_RK(connections_max_idle_ms),
"Close broker connections after the specified time of "
"inactivity. "
"Disable with 0. "
"If this property is left at its default value some heuristics are "
"performed to determine a suitable default value, this is currently "
"limited to identifying brokers on Azure "
"(see librdkafka issue #3109 for more info).",
0, INT_MAX, 0 },
{ _RK_GLOBAL|_RK_MED|_RK_HIDDEN, "enable.sparse.connections",
_RK_C_BOOL,
_RK(sparse_connections),
Expand Down Expand Up @@ -3797,6 +3808,14 @@ const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype,
RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms/2, 1000));
}

if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
conf->brokerlist &&
rd_strcasestr(conf->brokerlist, "azure")) {
/* Issue #3109:
* Default connections.max.idle.ms to 9 minutes on Azure. */
conf->connections_max_idle_ms = 9*1000*60; /* 9 minutes */
}

if (!rd_kafka_conf_is_modified(conf, "allow.auto.create.topics")) {
/* Consumer: Do not allow auto create by default.
* Producer: Allow auto create by default. */
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ struct rd_kafka_conf_s {
int reconnect_backoff_ms;
int reconnect_backoff_max_ms;
int reconnect_jitter_ms;
int connections_max_idle_ms;
int sparse_connections;
int sparse_connect_intvl;
int api_version_request;
Expand Down
8 changes: 8 additions & 0 deletions src/rdposix.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@
#define rd_strcasecmp(A,B) strcasecmp(A,B)
#define rd_strncasecmp(A,B,N) strncasecmp(A,B,N)


/* Case-insensitive substring search.
 * Use the libc strcasestr() when the configure-time check found it
 * (HAVE_STRCASESTR), otherwise map to _rd_strcasestr() — presumably a
 * portable fallback implementation defined elsewhere in librdkafka
 * (not visible in this file). */
#ifdef HAVE_STRCASESTR
#define rd_strcasestr(HAYSTACK,NEEDLE) strcasestr(HAYSTACK,NEEDLE)
#else
#define rd_strcasestr(HAYSTACK,NEEDLE) _rd_strcasestr(HAYSTACK,NEEDLE)
#endif


/**
* Errors
*/
Expand Down
Loading

0 comments on commit c132608

Please sign in to comment.