diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 943ba20d4c18..26ecc8417761 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -740,7 +740,8 @@ void ListenerImpl::createUdpListenerFilterChain(Network::UdpListenerFilterManage void ListenerImpl::debugLog(const std::string& message) { UNREFERENCED_PARAMETER(message); - ENVOY_LOG(debug, "{}: name={}, hash={}, address={}", message, name_, hash_, address_->asString()); + ENVOY_LOG(debug, "{}: name={}, hash={}, tag={}, address={}", message, name_, hash_, listener_tag_, + address_->asString()); } void ListenerImpl::initialize() { diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index afd6ebd142d6..c1e27ee9db29 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -984,6 +984,23 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory( if (existing_draining_listener != draining_listeners_.cend()) { draining_listen_socket_factory = &existing_draining_listener->listener_->getSocketFactory(); + existing_draining_listener->listener_->debugLog("clones listener sockets"); + } else { + auto existing_draining_filter_chain = std::find_if( + draining_filter_chains_manager_.cbegin(), draining_filter_chains_manager_.cend(), + [&listener](const DrainingFilterChainsManager& draining_filter_chain) { + return draining_filter_chain.getDrainingListener() + .listenSocketFactory() + .getListenSocket(0) + ->isOpen() && + listener.hasCompatibleAddress(draining_filter_chain.getDrainingListener()); + }); + + if (existing_draining_filter_chain != draining_filter_chains_manager_.cend()) { + draining_listen_socket_factory = + &existing_draining_filter_chain->getDrainingListener().getSocketFactory(); + existing_draining_filter_chain->getDrainingListener().debugLog("clones listener socket"); + } } listener.setSocketFactory(draining_listen_socket_factory != nullptr diff --git a/source/server/listener_manager_impl.h b/source/server/listener_manager_impl.h index ad8505467195..3401b71ad43a 100644 --- a/source/server/listener_manager_impl.h +++ b/source/server/listener_manager_impl.h @@ -143,7 +143,7 @@ class DrainingFilterChainsManager { const std::list& getDrainingFilterChains() const { return draining_filter_chains_; } - ListenerImpl& getDrainingListener() { return *draining_listener_; } + ListenerImpl& getDrainingListener() const { return *draining_listener_; } uint64_t decWorkersPendingRemoval() { return --workers_pending_removal_; } // Schedule listener destroy. diff --git a/test/server/listener_manager_impl_test.cc b/test/server/listener_manager_impl_test.cc index 8bf8e1388054..646f28d853d8 100644 --- a/test/server/listener_manager_impl_test.cc +++ b/test/server/listener_manager_impl_test.cc @@ -5034,6 +5034,101 @@ traffic_direction: INBOUND EXPECT_CALL(*listener_foo_update1, onDestroy()); } +TEST_F(ListenerManagerImplTest, ListenSocketFactoryIsClonedFromListenerDrainingFilterChain) { + InSequence s; + + EXPECT_CALL(*worker_, start(_, _)); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); + + // Add foo listener. + const std::string listener_foo_yaml = R"EOF( +name: foo +traffic_direction: INBOUND +address: + socket_address: + address: 127.0.0.1 + port_value: 1234 +filter_chains: +- filters: [] + )EOF"; + + ListenerHandle* listener_foo = expectListenerCreate(true, true); + EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, default_bind_type, 0)); + EXPECT_CALL(listener_foo->target_, initialize()); + EXPECT_TRUE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "", true)); + checkStats(__LINE__, 1, 0, 0, 1, 0, 0, 0); + EXPECT_CALL(*worker_, addListener(_, _, _)); + listener_foo->target_.ready(); + worker_->callAddCompletion(); + EXPECT_EQ(1UL, manager_->listeners().size()); + + // Update foo into warming. + const std::string listener_foo_update1_yaml = R"EOF( +name: foo +traffic_direction: INBOUND +address: + socket_address: + address: 127.0.0.1 + port_value: 1234 +filter_chains: +- filters: + filter_chain_match: + destination_port: 1234 + )EOF"; + + ListenerHandle* listener_foo_update1 = expectListenerOverridden(true, listener_foo); + EXPECT_CALL(*listener_factory_.socket_, duplicate()); + EXPECT_CALL(listener_foo_update1->target_, initialize()); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_update1_yaml), "", true)); + EXPECT_EQ(1UL, manager_->listeners().size()); + EXPECT_EQ(1, server_.stats_store_.counter("listener_manager.listener_in_place_updated").value()); + checkStats(__LINE__, 1, 1, 0, 1, 1, 0, 0); + + // The warmed up starts the drain timer. + EXPECT_CALL(*worker_, addListener(_, _, _)); + EXPECT_CALL(server_.options_, drainTime()).WillOnce(Return(std::chrono::seconds(600))); + Event::MockTimer* filter_chain_drain_timer = new Event::MockTimer(&server_.dispatcher_); + EXPECT_CALL(*filter_chain_drain_timer, enableTimer(std::chrono::milliseconds(600000), _)); + listener_foo_update1->target_.ready(); + checkStats(__LINE__, 1, 1, 0, 0, 1, 0, 1); + EXPECT_CALL(*worker_, removeFilterChains(_, _, _)); + filter_chain_drain_timer->invokeCallback(); + + // Stop the active listener listener_foo_update1. + std::function stop_completion; + EXPECT_CALL(*worker_, stopListener(_, _)) + .WillOnce(Invoke( + [&stop_completion](Network::ListenerConfig&, std::function completion) -> void { + ASSERT_TRUE(completion != nullptr); + stop_completion = std::move(completion); + })); + EXPECT_CALL(*listener_foo_update1->drain_manager_, startDrainSequence(_)); + EXPECT_TRUE(manager_->removeListener("foo")); + + EXPECT_CALL(*worker_, removeListener(_, _)); + listener_foo_update1->drain_manager_->drain_sequence_completion_(); + + EXPECT_CALL(*listener_foo_update1, onDestroy()); + worker_->callRemovalCompletion(); + + // The snapshot of the listener manager is + // 1) listener_foo is draining filter chain. The listen socket is open. + // 2) No listen is active. Note that listener_foo_update1 is stopped. + // + // The next step is to add a listener on the same socket address and the listen socket of + // listener_foo will be duplicated. + auto listener_foo_expect_reuse_socket = expectListenerCreate(true, true); + EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, _, _)).Times(0); + EXPECT_CALL(*listener_factory_.socket_, duplicate()); + EXPECT_CALL(listener_foo_expect_reuse_socket->target_, initialize()); + EXPECT_TRUE( + manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "version1", true)); + + EXPECT_CALL(*listener_foo, onDestroy()); + EXPECT_CALL(*listener_foo_expect_reuse_socket, onDestroy()); +} + TEST(ListenerMessageUtilTest, ListenerMessageSameAreEquivalent) { envoy::config::listener::v3::Listener listener1; envoy::config::listener::v3::Listener listener2;