diff --git a/thirdparty/patches/librdkafka-1.9.2.patch b/thirdparty/patches/librdkafka-1.9.2.patch index 38064e751dc0a2..b13e740bc5c36a 100644 --- a/thirdparty/patches/librdkafka-1.9.2.patch +++ b/thirdparty/patches/librdkafka-1.9.2.patch @@ -34,6 +34,37 @@ # Clear define name ($2): caller may have additional checks mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed" return 1 +--- src/rdkafka.c ++++ src/rdkafka.c +@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, + struct rd_kafka_partition_leader *leader; + rd_list_t leaders; + rd_kafka_resp_err_t err; ++ int tmout; + + partitions = rd_kafka_topic_partition_list_new(1); + rktpar = +@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, + rd_list_destroy(&leaders); + + /* Wait for reply (or timeout) */ +- while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS && +- rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK, +- rd_kafka_poll_cb, +- NULL) != RD_KAFKA_OP_RES_YIELD) +- ; ++ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { ++ tmout = rd_timeout_remains(ts_end); ++ if (rd_timeout_expired(tmout)) { ++ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT; ++ break; ++ } ++ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK, ++ rd_kafka_poll_cb, NULL); ++ } + + rd_kafka_q_destroy_owner(rkq); + --- src/rdkafka_broker.c +++ src/rdkafka_broker.c @@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {