From f906d20acd794c9e4a9b6318f6448ecdd2aa7a1a Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Sun, 8 Aug 2021 10:29:57 -0700 Subject: [PATCH] kafka: fix ignored exceptional future in staged handler When a continuation like `.then([..] {...})` runs the lambda parameter is not executed if it is a attached to a failed future. In this case the lambda will be destroyed without running. In the case of kafka two stage dispatch handling, we had the following: first_stage.then([f = std::move(second_stage)] { ... }).then(...); and we encountered a situation where the first and second stages failed. when the continuation attached to the first_stage ran the captured `f` exceptional future was destroyed (via the lambda that captured it). if the first stage had not failed, then the lambda had proper handling for a failed `f`. This is an instance of a class of bug that is pretty hard to address. In this case it was a bad_alloc that was thrown which caused the issue, not an exceptional case that we explicitly handle. to fix this we check if the first stage failed, and if so, we explicitly consume the second stage to avoid this case. if the first stage succeeds, then we operate as normal and background the second stage. Aug 07 19:53:30 ip-172-31-45-184 rpk[21830]: WARN 2021-08-07 19:53:30,680 [shard 30] raft - [group_id:3069, {kafka/foo3/4}] replicate_entries_stm.cc:100 - Error while replicating entries std::bad_alloc (std::bad_alloc) Aug 07 19:53:30 ip-172-31-45-184 rpk[21830]: WARN 2021-08-07 19:53:30,680 [shard 31] seastar - Exceptional future ignored: std::bad_alloc (std::bad_alloc), backtrace: [Backtrace #0] void seastar::backtrace(seastar::current_backtrace_tasklocal()::$_3&&) at /v/build/v_deps_build/seastar-prefix/src/seastar/include/seastar/util/backtrace.hh:59 (inlined by) seastar::current_backtrace_tasklocal() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:86 (inlined by) seastar::current_tasktrace() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:137 (inlined by) seastar::current_backtrace() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/util/backtrace.cc:170 seastar::report_failed_future(std::exception_ptr const&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/future.cc:210 (inlined by) seastar::report_failed_future(seastar::future_state_base::any&&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/future.cc:218 addr2line: DWARF error: could not find variable specification at offset a8a addr2line: DWARF error: could not find variable specification at offset c49 addr2line: DWARF error: could not find variable specification at offset 46223 addr2line: DWARF error: could not find variable specification at offset 4d49d addr2line: DWARF error: could not find variable specification at offset 5f261 seastar::future_state_base::any::check_failure() at /vectorized/include/seastar/core/future.hh:567 (inlined by) seastar::future_state > > >::clear() at /vectorized/include/seastar/core/future.hh:609 (inlined by) ~future_state at /vectorized/include/seastar/core/future.hh:614 (inlined by) ~future at /vectorized/include/seastar/core/future.hh:1337 (inlined by) ~ at /var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-0eeaee3e12186c1d5-1/vectorized/redpanda/vbuild/release/clang/../../../src/v/kafka/server/connection_context.cc:257 (inlined by) ~continuation at /vectorized/include/seastar/core/future.hh:750 (inlined by) seastar::continuation, kafka::connection_context::dispatch_method_once(kafka::request_header, unsigned long)::$_0::operator()(kafka::connection_context::session_resources)::{lambda(iobuf)#1}::operator()(iobuf)::{lambda( )#1}, seastar::future::then_impl_nrvo<{lambda(iobuf)#1}, kafka::connection_context::dispatch_method_once(kafka::request_header, unsigned long)::$_0::operator()(kafka::connection_context::session_resources)::{lambda(iobuf)#1}::operator()(iobuf)::{lambda()#1}>({lambda(iobuf)#1 }&&)::{lambda(seastar::internal::promise_base_with_type&&, {lambda(iobuf)#1}&, seastar::future_state&&)#1}, void>::run_and_dispose() at /vectorized/include/seastar/core/future.hh:771 addr2line: DWARF error: could not find variable specification at offset 333c9 addr2line: DWARF error: could not find variable specification at offset 4afb7 addr2line: DWARF error: could not find variable specification at offset 4ead7 seastar::reactor::run_tasks(seastar::reactor::task_queue&) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2263 (inlined by) seastar::reactor::run_some_tasks() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2672 seastar::reactor::run() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:2831 operator() at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/reactor.cc:4022 std::__1::__function::__value_func::operator()() const at /vectorized/llvm/bin/../include/c++/v1/functional:1885 (inlined by) std::__1::function::operator()() const at /vectorized/llvm/bin/../include/c++/v1/functional:2560 (inlined by) seastar::posix_thread::start_routine(void*) at /v/build/v_deps_build/seastar-prefix/src/seastar/src/core/posix.cc:60 addr2line: '/opt/redpanda/lib/libpthread.so.0': No such file /opt/redpanda/lib/libpthread.so.0 0x9298 addr2line: '/opt/redpanda/lib/libc.so.6': No such file /opt/redpanda/lib/libc.so.6 0x1006a2 Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 1e098478920da..a66c4873b4a53 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -254,12 +254,28 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { * first stage processed in a foreground. */ return res.dispatched - .then([this, + .then_wrapped([this, f = std::move(res.response), seq, correlation, self, - s = std::move(sres)]() mutable { + s = std::move(sres)](ss::future<> d) mutable { + /* + * if the dispatch/first stage failed, then we need to + * need to consume the second stage since it might be + * an exceptional future. if we captured `f` in the + * lambda but didn't use `then_wrapped` then the + * lambda would be destroyed and an ignored + * exceptional future would be caught by seastar. + */ + if (d.failed()) { + return f.discard_result() + .handle_exception([](std::exception_ptr e) { + vlog(klog.info, "Discarding second stage failure {}", e); + }).finally([d = std::move(d)]() mutable { + return std::move(d); + }); + } /** * second stage processed in background. */ @@ -281,9 +297,10 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { self->_rs.conn->shutdown_input(); }) .finally([s = std::move(s), self] {}); + return d; }) .handle_exception([self](std::exception_ptr e) { - vlog(klog.info, "Detected error processing request: {}", e); + vlog(klog.info, "Detected error dispatching request: {}", e); self->_rs.conn->shutdown_input(); }); });