
tokio, streaming consumer, future producer leading to `rd.h:298: rd_refcnt_sub0: Assertion !*"refcnt sub-zero"' failed.` #66

Closed
abrugh opened this issue Nov 28, 2017 · 3 comments


abrugh commented Nov 28, 2017

Environment:

  • Five (5) brokers running Kafka 0.9.0.1
  • rust-rdkafka version 0.13.0

Code:

This is approximate, not a direct copy and paste; I'm trying to keep it concise and to the point. If I need to clarify how/where anything is coming from, let me know.

// lots of extern crates here
extern crate tokio_core;

// lots of use calls here
use tokio_core::reactor::Core;

fn main() {
  // main loop for consume/produce; a small http server exists outside this thread for metrics collection
  thread::spawn(|| {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let consumer = consumer_config.create().expect("can't create consumer");
    consumer.subscribe(/* topics, in the usual way */);
    let message_stream = consumer.start().filter_map(|result| {
      // match for error handling here
    }).for_each(|message| {
      let payload = message.payload();
      match payload {
        Ok(payload) => {
          let res = /* transformation of payload */;
          match res {
            Ok(things) => {
              let msgs = things.into_iter()
                .map(|thing| (thing.key(), thing.val1, thing_to_msg(thing)))
                .collect::<Vec<(_, _, Vec<u8>)>>();
              for (key, val1, msg) in msgs {
                let send_thread = producer.clone();
                let process_message = send_thread.send_copy::<Vec<u8>, String>(
                  /* producer arguments here */
                ).and_then(move |d_report| {
                  // increment a metrics thing that got cloned
                  Ok(())
                }).or_else(|err| {
                  warn!("Error...");
                  Ok(())
                });
                handle.spawn(process_message);
              }
            },
            // Err matches and reporting here
          }
        },
        // Err matches and reporting here
      }
    });
    core.run(message_stream).unwrap();
  });

  // Http server stuff here
}

Behavior

This will run anywhere from 20 minutes to 2 hours, producing about 1.5M messages per minute, before the number of produced messages drops to 0. CPU for one thread then stays at 100% for an additional 10 minutes before the whole thing exits with code 139 (segfault). Logs below.

Logs

There's a string of messages about BrokerTransportFailure from some or all of the brokers; many brokers are mentioned more than once. The assertion error's timestamp is provided by Docker, which is why it looks a little different.

2017-11-28T13:50:50.370186398+00:00 - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): node01:9092/bootstrap: Receive failed: Disconnected
...
2017-11-28T14:25:50.330383250+00:00 - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): node01:9092/bootstrap: Receive failed: Disconnected
2017-11-28T14:25:50.330724298Z - my_program: rd.h:298: rd_refcnt_sub0: Assertion `!*"refcnt sub-zero"' failed.
fede1024 (Owner) commented

How is the consumer configured? Could you try with "api.version.request" set to false?
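
For reference, a minimal sketch of that configuration change (not the reporter's actual config; the group id and broker address are placeholders, and it assumes the 0.13-era ClientConfig/StreamConsumer API):

extern crate rdkafka;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

fn build_consumer() -> StreamConsumer {
    ClientConfig::new()
        .set("group.id", "example_group")         // placeholder
        .set("bootstrap.servers", "node01:9092")  // placeholder
        // Suggested change: don't send the ApiVersion request,
        // which 0.9.x brokers don't support.
        .set("api.version.request", "false")
        .create()
        .expect("consumer creation failed")
}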

Also: do you know if there was actually any problem with the brokers? I wonder if this behavior happens every time a connection hangs. Could you try pausing some of the brokers while the consumer/producer is running? It is possible that during the termination sequence something is freed multiple times. Are you using -1 as the block_ms parameter in the producer? Maybe you could try lowering that parameter and see if it gets stuck there (even though that wouldn't explain the 100% CPU).

Could you also enable logging? (explained here). Using RUST_LOG=rdkafka=trace should be enough (plus the env_logger::init() in the code).
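
A minimal sketch of that logging setup, assuming the 2017-era env_logger 0.4 API (where init() returns a Result):

extern crate env_logger;

fn main() {
    // Initialize the logger before creating any rdkafka clients; run the binary
    // with RUST_LOG=rdkafka=trace to capture the librdkafka trace output.
    env_logger::init().expect("failed to initialize logger");

    // ... consumer/producer setup as in the snippet above
}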

fede1024 (Owner) commented

I found a couple of librdkafka issues that might be related:

However, I'm not sure whether it's the same issue or something caused by rust-rdkafka. In one of them they suggest upgrading to Kafka 0.10, but I don't know if that's an option in your case.

benesch (Collaborator) commented Oct 22, 2019

Given that this assertion fired with other Kafka clients, I'm inclined to call this a bug in librdkafka/Kafka. @abrugh, if you can still reproduce this on a more recent version of librdkafka/Kafka, please refile!

benesch closed this as completed Oct 22, 2019