Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QUIC hot restart part 6 - child instance pauses listening until parent is drained (second try) #32664

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions envoy/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "parent_drained_callback_registrar_interface",
hdrs = ["parent_drained_callback_registrar.h"],
deps = [":address_interface"],
)

envoy_cc_library(
name = "udp_packet_writer_handler_interface",
hdrs = ["udp_packet_writer_handler.h"],
Expand Down
29 changes: 29 additions & 0 deletions envoy/network/parent_drained_callback_registrar.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "envoy/network/address.h"

#include "absl/functional/any_invocable.h"

namespace Envoy {
namespace Network {

/**
* An interface through which a UDP listen socket, especially a QUIC socket, can
* postpone reading during hot restart until the parent instance is drained.
*/
class ParentDrainedCallbackRegistrar {
public:
/**
* @param address is the address of the listener.
* @param callback the function to call when the listener matching address is
* drained on the parent instance.
*/
virtual void registerParentDrainedCallback(const Address::InstanceConstSharedPtr& address,
absl::AnyInvocable<void()> callback) PURE;

protected:
virtual ~ParentDrainedCallbackRegistrar() = default;
};

} // namespace Network
} // namespace Envoy
7 changes: 7 additions & 0 deletions envoy/network/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,13 @@ class Socket {
* @return the socket options stored earlier with addOption() and addOptions() calls, if any.
*/
virtual const OptionsSharedPtr& options() const PURE;

/**
* @return a ParentDrainedCallbackRegistrar for UDP listen sockets during hot restart.
*/
virtual OptRef<class ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() const {
return absl::nullopt;
}
};

using SocketPtr = std::unique_ptr<Socket>;
Expand Down
11 changes: 11 additions & 0 deletions envoy/server/hot_restart.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ class HotRestart {
virtual void
registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr address,
std::shared_ptr<Network::UdpListenerConfig> listener_config) PURE;

/**
* @return An interface on which registerParentDrainedCallback can be called during
* creation of a listener, or nullopt if there is no parent instance.
*
* If this is set, any UDP listener should start paused and only begin listening
* when the parent instance is drained; this allows draining QUIC listeners to
* catch their own packets and forward unrecognized packets to the child instance.
*/
virtual OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() PURE;

/**
* Initialize the parent logic of our restarter. Meant to be called after initialization of a
* new child has begun. The hot restart implementation needs to be created early to deal with
Expand Down
5 changes: 4 additions & 1 deletion source/common/listener_manager/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,10 @@ Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(
if (socket_type == Network::Socket::Type::Stream) {
return std::make_shared<Network::TcpListenSocket>(std::move(io_handle), address, options);
} else {
return std::make_shared<Network::UdpListenSocket>(std::move(io_handle), address, options);
auto socket = std::make_shared<Network::UdpListenSocket>(
std::move(io_handle), address, options,
server_.hotRestart().parentDrainedCallbackRegistrar());
return socket;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ envoy_cc_library(
"//envoy/event:file_event_interface",
"//envoy/network:exception_interface",
"//envoy/network:listener_interface",
"//envoy/network:parent_drained_callback_registrar_interface",
"//envoy/runtime:runtime_interface",
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
Expand Down
19 changes: 16 additions & 3 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,19 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}
}

NetworkListenSocket(IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(std::move(io_handle), address) {
NetworkListenSocket(
IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar = absl::nullopt)
: ListenSocketImpl(std::move(io_handle), address),
parent_drained_callback_registrar_(parent_drained_callback_registrar) {
setListenSocketOptions(options);
}

OptRef<ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() const override {
return parent_drained_callback_registrar_;
}

Socket::Type socketType() const override { return T::type; }

SocketPtr duplicate() override {
Expand Down Expand Up @@ -110,6 +117,12 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}

protected:
// Usually a socket when initialized starts listening for ready-to-read or ready-to-write events;
// for a QUIC socket during hot restart this is undesirable as the parent instance needs to
// receive all packets; in that case this interface is set, and listening won't begin until the
// callback is called.
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar_;

void setPrebindSocketOptions() {
// On Windows, SO_REUSEADDR does not restrict subsequent bind calls when there is a listener as
// on Linux and later BSD socket stacks.
Expand Down
40 changes: 37 additions & 3 deletions source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/common/platform.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/network/exception.h"
#include "envoy/network/parent_drained_callback_registrar.h"

#include "source/common/api/os_sys_calls_impl.h"
#include "source/common/common/assert.h"
Expand All @@ -34,20 +35,53 @@ UdpListenerImpl::UdpListenerImpl(Event::Dispatcher& dispatcher, SocketSharedPtr
: BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), time_source_(time_source),
// Default prefer_gro to false for downstream server traffic.
config_(config, false) {
parent_drained_callback_registrar_ = socket_->parentDrainedCallbackRegistrar();
socket_->ioHandle().initializeFileEvent(
dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Write);
Event::PlatformDefaultTriggerType, paused() ? 0 : events_when_unpaused_);
if (paused()) {
parent_drained_callback_registrar_->registerParentDrainedCallback(
socket_->connectionInfoProvider().localAddress(),
[this, &dispatcher, alive = std::weak_ptr<void>(destruction_checker_)]() {
dispatcher.post([this, alive = std::move(alive)]() {
auto still_alive = alive.lock();
if (still_alive != nullptr) {
unpause();
}
});
});
}
}

void UdpListenerImpl::unpause() {
// Remove the paused state so enable will actually start listening to events.
parent_drained_callback_registrar_ = absl::nullopt;
if (events_when_unpaused_ != 0) {
// Start listening to events.
enable();
// There may have already been events while this instance was ignoring them,
// so try reading immediately.
activateRead();
}
}

UdpListenerImpl::~UdpListenerImpl() { socket_->ioHandle().resetFileEvents(); }

void UdpListenerImpl::disable() { disableEvent(); }

void UdpListenerImpl::enable() {
socket_->ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write;
if (!paused()) {
socket_->ioHandle().enableFileEvents(events_when_unpaused_);
}
}

void UdpListenerImpl::disableEvent() { socket_->ioHandle().enableFileEvents(0); }
void UdpListenerImpl::disableEvent() {
events_when_unpaused_ = 0;
if (!paused()) {
socket_->ioHandle().enableFileEvents(0);
}
}

void UdpListenerImpl::onSocketEvent(short flags) {
ASSERT((flags & (Event::FileReadyType::Read | Event::FileReadyType::Write)));
Expand Down
6 changes: 6 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class UdpListenerImpl : public BaseListenerImpl,
TimeSource& time_source, const envoy::config::core::v3::UdpSocketConfig& config);
~UdpListenerImpl() override;
uint32_t packetsDropped() { return packets_dropped_; }
bool paused() const { return parent_drained_callback_registrar_ != absl::nullopt; }
void unpause();

// Network::Listener
void disable() override;
Expand Down Expand Up @@ -63,6 +65,10 @@ class UdpListenerImpl : public BaseListenerImpl,

TimeSource& time_source_;
const ResolvedUdpSocketConfig config_;
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar_;
// Taking a weak_ptr to this lets us detect if the listener has been destroyed.
std::shared_ptr<bool> destruction_checker_ = std::make_shared<bool>(true);
uint32_t events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write;
};

class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter {
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ envoy_cc_library(
hdrs = envoy_select_hot_restart(["hot_restarting_child.h"]),
deps = [
":hot_restarting_base",
"//envoy/network:parent_drained_callback_registrar_interface",
"//source/common/stats:stat_merger_lib",
],
)
Expand Down
4 changes: 4 additions & 0 deletions source/server/hot_restart_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ void HotRestartImpl::registerUdpForwardingListener(
as_child_.registerUdpForwardingListener(address, listener_config);
}

OptRef<Network::ParentDrainedCallbackRegistrar> HotRestartImpl::parentDrainedCallbackRegistrar() {
return as_child_;
}

void HotRestartImpl::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) {
as_parent_.initialize(dispatcher, server);
as_child_.initialize(dispatcher);
Expand Down
1 change: 1 addition & 0 deletions source/server/hot_restart_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class HotRestartImpl : public HotRestart {
void registerUdpForwardingListener(
Network::Address::InstanceConstSharedPtr address,
std::shared_ptr<Network::UdpListenerConfig> listener_config) override;
OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() override;
void initialize(Event::Dispatcher& dispatcher, Server::Instance& server) override;
absl::optional<AdminShutdownResponse> sendParentAdminShutdownRequest() override;
void sendParentTerminateRequest() override;
Expand Down
3 changes: 3 additions & 0 deletions source/server/hot_restart_nop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class HotRestartNopImpl : public Server::HotRestart {
int duplicateParentListenSocket(const std::string&, uint32_t) override { return -1; }
void registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr,
std::shared_ptr<Network::UdpListenerConfig>) override {}
OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() override {
return absl::nullopt;
}
void initialize(Event::Dispatcher&, Server::Instance&) override {}
absl::optional<AdminShutdownResponse> sendParentAdminShutdownRequest() override {
return absl::nullopt;
Expand Down
55 changes: 41 additions & 14 deletions source/server/hot_restarting_child.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ HotRestartingChild::UdpForwardingContext::getListenerForDestination(
return it->second;
}

// If restart_epoch is 0 there is no parent, so it's effectively already
// drained and terminated.
HotRestartingChild::HotRestartingChild(int base_id, int restart_epoch,
const std::string& socket_path, mode_t socket_mode)
: HotRestartingBase(base_id), restart_epoch_(restart_epoch) {
: HotRestartingBase(base_id), restart_epoch_(restart_epoch),
parent_terminated_(restart_epoch == 0), parent_drained_(restart_epoch == 0) {
main_rpc_stream_.initDomainSocketAddress(&parent_address_);
std::string socket_path_udp = socket_path + "_udp";
udp_forwarding_rpc_stream_.initDomainSocketAddress(&parent_address_udp_forwarding_);
Expand Down Expand Up @@ -102,7 +105,7 @@ void HotRestartingChild::onForwardedUdpPacket(uint32_t worker_index, Network::Ud

int HotRestartingChild::duplicateParentListenSocket(const std::string& address,
uint32_t worker_index) {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return -1;
}

Expand All @@ -121,7 +124,7 @@ int HotRestartingChild::duplicateParentListenSocket(const std::string& address,
}

std::unique_ptr<HotRestartMessage> HotRestartingChild::getParentStats() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return nullptr;
}

Expand All @@ -138,7 +141,7 @@ std::unique_ptr<HotRestartMessage> HotRestartingChild::getParentStats() {
}

void HotRestartingChild::drainParentListeners() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return;
}
// No reply expected.
Expand All @@ -154,9 +157,29 @@ void HotRestartingChild::registerUdpForwardingListener(
udp_forwarding_context_.registerListener(address, listener_config);
}

void HotRestartingChild::registerParentDrainedCallback(
const Network::Address::InstanceConstSharedPtr& address, absl::AnyInvocable<void()> callback) {
absl::MutexLock lock(&registry_mu_);
if (parent_drained_) {
callback();
} else {
on_drained_actions_.emplace(address->asString(), std::move(callback));
}
}

void HotRestartingChild::allDrainsImplicitlyComplete() {
absl::MutexLock lock(&registry_mu_);
for (auto& drain_action : on_drained_actions_) {
// Call the callback.
std::move(drain_action.second)();
}
on_drained_actions_.clear();
parent_drained_ = true;
}

absl::optional<HotRestart::AdminShutdownResponse>
HotRestartingChild::sendParentAdminShutdownRequest() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return absl::nullopt;
}

Expand All @@ -176,25 +199,29 @@ HotRestartingChild::sendParentAdminShutdownRequest() {
}

void HotRestartingChild::sendParentTerminateRequest() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return;
}
allDrainsImplicitlyComplete();

HotRestartMessage wrapped_request;
wrapped_request.mutable_request()->mutable_terminate();
main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
parent_terminated_ = true;

// Note that the 'generation' counter needs to retain the contribution from
// the parent.
stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_);
if (stat_merger_ != nullptr) {
stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_);

// Now it is safe to forget our stat transferral state.
//
// This destruction is actually important far beyond memory efficiency. The
// scope-based temporary counter logic relies on the StatMerger getting
// destroyed once hot restart's stat merging is all done. (See stat_merger.h
// for details).
stat_merger_.reset();
// Now it is safe to forget our stat transferral state.
//
// This destruction is actually important far beyond memory efficiency. The
// scope-based temporary counter logic relies on the StatMerger getting
// destroyed once hot restart's stat merging is all done. (See stat_merger.h
// for details).
stat_merger_.reset();
}
}

void HotRestartingChild::mergeParentStats(Stats::Store& stats_store,
Expand Down
Loading
Loading