kafka: fix ignored exceptional future in staged handler
When a continuation like `.then([..] {...})` is attached to a failed
future, the lambda parameter is not executed. In this case the lambda
is destroyed without running. In the case of kafka two-stage dispatch
handling, we had the following:

  first_stage.then([f = std::move(second_stage)] { ... }).then(...);

and we encountered a situation where both the first and second stages
failed. When the continuation attached to the first_stage ran, the
captured exceptional future `f` was destroyed (via the lambda that
captured it) without ever being consumed. If the first stage had not
failed, then the lambda had proper handling for a failed `f`.
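
The failure mode above can be reproduced in miniature without Seastar. The sketch below is only a model: `toy_future`, `toy_then`, `ignored_failures` and `count_ignored_with_then` are hypothetical names invented for illustration, not Seastar APIs, and the destructor check merely imitates the "Exceptional future ignored" report.

```cpp
#include <cassert>
#include <exception>
#include <new>
#include <utility>

// Counts failures that were destroyed without being looked at. This stands in
// for the "Exceptional future ignored" warning; it is not a Seastar facility.
int ignored_failures = 0;

// Hypothetical toy future: tracks only whether it holds an unconsumed failure.
class toy_future {
public:
    toy_future() = default;
    explicit toy_future(std::exception_ptr e) : _ex(std::move(e)) {}
    toy_future(toy_future&& o) noexcept : _ex(std::exchange(o._ex, nullptr)) {}
    toy_future& operator=(toy_future&&) = delete;
    ~toy_future() {
        if (_ex) {
            ++ignored_failures; // failure dropped on the floor
        }
    }

    bool failed() const { return static_cast<bool>(_ex); }
    // Consuming the failure clears it, so the destructor stays quiet.
    void ignore_failure() { _ex = nullptr; }

private:
    std::exception_ptr _ex;
};

// Toy `then`: the continuation runs only if `first` succeeded. On failure the
// failure is forwarded and `fn` is destroyed without ever being invoked.
template <typename Fn>
toy_future toy_then(toy_future first, Fn fn) {
    if (first.failed()) {
        return first; // fn (and everything it captured) is destroyed unrun
    }
    fn();
    return toy_future{};
}

// Both stages fail with bad_alloc, mirroring the situation described above.
int count_ignored_with_then() {
    ignored_failures = 0;
    toy_future first{std::make_exception_ptr(std::bad_alloc{})};
    toy_future second{std::make_exception_ptr(std::bad_alloc{})};
    {
        toy_future result = toy_then(
          std::move(first), [f = std::move(second)]() mutable {
              f.ignore_failure(); // proper handling, but it never runs
          });
        result.ignore_failure(); // the caller handles the first-stage failure
    }
    return ignored_failures;
}
```

In this model, calling `count_ignored_with_then()` reports one ignored failure: the second-stage failure captured as `f`, which is destroyed together with the unrun lambda.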

This is an instance of a class of bug that is pretty hard to address. In this
case it was a bad_alloc that was thrown which caused the issue, not an
exceptional case that we explicitly handle.

To fix this we check if the first stage failed, and if so, we explicitly
consume the second stage to avoid this case. If the first stage
succeeds, then we operate as normal and background the second stage.
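
The shape of the fix can be sketched in the same spirit, again without Seastar. Everything named here (`toy_future`, `toy_then_wrapped`, `ignored_failures`, `count_ignored_with_then_wrapped`) is a hypothetical stand-in for illustration; the real change uses `then_wrapped` on a Seastar future as shown in the diff.

```cpp
#include <cassert>
#include <exception>
#include <new>
#include <utility>

// Counts failures destroyed without being consumed (illustrative only).
int ignored_failures = 0;

// Hypothetical toy future: tracks only whether it holds an unconsumed failure.
class toy_future {
public:
    toy_future() = default;
    explicit toy_future(std::exception_ptr e) : _ex(std::move(e)) {}
    toy_future(toy_future&& o) noexcept : _ex(std::exchange(o._ex, nullptr)) {}
    toy_future& operator=(toy_future&&) = delete;
    ~toy_future() {
        if (_ex) {
            ++ignored_failures;
        }
    }

    bool failed() const { return static_cast<bool>(_ex); }
    void ignore_failure() { _ex = nullptr; }

private:
    std::exception_ptr _ex;
};

// Toy `then_wrapped`: the continuation always runs and receives the first
// stage as a future, whether it succeeded or failed.
template <typename Fn>
toy_future toy_then_wrapped(toy_future first, Fn fn) {
    return fn(std::move(first));
}

int count_ignored_with_then_wrapped(bool first_fails, bool second_fails) {
    ignored_failures = 0;
    auto make = [](bool fails) {
        return fails ? toy_future{std::make_exception_ptr(std::bad_alloc{})}
                     : toy_future{};
    };
    {
        toy_future result = toy_then_wrapped(
          make(first_fails),
          [f = make(second_fails)](toy_future d) mutable {
              if (d.failed()) {
                  // First stage failed: explicitly consume the second stage,
                  // then still propagate the first-stage failure.
                  f.ignore_failure();
                  return d;
              }
              // First stage succeeded: stand-in for the normal handling of
              // the second stage, which deals with a failed `f` itself.
              f.ignore_failure();
              return d;
          });
        result.ignore_failure(); // the caller handles any first-stage failure
    }
    return ignored_failures;
}
```

Because the lambda now runs in every case, `f` is consumed on both paths and the model reports no ignored failures for any combination of stage outcomes.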

Aug 07 19:53:30 ip-172-31-45-184 rpk[21830]: WARN  2021-08-07 19:53:30,680 [shard 30] raft - [group_id:3069, {kafka/foo3/4}] replicate_entries_stm.cc:100 - Error while replicating entries std::bad_alloc (std::bad_alloc)
Aug 07 19:53:30 ip-172-31-45-184 rpk[21830]: WARN  2021-08-07 19:53:30,680 [shard 31] seastar - Exceptional future ignored: std::bad_alloc (std::bad_alloc), backtrace:
[Backtrace #0]
void seastar::backtrace<seastar::current_backtrace_tasklocal()::$_3>(seastar::current_backtrace_tasklocal()::$_3&&) at /v/build/v_deps_build/seastar-prefix/src/seastar/include/seastar/util/backtrace.hh:59
 (inlined by) seastar::current_backtrace_tasklocal() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:86
 (inlined by) seastar::current_tasktrace() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:137
 (inlined by) seastar::current_backtrace() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:170
seastar::report_failed_future(std::exception_ptr const&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/future.cc:210
 (inlined by) seastar::report_failed_future(seastar::future_state_base::any&&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/future.cc:218
addr2line: DWARF error: could not find variable specification at offset a8a
addr2line: DWARF error: could not find variable specification at offset c49
addr2line: DWARF error: could not find variable specification at offset 46223
addr2line: DWARF error: could not find variable specification at offset 4d49d
addr2line: DWARF error: could not find variable specification at offset 5f261
seastar::future_state_base::any::check_failure() at /vectorized/include/seastar/core/future.hh:567
 (inlined by) seastar::future_state<seastar::foreign_ptr<std::__1::unique_ptr<kafka::response, std::__1::default_delete<kafka::response> > > >::clear() at /vectorized/include/seastar/core/future.hh:609
 (inlined by) ~future_state at /vectorized/include/seastar/core/future.hh:614
 (inlined by) ~future at /vectorized/include/seastar/core/future.hh:1337
 (inlined by) ~ at /var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-0eeaee3e12186c1d5-1/vectorized/redpanda/vbuild/release/clang/../../../src/v/kafka/server/connection_context.cc:257
 (inlined by) ~continuation at /vectorized/include/seastar/core/future.hh:750
 (inlined by) seastar::continuation<seastar::internal::promise_base_with_type<void>, kafka::connection_context::dispatch_method_once(kafka::request_header, unsigned long)::$_0::operator()(kafka::connection_context::session_resources)::{lambda(iobuf)#1}::operator()(iobuf)::{lambda()#1}, seastar::future<void>::then_impl_nrvo<{lambda(iobuf)#1}, kafka::connection_context::dispatch_method_once(kafka::request_header, unsigned long)::$_0::operator()(kafka::connection_context::session_resources)::{lambda(iobuf)#1}::operator()(iobuf)::{lambda()#1}>({lambda(iobuf)#1}&&)::{lambda(seastar::internal::promise_base_with_type<void>&&, {lambda(iobuf)#1}&, seastar::future_state<seastar::internal::monostate>&&)#1}, void>::run_and_dispose() at /vectorized/include/seastar/core/future.hh:771
addr2line: DWARF error: could not find variable specification at offset 333c9
addr2line: DWARF error: could not find variable specification at offset 4afb7
addr2line: DWARF error: could not find variable specification at offset 4ead7
seastar::reactor::run_tasks(seastar::reactor::task_queue&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2263
 (inlined by) seastar::reactor::run_some_tasks() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2672
seastar::reactor::run() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2831
operator() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:4022
std::__1::__function::__value_func<void ()>::operator()() const at /vectorized/llvm/bin/../include/c++/v1/functional:1885
 (inlined by) std::__1::function<void ()>::operator()() const at /vectorized/llvm/bin/../include/c++/v1/functional:2560
 (inlined by) seastar::posix_thread::start_routine(void*) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/posix.cc:60
addr2line: '/opt/redpanda/lib/libpthread.so.0': No such file
/opt/redpanda/lib/libpthread.so.0 0x9298
addr2line: '/opt/redpanda/lib/libc.so.6': No such file
/opt/redpanda/lib/libc.so.6 0x1006a2

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
dotnwat committed Aug 8, 2021
1 parent 282a3c5 commit f906d20
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/v/kafka/server/connection_context.cc
@@ -254,12 +254,28 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
        * first stage processed in a foreground.
        */
       return res.dispatched
-        .then([this,
+        .then_wrapped([this,
                f = std::move(res.response),
                seq,
                correlation,
                self,
-               s = std::move(sres)]() mutable {
+               s = std::move(sres)](ss::future<> d) mutable {
+            /*
+             * if the dispatch/first stage failed, then we need to
+             * need to consume the second stage since it might be
+             * an exceptional future. if we captured `f` in the
+             * lambda but didn't use `then_wrapped` then the
+             * lambda would be destroyed and an ignored
+             * exceptional future would be caught by seastar.
+             */
+            if (d.failed()) {
+                return f.discard_result()
+                  .handle_exception([](std::exception_ptr e) {
+                      vlog(klog.info, "Discarding second stage failure {}", e);
+                  }).finally([d = std::move(d)]() mutable {
+                      return std::move(d);
+                  });
+            }
             /**
              * second stage processed in background.
              */
@@ -281,9 +297,10 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
                   self->_rs.conn->shutdown_input();
               })
               .finally([s = std::move(s), self] {});
+            return d;
         })
         .handle_exception([self](std::exception_ptr e) {
-            vlog(klog.info, "Detected error processing request: {}", e);
+            vlog(klog.info, "Detected error dispatching request: {}", e);
             self->_rs.conn->shutdown_input();
         });
   });