diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 5be22ea8dacf3..01e0852179c5b 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -598,9 +598,25 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { octx.response))); }); } catch (...) { + /* + * if the first stage failed then we cannot resolve the + * current future (do_with holding octx) immediately, + * otherwise octx will be destroyed and all of the second + * stage futures (which have a reference to octx) will be + * backgrounded. logging about the second stage return value + * is handled in connection_context handler. + */ dispatched_promise.set_exception(std::current_exception()); - return ss::make_exception_future( - std::current_exception()); + return when_all_succeed(produced.begin(), produced.end()) + .discard_result() + .then([] { + return ss::make_exception_future( + std::runtime_error("First stage produce failed but " + "second stage succeeded.")); + }) + .handle_exception([](std::exception_ptr e) { + return ss::make_exception_future(e); + }); } }); });