Purge brokers no longer reported in metadata #4557

Open
wants to merge 12 commits into
base: master
from
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -2,13 +2,35 @@

librdkafka v2.5.0 is a feature release.

* Identify brokers only by broker id (#4557, @mfleming)
* Remove unavailable brokers and their thread (#4557, @mfleming)

## Enhancements

* Update bundled lz4 (used when `./configure --disable-lz4-ext`) to
[v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains
bugfixes and performance improvements (#4726).


## Fixes

### General fixes

* Issues: #4212
Identify brokers only by broker id, as the Java client does, to avoid
matching a different broker that has the same hostname and reusing its
thread and connection.
Happens since 1.x (#4557, @mfleming).
* Issues: #4557
Remove brokers that are not reported in a metadata call, along with their threads.
This prevents unavailable brokers from being selected for a new connection
when none is available. Since we cannot tell whether a broker was removed
temporarily or permanently, we always remove it; it is added back when
it becomes available again.
Happens since 1.x (#4557, @mfleming).
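
The two fixes above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `sketch_broker_t`, `sketch_id_reported` and `sketch_apply_metadata` are hypothetical names, and the broker table is reduced to a plain array. It only shows the idea of matching brokers by id alone and dropping any broker whose id is missing from the latest metadata response, then restoring it once it is reported again.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified broker entry (not librdkafka's rd_kafka_broker_t).
 * The node id is the only identity key; hostname and port are ignored. */
typedef struct {
        int32_t nodeid; /* broker id */
        bool active;    /* false while the broker is purged */
} sketch_broker_t;

/* Returns true if `id` is among the ids reported by the metadata response. */
static bool sketch_id_reported(int32_t id,
                               const int32_t *reported,
                               size_t reported_cnt) {
        size_t i;
        for (i = 0; i < reported_cnt; i++)
                if (reported[i] == id)
                        return true;
        return false;
}

/* Applies one metadata response to the table:
 *  - an active broker that is not reported is purged,
 *  - a purged broker that is reported again is restored.
 * Returns the number of brokers purged by this call. */
static size_t sketch_apply_metadata(sketch_broker_t *brokers,
                                    size_t broker_cnt,
                                    const int32_t *reported,
                                    size_t reported_cnt) {
        size_t i, purged = 0;
        for (i = 0; i < broker_cnt; i++) {
                bool is_reported = sketch_id_reported(
                    brokers[i].nodeid, reported, reported_cnt);
                if (brokers[i].active && !is_reported)
                        purged++;
                brokers[i].active = is_reported;
        }
        return purged;
}
```

In this sketch a purged entry stays in the array with `active` set to false. The PR itself goes further and also decommissions the broker's thread, which the sketch does not model.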



# librdkafka v2.4.0

librdkafka v2.4.0 is a feature release:
41 changes: 13 additions & 28 deletions src/rdkafka.c
@@ -1192,32 +1192,9 @@ static void rd_kafka_destroy_internal(rd_kafka_t *rk) {
rd_kafka_wrlock(rk);
}

/* Decommission brokers.
* Broker thread holds a refcount and detects when broker refcounts
* reaches 1 and then decommissions itself. */
/* Decommission brokers. */
TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
/* Add broker's thread to wait_thrds list for later joining */
thrd = rd_malloc(sizeof(*thrd));
*thrd = rkb->rkb_thread;
rd_list_add(&wait_thrds, thrd);
rd_kafka_wrunlock(rk);

rd_kafka_dbg(rk, BROKER, "DESTROY", "Sending TERMINATE to %s",
rd_kafka_broker_name(rkb));
/* Send op to trigger queue/io wake-up.
* The op itself is (likely) ignored by the broker thread. */
rd_kafka_q_enq(rkb->rkb_ops,
rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
/* Interrupt IO threads to speed up termination. */
if (rk->rk_conf.term_sig)
pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

rd_kafka_broker_destroy(rkb);

rd_kafka_wrlock(rk);
rd_kafka_broker_decommission(rk, rkb, &wait_thrds);
}

if (rk->rk_clusterid) {
@@ -1291,6 +1268,15 @@ static void rd_kafka_destroy_internal(rd_kafka_t *rk) {

rd_list_destroy(&wait_thrds);

/* Join previously decommissioned broker threads */
RD_LIST_FOREACH(thrd, &rk->wait_thrds, i) {
int res;
if (thrd_join(*thrd, &res) != thrd_success)
;
rd_free(thrd);
}
rd_list_destroy(&rk->wait_thrds);

/* Destroy mock cluster */
if (rk->rk_mock.cluster)
rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);
@@ -1415,9 +1401,7 @@ static RD_INLINE void rd_kafka_stats_emit_toppar(struct _stats_emit *st,
rd_kafka_toppar_lock(rktp);

if (rktp->rktp_broker) {
rd_kafka_broker_lock(rktp->rktp_broker);
broker_id = rktp->rktp_broker->rkb_nodeid;
rd_kafka_broker_unlock(rktp->rktp_broker);
}

/* Grab a copy of the latest finalized offset stats */
@@ -2287,6 +2271,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_coord_cache_init(&rk->rk_coord_cache,
rk->rk_conf.metadata_max_age_ms);
rd_kafka_coord_reqs_init(rk);
rd_list_init(&rk->wait_thrds, 0, NULL);

if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
@@ -4734,8 +4719,8 @@ static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
goto err;
}

gi->broker.id = rkb->rkb_nodeid;
rd_kafka_broker_lock(rkb);
gi->broker.id = rkb->rkb_nodeid;
gi->broker.host = rd_strdup(rkb->rkb_origname);
gi->broker.port = rkb->rkb_port;
rd_kafka_broker_unlock(rkb);
10 changes: 10 additions & 0 deletions src/rdkafka.h
@@ -5289,6 +5289,16 @@ void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist);
RD_EXPORT
int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);

/**
* @brief Retrieve and return the learned broker ids.
*
* @param rk Instance to use.
* @param cntp Will be updated to the number of brokers returned.
*
* @returns a malloc:ed list of int32_t broker ids.
*/
RD_EXPORT
int32_t *rd_kafka_brokers_learned_ids(rd_kafka_t *rk, size_t *cntp);


/**
6 changes: 3 additions & 3 deletions src/rdkafka_admin.c
@@ -8029,10 +8029,10 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_list_init(&rko_result->rko_u.admin_result.results, cnt,
rd_kafka_ConsumerGroupDescription_free);

rd_kafka_broker_lock(rkb);
nodeid = rkb->rkb_nodeid;
host = rd_strdup(rkb->rkb_origname);
port = rkb->rkb_port;
rd_kafka_broker_lock(rkb);
host = rd_strdup(rkb->rkb_origname);
port = rkb->rkb_port;
rd_kafka_broker_unlock(rkb);

node = rd_kafka_Node_new(nodeid, host, port, NULL);