
rd_kafka_query_watermark_offsets API hang forever #2588

Closed
hxiaodon opened this issue Oct 25, 2019 · 5 comments · Fixed by #4460
hxiaodon commented Oct 25, 2019

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

The rd_kafka_query_watermark_offsets API hangs forever when network access to part of the Kafka cluster is restricted (network isolation).

How to reproduce

I can reproduce this problem with the latest librdkafka version:

  1. Launch two VM/Docker instances, A and B (my local OS is CentOS 6).

  2. Install confluent-oss on instance A and start Kafka with 3 broker services:

    • brokerId: 1, port: 9093
    • brokerId: 2, port: 9094
    • brokerId: 3, port: 9095
  3. Create a topic "test" with 3 partitions and a replication factor of 1. Each broker should own a unique partition ID; assume the "test" topic has the following layout:

    • brokerId: 1, port: 9093, partitionId: 0
    • brokerId: 2, port: 9094, partitionId: 1
    • brokerId: 3, port: 9095, partitionId: 2
  4. On instance B, deploy the test program:
    main.go.zip

  5. Enable the iptables service on instance A and reject only instance B's access to port 9095.

  6. Now run the test program on instance B (testing the QueryWatermarkOffsets API); it hangs, because partition 2's broker is alive but not reachable from instance B:

    • ./kafkatest -broker=$instanceA_IP:9093 -newAPI=true -topic=test -partitionId=2 -timeout=2000
  7. If we use the OffsetsForTimes API instead, the program exits when the timeout expires:

    • ./kafkatest -broker=$instanceA_IP:9093 -newAPI=false -topic=test -partitionId=2 -timeout=5000

Conclusion:
I think the issue can be reproduced easily whenever a partition's leader broker is isolated.
The infinitely looping code is here,

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): v0.11.6
  • Apache Kafka version: confluent-oss-5.0.0-2.11
  • librdkafka client configuration: "session.timeout.ms": 10000
  • Operating system: centos 6
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
hxiaodon (Author) commented:

Hi,
I made some code modifications at rdkafka tag v0.11.6. The change is based on the rd_kafka_offsets_for_times API's implementation:

diff --git a/src/rdkafka.c b/src/rdkafka.c
index 86d347f8..797ee937 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2592,6 +2592,8 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
         struct rd_kafka_partition_leader *leader;
         rd_list_t leaders;
         rd_kafka_resp_err_t err;
+        int tmout;
+        int cnt;
 
         partitions = rd_kafka_topic_partition_list_new(1);
         rktpar = rd_kafka_topic_partition_list_add(partitions,
@@ -2641,10 +2643,12 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
 
         /* Wait for reply (or timeout) */
         while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                rd_kafka_poll_cb, NULL) !=
-               RD_KAFKA_OP_RES_YIELD)
-                ;
+               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end)))){
+                cnt = rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
+                                rd_kafka_poll_cb, NULL);
+                if (cnt == RD_KAFKA_OP_RES_YIELD)
+                   break; 
+        }
 
         rd_kafka_q_destroy_owner(rkq);
 

Does this make sense, or does it involve other risks?
Hoping for your reply, thanks!
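One risk worth noting with the bounded loop above: when it exits on timeout, `state.err` is still `RD_KAFKA_RESP_ERR__IN_PROGRESS`, so the function must map that sentinel to a timeout error rather than leak it to the caller. A hedged sketch of that post-loop step follows; the enum and `finish_query` are illustrative stand-ins, not librdkafka's actual code.

```c
#include <assert.h>

/* Illustrative stand-ins for librdkafka error codes. */
typedef enum {
        RESP_ERR_NO_ERROR    = 0,
        RESP_ERR_IN_PROGRESS = -1,
        RESP_ERR_TIMED_OUT   = -2
} resp_err_t;

/* After the bounded wait loop exits, the reply may never have arrived;
 * the in-progress sentinel should surface to the caller as a timeout. */
static resp_err_t finish_query(resp_err_t state_err) {
        if (state_err == RESP_ERR_IN_PROGRESS)
                return RESP_ERR_TIMED_OUT;
        return state_err;
}
```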


hxiaodon commented Oct 29, 2019

@edenhill
The code change is tiny. Could you take a look and consider merging it into the master branch?
Thanks!

b1aafulei commented:

@edenhill

The same problem is still present in the most recent release, v1.7.0. rd_kafka_query_watermark_offsets hangs forever if brokers are down, completely ignoring its timeout_ms parameter. Meanwhile, @hxiaodon's solution seems good. Would you consider merging it?

IsaacHe20 commented:

@edenhill
Hi Eden, our team faces the same bug as above. I verified this change on version 1.7.0 and it is a working solution.
This problem has not been fixed for two years; maybe you can tell us about your concerns?
Thanks!


nickwb commented Apr 4, 2023

I've encountered a similar issue: rd_kafka_query_watermark_offsets appears to block forever if the provided partition argument does not exist (i.e., some value n greater than the partition count for that topic).
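Until the library enforces its timeout for this case, a caller-side guard can avoid the hang by checking the partition index against topic metadata before querying (e.g. a partition count fetched via `rd_kafka_metadata`). The helper below is hypothetical, purely for illustration:

```c
#include <assert.h>

/* Hypothetical guard: a partition index at or beyond the topic's
 * partition count has no leader, so querying it would wait forever. */
static int partition_exists(int partition, int partition_cnt) {
        return partition >= 0 && partition < partition_cnt;
}
```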
