Rebase Commit
mahajanadhitya committed Sep 28, 2023
1 parent 49f180a commit ca3d17f
Showing 25 changed files with 974 additions and 49 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -4,6 +4,10 @@ librdkafka v2.2.1 is a maintenance release:

* Added topic id to the metadata response, as part of [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Added a missing destroy call that caused partition structure memory to leak
  when there are partition leader changes and a stale leader epoch is received (#4429).
* Fixed a segmentation fault when closing a consumer using the
  cooperative-sticky assignor before the first assignment (#4381).



@@ -35,7 +39,9 @@ librdkafka v2.2.0 is a feature release:
* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API):
IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).

* [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added an exponential backoff mechanism for
  retriable requests, with `retry.backoff.ms` as the minimum backoff and `retry.backoff.max.ms` as the
  maximum backoff, jittered by `RD_KAFKA_RETRY_JITTER_PERCENT` percent (#4422); see the sketch below.
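A rough sketch of the backoff computation described above (illustrative only, not librdkafka source; the helper name and the overflow clamp are invented):

```c
#include <stdint.h>
#include <stdlib.h>

#define JITTER_PERCENT 20 /* mirrors RD_KAFKA_RETRY_JITTER_PERCENT */

/* Nominal backoff before the Nth retry (N >= 1): doubles from
 * retry_backoff_ms, gets +/-20% jitter, and is capped at
 * retry_backoff_max_ms. */
static int64_t next_backoff_ms(int retry_cnt, int64_t retry_backoff_ms,
                               int64_t retry_backoff_max_ms) {
        if (retry_cnt > 20)
                retry_cnt = 20; /* avoid shift overflow; the cap dominates */
        int64_t base = retry_backoff_ms << (retry_cnt - 1);
        /* Random jitter factor in [100-20, 100+20] percent. */
        int jitter = 100 - JITTER_PERCENT + rand() % (2 * JITTER_PERCENT + 1);
        int64_t backoff = base * jitter / 100;
        return backoff > retry_backoff_max_ms ? retry_backoff_max_ms : backoff;
}
```

With the defaults (`retry.backoff.ms=100`, `retry.backoff.max.ms=1000`) the nominal series before jitter is 100, 200, 400, 800, 1000, 1000, ... ms.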

## Enhancements

5 changes: 3 additions & 2 deletions CONFIGURATION.md
@@ -12,7 +12,7 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer*
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If not set explicitly, it defaults to `retry.backoff.ms`. If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used. <br>*Type: integer*
@@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5
linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. <br>*Type: float*
message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. This is the initial backoff, which increases exponentially with each retry until the retries are exhausted, and is capped by `retry.backoff.max.ms`. <br>*Type: integer*
retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The maximum backoff time in milliseconds before retrying a protocol request; the upper bound on the exponentially backed-off delay (see the example below this table). <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
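A minimal usage sketch for the two retry backoff properties above, using the public C API (assumes a librdkafka build that includes `retry.backoff.max.ms`):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Initial retry backoff of 100 ms (the default) ... */
        if (rd_kafka_conf_set(conf, "retry.backoff.ms", "100", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "%s\n", errstr);

        /* ... growing exponentially up to a 2 s cap (default is 1 s). */
        if (rd_kafka_conf_set(conf, "retry.backoff.max.ms", "2000", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "%s\n", errstr);

        rd_kafka_conf_destroy(conf);
        return 0;
}
```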
5 changes: 3 additions & 2 deletions INTRODUCTION.md
@@ -319,7 +319,8 @@ error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.
using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms`
configuration properties.


#### Error: Timed out in transmission queue
@@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
18 changes: 18 additions & 0 deletions src/rdinterval.h
@@ -30,6 +30,7 @@
#define _RDINTERVAL_H_

#include "rd.h"
#include "rdrand.h"

typedef struct rd_interval_s {
        rd_ts_t ri_ts_last; /* last interval timestamp */
@@ -109,6 +110,23 @@ static RD_INLINE RD_UNUSED void rd_interval_reset_to_now(rd_interval_t *ri,
        ri->ri_backoff = 0;
}

/**
 * Reset the interval to 'now' and apply a random jitter to the next
 * interval, expressed as +/- max_jitter percent of backoff_ms. The
 * backoff is used only for the absolute jitter calculation. If now is 0,
 * the current time is gathered automatically.
 */
static RD_INLINE RD_UNUSED void
rd_interval_reset_to_now_with_jitter(rd_interval_t *ri,
                                     rd_ts_t now,
                                     int64_t backoff_ms,
                                     int max_jitter) {
        rd_interval_reset_to_now(ri, now);
        /* Convert ms to us and percent to a fraction in one step:
         * backoff_ms * (jitter / 100) * 1000 == backoff_ms * jitter * 10. */
        ri->ri_ts_last += backoff_ms * rd_jitter(-max_jitter, max_jitter) * 10;
}

/**
 * Back off the next interval by `backoff_us` microseconds.
 */
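A hypothetical caller of the new `rd_interval_reset_to_now_with_jitter()` helper above (internal API; the worked numbers are illustrative):

```c
#include "rdinterval.h"

static void example_jittered_reset(void) {
        rd_interval_t intvl;
        rd_interval_init(&intvl);
        /* With backoff_ms = 500 and max_jitter = 20, rd_jitter(-20, 20)
         * returns a value in [-20, 20], so ri_ts_last is shifted by
         * 500 * jitter * 10 us, i.e. anywhere in +/-100000 us (+/-100 ms). */
        rd_interval_reset_to_now_with_jitter(&intvl, 0 /* now: use clock */,
                                             500, 20);
}
```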
19 changes: 17 additions & 2 deletions src/rdkafka_broker.c
@@ -2823,6 +2823,7 @@ int rd_kafka_send(rd_kafka_broker_t *rkb) {
 */
void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

        int64_t backoff = 0;
        /* Restore original replyq since replyq.q will have been NULLed
         * by buf_callback()/replyq_enq(). */
        if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
@@ -2850,9 +2851,23 @@ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
                           rkb->rkb_rk->rk_conf.retry_backoff_ms);

        rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);

        /* In some cases, failed Produce requests do not increment the retry
         * count, see rd_kafka_handle_Produce_error. */
        if (rkbuf->rkbuf_retries > 0)
                backoff = (1 << (rkbuf->rkbuf_retries - 1)) *
                          (rkb->rkb_rk->rk_conf.retry_backoff_ms);
        else
                backoff = rkb->rkb_rk->rk_conf.retry_backoff_ms;

        /* Convert ms to us and percent to a fraction in one step:
         * backoff_ms * (jitter / 100) * 1000 == backoff_ms * jitter * 10. */
        backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
                            100 + RD_KAFKA_RETRY_JITTER_PERCENT) *
                  backoff * 10;

        if (backoff > rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000)
                backoff = rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000;

        rkbuf->rkbuf_ts_retry =
            rd_clock() + (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
        rkbuf->rkbuf_ts_retry = rd_clock() + backoff;
        /* Precaution: time out the request if it hasn't moved from the
         * retry queue within the retry interval (such as when the broker is
         * down). */
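A standalone sketch of the series the retry logic above produces, with `rd_jitter()` replaced by its midpoint (100%) so the numbers are deterministic:

```c
#include <stdint.h>
#include <stdio.h>

int main(void) {
        const int64_t retry_backoff_ms = 100, retry_backoff_max_ms = 1000;
        int retries;
        for (retries = 1; retries <= 6; retries++) {
                /* Nominal exponential backoff in ms. */
                int64_t backoff =
                    ((int64_t)1 << (retries - 1)) * retry_backoff_ms;
                /* Jitter midpoint (100%) and ms -> us conversion (* 10). */
                backoff = 100 * backoff * 10;
                if (backoff > retry_backoff_max_ms * 1000)
                        backoff = retry_backoff_max_ms * 1000;
                printf("retry %d: backoff %lld us\n", retries,
                       (long long)backoff);
        }
        return 0; /* prints 100000, 200000, 400000, 800000, 1000000, 1000000 */
}
```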
17 changes: 12 additions & 5 deletions src/rdkafka_cgrp.c
@@ -370,7 +370,8 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
        rd_list_destroy(&rkcg->rkcg_toppars);
        rd_list_destroy(rkcg->rkcg_subscribed_topics);
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb &&
            rkcg->rkcg_assignor_state)
                rkcg->rkcg_assignor->rkas_destroy_state_cb(
                    rkcg->rkcg_assignor_state);
        rd_free(rkcg);
@@ -754,8 +755,11 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {

        rd_kafka_broker_destroy(rkb);

        /* Back off the next intervalled query since we just sent one. */
        rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
        /* Back off the next intervalled query with a jitter since we just
         * sent one. */
        rd_interval_reset_to_now_with_jitter(&rkcg->rkcg_coord_query_intvl, 0,
                                             500,
                                             RD_KAFKA_RETRY_JITTER_PERCENT);
}

/**
@@ -1914,7 +1918,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
"Unsupported assignment strategy \"%s\"",
protocol_name);
if (rkcg->rkcg_assignor) {
if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor
->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor
->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
@@ -1952,7 +1958,8 @@ }
        }

        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
                if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
                if (rkcg->rkcg_assignor->rkas_destroy_state_cb &&
                    rkcg->rkcg_assignor_state)
                        rkcg->rkcg_assignor->rkas_destroy_state_cb(
                            rkcg->rkcg_assignor_state);
                rkcg->rkcg_assignor_state = NULL;
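The three guards added in this file share one pattern: never invoke the assignor's state destructor when no state was ever created, e.g. when a consumer using the cooperative-sticky assignor is closed before its first assignment (#4381). A minimal reduction of that pattern, with hypothetical type and function names:

```c
#include <stddef.h>

typedef struct assignor_s {
        void (*destroy_state_cb)(void *state);
} assignor_t;

/* Guarded destroy: require both a destructor and a non-NULL state,
 * so an assignor that never produced state is not dereferenced. */
static void assignor_state_destroy(const assignor_t *as, void *state) {
        if (as && as->destroy_state_cb && state)
                as->destroy_state_cb(state);
}
```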
45 changes: 43 additions & 2 deletions src/rdkafka_conf.c
@@ -445,6 +445,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"metadata is refreshed in order to proactively discover any new "
"brokers, topics, partitions or partition leader changes. "
"Use -1 to disable the intervalled refresh (not recommended). "
"If not set explicitly, it will be defaulted to `retry.backoff.ms`. "
"If there are no locally referenced topics "
"(no topic objects created, no messages produced, "
"no subscription or no assignment) then only the broker list will "
@@ -1372,10 +1373,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
     0, INT32_MAX, INT32_MAX},
    {_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
     .sdef = "message.send.max.retries"},

    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
     _RK(retry_backoff_ms),
     "The backoff time in milliseconds before retrying a protocol request.", 1,
     300 * 1000, 100},
     "The backoff time in milliseconds before retrying a protocol request. "
     "This is the initial backoff, which increases exponentially with each "
     "retry until the retries are exhausted, and is capped by "
     "retry.backoff.max.ms.",
     1, 300 * 1000, 100},

    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
     _RK(retry_backoff_max_ms),
     "The maximum backoff time in milliseconds before retrying a protocol "
     "request; the upper bound on the exponentially backed-off delay.",
     1, 300 * 1000, 1000},

    {_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
     _RK_C_INT, _RK(queue_backpressure_thres),

{_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
_RK_C_INT, _RK(queue_backpressure_thres),
@@ -3928,6 +3940,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
                conf->sparse_connect_intvl =
                    RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
        }

        if (!rd_kafka_conf_is_modified(
                conf, "topic.metadata.refresh.fast.interval.ms"))
                conf->metadata_refresh_fast_interval_ms =
                    conf->retry_backoff_ms;

        if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
            conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
@@ -4116,6 +4132,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {
"recommend not using set_default_topic_conf");

/* Additional warnings */
if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `retry.backoff.ms` with value %d is greater "
"than configuration `retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.retry_backoff_ms,
rk->rk_conf.retry_backoff_max_ms);
}

if (rd_kafka_conf_is_modified(
&rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
rk->rk_conf.metadata_refresh_fast_interval_ms >
rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `topic.metadata.refresh.fast.interval.ms` "
"with value %d is greater than configuration "
"`retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.metadata_refresh_fast_interval_ms,
rk->rk_conf.retry_backoff_max_ms);
}
if (rk->rk_type == RD_KAFKA_CONSUMER) {
if (rk->rk_conf.fetch_wait_max_ms + 1000 >
rk->rk_conf.socket_timeout_ms)
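A minimal sketch that triggers the first CONFWARN above through the public API (values chosen to conflict on purpose):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* retry.backoff.ms > retry.backoff.max.ms: the client logs a
         * CONFWARN and applies a static ~1 s (retry.backoff.max.ms)
         * backoff instead of an exponential one. */
        rd_kafka_conf_set(conf, "retry.backoff.ms", "5000", errstr,
                          sizeof(errstr));
        rd_kafka_conf_set(conf, "retry.backoff.max.ms", "1000", errstr,
                          sizeof(errstr));

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) { /* on failure, conf is still owned by the caller */
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }
        rd_kafka_destroy(rk);
        return 0;
}
```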
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
@@ -423,6 +423,7 @@ struct rd_kafka_conf_s {
        int queue_backpressure_thres;
        int max_retries;
        int retry_backoff_ms;
        int retry_backoff_max_ms;
        int batch_num_messages;
        int batch_size;
        rd_kafka_compression_t compression_codec;
2 changes: 2 additions & 0 deletions src/rdkafka_int.h
@@ -863,6 +863,8 @@ const char *rd_kafka_purge_flags2str(int flags);
#define RD_KAFKA_DBG_ALL 0xfffff
#define RD_KAFKA_DBG_NONE 0x0

/* Jitter percentage for exponential retry backoff */
#define RD_KAFKA_RETRY_JITTER_PERCENT 20

void rd_kafka_log0(const rd_kafka_conf_t *conf,
const rd_kafka_t *rk,