Skip to content

Commit

Permalink
[fix](routine-load) fix consumer hang when kafka exception causing ca…
Browse files Browse the repository at this point in the history
…n not query (apache#33492)
  • Loading branch information
sollhui authored and seawinde committed Apr 15, 2024
1 parent 096e101 commit 106e0f5
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions thirdparty/patches/librdkafka-1.9.2.patch
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,37 @@
# Clear define name ($2): caller may have additional checks
mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
return 1
--- src/rdkafka.c
+++ src/rdkafka.c
@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
struct rd_kafka_partition_leader *leader;
rd_list_t leaders;
rd_kafka_resp_err_t err;
+ int tmout;

partitions = rd_kafka_topic_partition_list_new(1);
rktpar =
@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
rd_list_destroy(&leaders);

/* Wait for reply (or timeout) */
- while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
- rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
- rd_kafka_poll_cb,
- NULL) != RD_KAFKA_OP_RES_YIELD)
- ;
+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+ tmout = rd_timeout_remains(ts_end);
+ if (rd_timeout_expired(tmout)) {
+ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ break;
+ }
+ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+ }

rd_kafka_q_destroy_owner(rkq);

--- src/rdkafka_broker.c
+++ src/rdkafka_broker.c
@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
Expand Down

0 comments on commit 106e0f5

Please sign in to comment.