Skip to content

Commit

Permalink
Fix JoinGroupRequest timeouts: was erroneously capped to socket.timeo…
Browse files Browse the repository at this point in the history
…ut.ms

.. it is now properly limited by max.poll.interval.ms (if supported by broker),
or session.timeout.ms (if not), regardless of socket.timeout.ms.
  • Loading branch information
edenhill committed Nov 20, 2018
1 parent 9209e84 commit b931045
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/rdkafka_buf.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,16 @@ void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
* Relative timeout, set request timeout to
* to now + rel timeout. */
rkbuf->rkbuf_ts_timeout = now + rkbuf->rkbuf_rel_timeout * 1000;
} else {
} else if (!rkbuf->rkbuf_force_timeout) {
/* Use absolute timeout, limited by socket.timeout.ms */
rd_ts_t sock_timeout = now +
rk->rk_conf.socket_timeout_ms * 1000;

rkbuf->rkbuf_ts_timeout =
RD_MIN(sock_timeout, rkbuf->rkbuf_abs_timeout);
} else {
/* Use absolue timeout without limit. */
rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_abs_timeout;
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,9 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
int rkbuf_rel_timeout;/* Relative timeout (ms), used for retries.
* Defaults to socket.timeout.ms.
* Mutually exclusive with rkbuf_abs_timeout*/
rd_bool_t rkbuf_force_timeout; /**< Force request timeout to be
* remaining abs_timeout regardless
* of socket.timeout.ms. */


int64_t rkbuf_offset; /* Used by OffsetCommit */
Expand Down Expand Up @@ -652,18 +655,29 @@ void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
* from \p now.
*
* @param now Reuse current time from existing rd_clock() var, else 0.
* @param force If true: force request timeout to be same as remaining
* abs timeout, regardless of socket.timeout.ms.
* If false: cap each request timeout to socket.timeout.ms.
*
* The remaining time is used as timeout for request retries.
*/
static RD_INLINE void
rd_kafka_buf_set_abs_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms,
rd_ts_t now) {
rd_kafka_buf_set_abs_timeout0 (rd_kafka_buf_t *rkbuf, int timeout_ms,
rd_ts_t now, rd_bool_t force) {
if (!now)
now = rd_clock();
rkbuf->rkbuf_rel_timeout = 0;
rkbuf->rkbuf_abs_timeout = now + ((rd_ts_t)timeout_ms * 1000);
rkbuf->rkbuf_force_timeout = force;
}

#define rd_kafka_buf_set_abs_timeout(rkbuf,timeout_ms,now) \
rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_false)


#define rd_kafka_buf_set_abs_timeout_force(rkbuf,timeout_ms,now) \
rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_true)


#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
#define rd_kafka_buf_destroy(rkbuf) \
Expand Down
16 changes: 12 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1187,9 +1187,6 @@ void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

/* This is a blocking request */
rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
rd_kafka_buf_set_abs_timeout(
if (ApiVersion < 1 &&
rk->rk_conf.max_poll_interval_ms >
rk->rk_conf.group_session_timeout_ms &&
Expand All @@ -1206,11 +1203,22 @@ void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
"with this broker version",
rk->rk_conf.max_poll_interval_ms,
rk->rk_conf.group_session_timeout_ms);

/* Absolute timeout */
rd_kafka_buf_set_abs_timeout_force(
rkbuf,
rk->rk_conf.group_session_timeout_ms +
/* Request timeout is max.poll.interval.ms + grace
* if the broker supports it, else
* session.timeout.ms + grace. */
(ApiVersion >= 1 ?
rk->rk_conf.max_poll_interval_ms :
rk->rk_conf.group_session_timeout_ms) +
3000/* 3s grace period*/,
0);

/* This is a blocking request */
rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}

Expand Down

0 comments on commit b931045

Please sign in to comment.