
Segfaults during consumer shutdown #254

Closed · 5 tasks done
mensfeld opened this issue Jun 8, 2023 · 40 comments
Assignees: mensfeld
Labels: bug, librdkafka (reports / issues related to C librdkafka)

Comments

@mensfeld
Member

mensfeld commented Jun 8, 2023

Check whether librdkafka 2.0.2 has a correct lock for the rebalance-in-progress close flow in rdkafka-ruby and karafka

details here: confluentinc/librdkafka#4308
more details here: confluentinc/librdkafka#4312

ref #242
ref #250

  • try to reproduce on 2.1.1
  • try to reproduce on 2.0.2
  • try to reproduce on 1.9.2
  • replicate with karafka
  • investigate deferring close if under rebalance (if needed)
mensfeld self-assigned this Jun 8, 2023
mensfeld added the librdkafka label Jun 8, 2023
@mensfeld
Member Author

mensfeld commented Jun 8, 2023

@vwassan can you share some code snippets and answer the following questions:

  1. How do you handle shutdown? Via process signaling or something else?
  2. Do you use rebalance callbacks?
  3. If so, can you share how they look?
  4. Did any errors occur during the rebalance that was running during shutdown?
  5. Do you use the simple iterating client or the poll API? (See the sketch below for what I mean.)
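
For context, here is a minimal sketch of the two consumption styles question 5 distinguishes, assuming a local broker; the topic and group id are illustrative:

require "rdkafka"

config = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "example-group",
  :"auto.offset.reset" => "earliest"
}

consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe("example_topic")

# 1) The simple iterating client: blocks and yields messages as they arrive.
# consumer.each do |message|
#   puts "#{message.topic}/#{message.partition}@#{message.offset}"
# end

# 2) The poll API: an explicit loop with a poll timeout in milliseconds.
loop do
  message = consumer.poll(250)
  next unless message
  puts "#{message.topic}/#{message.partition}@#{message.offset}"
end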

@mensfeld
Member Author

mensfeld commented Jun 8, 2023

I can reproduce with Karafka test suite:

ruby: rdkafka_sticky_assignor.c:1896: rd_kafka_sticky_assignor_state_destroy: Assertion `assignor_state' failed.

mensfeld added the bug label Jun 8, 2023
@mensfeld
Member Author

mensfeld commented Jun 8, 2023

OK, I can reproduce on all the versions. We need to introduce a lock around the assignment states. Karafka does not seem to be directly affected, because its shutdown model is not the same as rdkafka's, so I will focus on rdkafka for now (I could not reproduce it with Karafka).

@vwassan

vwassan commented Jun 8, 2023

OK, I can reproduce on all the versions. We need to introduce a lock around the assignment states. Karafka does not seem to be directly affected, because its shutdown model is not the same as rdkafka's, so I will focus on rdkafka for now (I could not reproduce it with Karafka).

@mensfeld Thank you for looking into the issue!
Considering ☝️, do you still need #254 (comment), or are you good?

@mensfeld
Member Author

mensfeld commented Jun 8, 2023

@vwassan I'm good. I have a fix running locally. I need, however, one or two days to solidify it and run all the regressions to make sure it's stable. So far so good. There are a few ways to tackle this, and I need to pick the one that will suit everyone ;)

@mensfeld
Member Author

mensfeld commented Jun 9, 2023

@vwassan ok, bad news. My patch is indeed making things better, but under heavy load it can still crash (3 times in 6 hours of rebalances) and it adds to the shutdown time. I created a new issue here to provide debug info for the librdkafka people: confluentinc/librdkafka#4312

If they are slow to react, I will try to include the patch in an rdkafka release or share it with you here so you can at least partially mitigate this.

@methodmissing
Contributor

If they are slow to react, I will try to include the patch in an rdkafka release or share it with you here so you can at least partially mitigate this.

Maciej, we'd be down to try the patch 😄

@mensfeld
Member Author

mensfeld commented Jun 9, 2023

@methodmissing you will have to wait for it, because if I am right (I'm running the stress tests once more) it cannot be mitigated directly in rdkafka, but rather in karafka or in a custom consumer using rdkafka-ruby (or the default simple one from rdkafka-ruby). So far so good. The nature of this seems to be related to resource allocation during the initial rebalance for a given consumer instance, and not to the shutdown per se. There seems to be some dangling state left over from the first, uncompleted rebalance.

This would explain why karafka users don't see it. Karafka has a different flow for startup and shutdown, hence the chances of such short-lived assignments are much lower than when using rdkafka-ruby directly 🤷

At the moment the bypass has been running for 2 hours with no crashes.

@mensfeld
Member Author

mensfeld commented Jun 9, 2023

At the moment I am running 64 consumers for over 4 hours with no crashes. It is starting to look solid, at least locally.

edit: OK, I got the first crash after several hours, so the solution is not final 😅 though it's still much better than the previous crash on every run.

edit: I can replicate it with Karafka as well - a lot of effort, but doable. Let's check whether the patch works there as well.

@mensfeld
Member Author

@methodmissing @vwassan it was already fixed by Confluent, so I won't ship my patch. It's a matter of releasing librdkafka 2.2.0. I will bump it here, probably in the next release, as I need to run extensive testing to make sure it's stable. When bumping karafka-rdkafka to 2.1.0 I noticed critical errors in librdkafka that were fixed in 2.1.1, hence the need to be cautious. Once librdkafka has a 2.2.0 release crafted on GitHub, I will run those tests.

@mensfeld
Member Author

2.2.0-rc-1 does not exhibit this behaviour. I will close this once rdkafka catches up on releases.

@mensfeld
Member Author

mensfeld commented Jul 5, 2023

@methodmissing @vwassan do you want to test out the RC release of librdkafka with the fix?

@mensfeld
Member Author

Never mind. The issue is back in 2.2.0 🤦

@vwassan

vwassan commented Jul 13, 2023

@methodmissing @vwassan do you want to test out the RC release of librdkafka with the fix?

@mensfeld sorry! somehow I missed this message

@mensfeld
Member Author

@vwassan never mind. They broke it again. I already commented and have another stable reproduction. Let's see what they have to say ;)

@mensfeld
Member Author

@methodmissing @vwassan it's a different issue 😅 though the effect is the same. I do, however, know how to mitigate this and will hopefully craft a PR next week.

@vwassan

vwassan commented Jul 20, 2023

Thanks @mensfeld. Appreciate your efforts!

@jychen7
Contributor

jychen7 commented Jul 22, 2023

I do however know how to mitigate this and hopefully will craft a PR next week.

confluentinc/librdkafka#4312 (comment) sounds exciting

@dmariassy

dmariassy commented Aug 22, 2023

When running in production, we've also started to see a lot of segfaults during producer shutdowns. I haven't been able to reproduce locally. However, a hunch from a co-worker helped us fix the issue client-side by doing:

at_exit do
  client.close
  client = nil
  GC.start
end

This suggests that the bug is most likely in rdkafka-ruby's finalizer logic.

@mensfeld
Member Author

@dmariassy the same applies to rdkafka-ruby itself:

If anything, I would recommend using at_exit to close the producer instead of nullifying it and triggering indirect closing, which makes things more complicated. Librdkafka has several bugs related to the shutdown of both the producer and the consumer, and the fact that you see it on shutdown may not indicate it is on "our" side. There are, at the moment, race conditions between flush and broker metadata refreshes, and a few others that affect both producers and consumers. There are also consumer-specific shutdown issues.

If you have a librdkafka stacktrace, it would be helpful. WaterDrop has had one crash over the last 6 months across CI and my environments, and it was related only to an internal librdkafka race condition.

@dmariassy

Sorry, my example above was incomplete. We do close the producer before setting it to nil. I corrected the omission. The full example is:

at_exit do
  client.close
  client = nil
  GC.start
end

@mensfeld
Member Author

@dmariassy I will need:

  • a stacktrace (librdkafka prints one, and if not, the Ruby VM crash report should)
  • a repro (if doable)
  • cluster info and structure
  • Ruby version, rdkafka version, librdkafka version
  • info about the load during close
  • any customization
  • info on whether you have one client or many

Without a stable repro I unfortunately cannot fix it, as none of my higher-level libraries or code exhibits this behaviour anymore, nor do any of the specs or tests in rdkafka-ruby itself.

WaterDrop uses a different (opinionated) finalizer hook for cleanups: https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/producer.rb. You may try to take its code and implement it yourself, or check whether the same issue occurs via WaterDrop, to assess where it lives.
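
A minimal sketch of the finalizer-hook idea, using a hypothetical wrapper class (this is not WaterDrop's actual implementation, just the general shape of registering cleanup when the producer is built instead of relying on callers to remember to close it):

require "rdkafka"

# Hypothetical wrapper, not part of rdkafka-ruby or WaterDrop.
class ManagedProducer
  def initialize(config)
    @producer = Rdkafka::Config.new(config).producer
    # Close on process exit rather than in a GC finalizer, so the native
    # handle is released while the VM is still fully alive.
    at_exit { @producer.close }
  end

  def produce(**args)
    @producer.produce(**args)
  end
end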

@mensfeld
Member Author

@dmariassy if I can reproduce it in any way (except by not closing the producer at all), it should be fixable (minus known librdkafka issues that can only be partially mitigated). If I do reproduce it, I will probably open a new issue, as this one was related to other things that are already fixed and concern the consumer, or that won't be fixed because they are in librdkafka and have already been reported.

@dmariassy

Hi @mensfeld, sorry for the late reply. I finally have a stacktrace I can share 🎉

sfr-production : ruby : SIGABRT Unhandled signal SIGABRT in `/usr/local/ruby/bin/ruby` 
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:110 rd_kafka_destroy
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:110 block in close
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:60 with_inner
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:73 block in synchronize
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:66 synchronize
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:66 synchronize
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/native_kafka.rb:88 close
    /artifacts/bundle/ruby/3.2.0/gems/rdkafka-0.13.0/lib/rdkafka/producer.rb:45 close
    /artifacts/bundle/ruby/3.2.0/gems/kafka-shopify-4.1.1/lib/kafka-shopify/writers/rdkafka/abstract_rdkafka_writer.rb:207 block (2 levels) in close_clients

Caused by: sfr-production : ruby : SIGABRT Unhandled signal SIGABRT in `/usr/local/ruby/bin/ruby` 
    vm_dump.c:812 rb_vm_bugreport
    error.c:1029 rb_bug_for_fatal_signal
    signal.c:964 sigsegv
    [/lib/x86_64-linux-gnu/libpthread.so.0@0x000079d32e468420] __restore_rt
    [/lib/x86_64-linux-gnu/libpthread.so.0@0x000079d32e45daab] __pthread_clockjoin_ex
    [/lib/x86_64-linux-gnu/libpthread.so.0@0x000079d32e469b7e] thrd_join
    rdkafka.c:1107 rd_kafka_destroy_app
    [/lib/x86_64-linux-gnu/libffi.so.7@0x000079d324d29ff5] 0x79d324d29ff5
    [/lib/x86_64-linux-gnu/libffi.so.7@0x000079d324d2940a] 0x79d324d2940a
    Call.c:400 rbffi_CallFunction
    MethodHandle.c:220 custom_trampoline
    vm_insnhelper.c:3288 vm_call_cfunc_with_frame
    vm_insnhelper.c:3940 vm_call_method_each_type
    vm_insnhelper.c:4090 vm_call_method
    vm_insnhelper.c:5112 vm_exec_core
    vm.c:2438 rb_vm_exec
    vm.c:1451 rb_yield
    eval.c:1011 rb_ensure
    vm_insnhelper.c:3288 vm_call_cfunc_with_frame
    vm_insnhelper.c:5112 vm_exec_core
    vm.c:2438 rb_vm_exec
    vm.c:1656 rb_vm_invoke_proc
    thread.c:595 thread_do_start_proc
    thread.c:614 thread_start_func_2
    thread_pthread.c:1173 thread_start_func_1
    [/lib/x86_64-linux-gnu/libpthread.so.0@0x000079d32e45c609] start_thread
    [/lib/x86_64-linux-gnu/libc.so.6@0x000079d32e366133] clone

As for the other pieces of info:

  • cluster info and structure: Kafka 3.5
  • ruby version: 3.2.0
  • rdkafka version: rdkafka-0.13.0
  • info about load during close: Close is called during the container's shutdown sequence. There should be little to no load
  • any customization:
    {
      "compression.type": "zstd",
      "linger.ms": 8000,
      "batch.num.messages": 800,
      "topic.metadata.propagation.max.ms": 0, # If a topic is missing, raise an error immediately
      # Use the max allowed setting for message.max.bytes. This setting controls the maximum size of a single
      # uncompressed record that the producer will accept before raising a "Message size too large" error.
      # The default value is 1_000_000 bytes (1MB), which should theoretically be enough. However, rdkafka applies
      # this limit to uncompressed messages, while the broker checks the size of compressed message batches.
      # So it's perfectly possible for a message to be rejected by the client that would actually be accepted by
      # the broker. By setting this config to its max allowed value, we prevent this from happening.
      "message.max.bytes": 1_000_000_000,
    }
    
  • info if you have one client or many: 1

@mensfeld
Member Author

@dmariassy thanks! Just to be sure: I spoke recently with @peterzhu2118 about your shutdown procedures, and there was a case of a multi-threaded shutdown with a timeout on close. I assume this is not the case here, right? Can you show me more of the shutdown procedure you are using?

@mensfeld
Member Author

Also one more thing: was the process a fork?

@dmariassy

dmariassy commented Sep 20, 2023

@dmariassy thanks! Just to be sure: I spoke recently with @peterzhu2118 about your shutdown procedures, and there was a case of a multi-threaded shutdown with a timeout on close. I assume this is not the case here, right? Can you show me more of the shutdown procedure you are using?

We still call client.close from a thread, but we don't terminate that thread anymore. This is roughly what the shutdown process looks like:

def close_client
  begin
    client.flush
  rescue => e
    @logger.error("Error flushing messages while closing rdkafka client: #{e.inspect}")
  end
  
  closed = T.let(false, T::Boolean)
  start = Time.now
  t = Thread.new do
    client.close # This is the line referenced in the stack trace: `abstract_rdkafka_writer.rb:207`
    closed = true
  rescue => ex
    @logger.error("Error closing rdkafka client: #{ex.inspect}")
  end
  
  sleep 0.1 while !closed && Time.now - start < 5 # wait 5 seconds for the client to close
  unless closed
    # The client will never close if it cannot connect to the broker
    @logger.error("Failed to close rdkafka client for within allowed timeout")
  end
rescue => e
  @logger.error("Error closing rdkafka client: #{e.inspect}")
ensure
  # These two lines were added as an earlier attempt to prevent segfaults. And it worked in some cases
  client = nil
  GC.start
  @logger.info("Rdkafka close sequence complete")
end

Also one more thing: was the process a fork?

Yes

@mensfeld
Member Author

rdkafka-ruby uses librdkafka 2.0.2. I will bump it to 2.2.0 soon. In 2.0.2 the stacktrace points to:

https://github.com/confluentinc/librdkafka/blob/v2.0.2/src/rdkafka.c#L1107C8-L1107C8

Which leads me to another question: was the producer initialized in the parent process and used from the fork?

@mensfeld
Member Author

ref:

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

@mensfeld
Member Author

I will try to repro tomorrow.

@mensfeld
Member Author

@dmariassy yeah I can crash it:

/home/mencio/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/karafka-rdkafka-0.13.5/lib/rdkafka/native_kafka.rb:113: [BUG] Segmentation fault at 0x00007f911c84f910
ruby 3.2.2 (2023-03-30 revision e51014f9c0) [x86_64-linux]

-- Control frame information -----------------------------------------------
c:0041 p:---- s:0230 e:000229 CFUNC  :rd_kafka_destroy
c:0040 p:0049 s:0225 e:000224 BLOCK  /home/mencio/.rbenv/

@mensfeld
Member Author

mensfeld commented Sep 22, 2023

@dmariassy stop using the config and producer references from the original process inside the fork, don't close that producer from the fork, and things will work as expected. Quote from WaterDrop:

If you work with forked processes, make sure you don't use the producer before the fork. You can easily configure the producer and then fork and use it.

My repro is as follows.

This crashes VMs at scale:

config = {:"bootstrap.servers" => "localhost:9092", :"queue.buffering.max.ms" => 30_000}
producer = Rdkafka::Config.new(config).producer
producer.produce(topic: 'test', payload: 'a')

10.times {  sleep(rand); fork {10.times { producer.produce(topic: 'test', payload: 'a') }; producer.close  } }

This does NOT crash:

config = {:"bootstrap.servers" => "localhost:9092", :"queue.buffering.max.ms" => 30_000}

10.times {
sleep(rand);
fork {
  producer = Rdkafka::Config.new(config).producer
  10.times {
    producer.produce(topic: 'test', payload: 'a')
  }
  producer.close
 }
}

@mensfeld
Member Author

The last case is a librdkafka bug on closing while a metadata refresh is running during a cluster change. This has been reported and is being fixed in librdkafka. Your case seems to be the one I described and reproduced above.

@dmariassy

Thanks for looking into it! Our library wrapper already defers the producer initialization until the first produce call, which I expect to happen only after forking. But I may be wrong about that, because I don't own the client application. I'll add additional logs to confirm at runtime.

@mensfeld
Member Author

@dmariassy from your stacktrace, the librdkafka code, and my POC, it seems there's "something" out there that does not work as expected.

I may of course be wrong, and then I will get back to this issue, but as of now it looks like you are closing a producer that is referenced from a fork.

@mensfeld
Member Author

This version seems to crash more often:

config = {
  :"bootstrap.servers" => "localhost:9092",
  :"queue.buffering.max.ms" => 30_000
}

producer = Rdkafka::Config.new(config).producer
producer.produce(topic: 'test', payload: 'a')

sleep(2)

20.times do |i|
  sleep(rand / 10)

  fork do
    10.times do
      producer.produce(topic: 'test', payload: 'a')
    end

    sleep(0.01)

    producer.close
  end
end

@mensfeld
Member Author

Yeah @dmariassy I was not able to crash this:

config = {
  :"bootstrap.servers" => "localhost:9092",
  :"queue.buffering.max.ms" => 30_000
}

20.times do |i|
  sleep(rand / 10)

  fork do
    producer = Rdkafka::Config.new(config).producer
    producer.produce(topic: 'test', payload: 'a')

    10.times do
      producer.produce(topic: 'test', payload: 'a')
    end

    sleep(0.01)

    producer.close
  end
end

It clearly indicates resource leakage.

@dmariassy

I think we might be closing from the master process, since that's where our at_exit gets registered... I'll have to try moving that to the forked process as well.

@mensfeld
Member Author

You can do what WaterDrop does: https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/producer.rb#L97

This lazily defers the setup to the post-fork stage (as long as you do not operate from the main thread). It is still worth closing correctly, but such a trick should also work.
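
A minimal sketch of the lazy, fork-aware idea, using a hypothetical helper class rather than WaterDrop's actual code: the native client is only built on first use, and if the process has forked since then, a fresh client is created in the child instead of reusing the parent's handle.

require "rdkafka"

# Hypothetical helper, not part of rdkafka-ruby or WaterDrop.
class LazyProducer
  def initialize(config)
    @config = config
    @producer = nil
    @pid = nil
  end

  def produce(**args)
    current.produce(**args)
  end

  def close
    # Only close a client that this process actually created.
    @producer.close if @producer && @pid == Process.pid
  end

  private

  # Build (or rebuild after a fork) the underlying native producer.
  def current
    if @producer.nil? || @pid != Process.pid
      @producer = Rdkafka::Config.new(@config).producer
      @pid = Process.pid
    end
    @producer
  end
end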

@mensfeld
Member Author

I am closing this one. The last issues were fixed in librdkafka 2.3.0. We should get a 1:1 alignment with karafka-rdkafka and a merger of both in the upcoming months.
