Skip to content

Commit

Permalink
listener: listen socket factory can be cloned from listener draining … (
Browse files Browse the repository at this point in the history
envoyproxy#18686)

Signed-off-by: Yuchen Dai <silentdai@gmail.com>
  • Loading branch information
lambdai committed Oct 20, 2021
1 parent 96701cb commit bc6bce8
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
3 changes: 2 additions & 1 deletion source/server/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ void ListenerImpl::createUdpListenerFilterChain(Network::UdpListenerFilterManage

void ListenerImpl::debugLog(const std::string& message) {
UNREFERENCED_PARAMETER(message);
ENVOY_LOG(debug, "{}: name={}, hash={}, address={}", message, name_, hash_, address_->asString());
ENVOY_LOG(debug, "{}: name={}, hash={}, tag={}, address={}", message, name_, hash_, listener_tag_,
address_->asString());
}

void ListenerImpl::initialize() {
Expand Down
17 changes: 17 additions & 0 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,23 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory(

if (existing_draining_listener != draining_listeners_.cend()) {
draining_listen_socket_factory = &existing_draining_listener->listener_->getSocketFactory();
existing_draining_listener->listener_->debugLog("clones listener sockets");
} else {
auto existing_draining_filter_chain = std::find_if(
draining_filter_chains_manager_.cbegin(), draining_filter_chains_manager_.cend(),
[&listener](const DrainingFilterChainsManager& draining_filter_chain) {
return draining_filter_chain.getDrainingListener()
.listenSocketFactory()
.getListenSocket(0)
->isOpen() &&
listener.hasCompatibleAddress(draining_filter_chain.getDrainingListener());
});

if (existing_draining_filter_chain != draining_filter_chains_manager_.cend()) {
draining_listen_socket_factory =
&existing_draining_filter_chain->getDrainingListener().getSocketFactory();
existing_draining_filter_chain->getDrainingListener().debugLog("clones listener socket");
}
}

listener.setSocketFactory(draining_listen_socket_factory != nullptr
Expand Down
2 changes: 1 addition & 1 deletion source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class DrainingFilterChainsManager {
const std::list<const Network::FilterChain*>& getDrainingFilterChains() const {
return draining_filter_chains_;
}
ListenerImpl& getDrainingListener() { return *draining_listener_; }
ListenerImpl& getDrainingListener() const { return *draining_listener_; }
uint64_t decWorkersPendingRemoval() { return --workers_pending_removal_; }

// Schedule listener destroy.
Expand Down
95 changes: 95 additions & 0 deletions test/server/listener_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5034,6 +5034,101 @@ traffic_direction: INBOUND
EXPECT_CALL(*listener_foo_update1, onDestroy());
}

TEST_F(ListenerManagerImplTest, ListenSocketFactoryIsClonedFromListenerDrainingFilterChain) {
InSequence s;

EXPECT_CALL(*worker_, start(_, _));
manager_->startWorkers(guard_dog_, callback_.AsStdFunction());

// Add foo listener.
const std::string listener_foo_yaml = R"EOF(
name: foo
traffic_direction: INBOUND
address:
socket_address:
address: 127.0.0.1
port_value: 1234
filter_chains:
- filters: []
)EOF";

ListenerHandle* listener_foo = expectListenerCreate(true, true);
EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, default_bind_type, 0));
EXPECT_CALL(listener_foo->target_, initialize());
EXPECT_TRUE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "", true));
checkStats(__LINE__, 1, 0, 0, 1, 0, 0, 0);
EXPECT_CALL(*worker_, addListener(_, _, _));
listener_foo->target_.ready();
worker_->callAddCompletion();
EXPECT_EQ(1UL, manager_->listeners().size());

// Update foo into warming.
const std::string listener_foo_update1_yaml = R"EOF(
name: foo
traffic_direction: INBOUND
address:
socket_address:
address: 127.0.0.1
port_value: 1234
filter_chains:
- filters:
filter_chain_match:
destination_port: 1234
)EOF";

ListenerHandle* listener_foo_update1 = expectListenerOverridden(true, listener_foo);
EXPECT_CALL(*listener_factory_.socket_, duplicate());
EXPECT_CALL(listener_foo_update1->target_, initialize());
EXPECT_TRUE(
manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_update1_yaml), "", true));
EXPECT_EQ(1UL, manager_->listeners().size());
EXPECT_EQ(1, server_.stats_store_.counter("listener_manager.listener_in_place_updated").value());
checkStats(__LINE__, 1, 1, 0, 1, 1, 0, 0);

// The warmed up starts the drain timer.
EXPECT_CALL(*worker_, addListener(_, _, _));
EXPECT_CALL(server_.options_, drainTime()).WillOnce(Return(std::chrono::seconds(600)));
Event::MockTimer* filter_chain_drain_timer = new Event::MockTimer(&server_.dispatcher_);
EXPECT_CALL(*filter_chain_drain_timer, enableTimer(std::chrono::milliseconds(600000), _));
listener_foo_update1->target_.ready();
checkStats(__LINE__, 1, 1, 0, 0, 1, 0, 1);
EXPECT_CALL(*worker_, removeFilterChains(_, _, _));
filter_chain_drain_timer->invokeCallback();

// Stop the active listener listener_foo_update1.
std::function<void()> stop_completion;
EXPECT_CALL(*worker_, stopListener(_, _))
.WillOnce(Invoke(
[&stop_completion](Network::ListenerConfig&, std::function<void()> completion) -> void {
ASSERT_TRUE(completion != nullptr);
stop_completion = std::move(completion);
}));
EXPECT_CALL(*listener_foo_update1->drain_manager_, startDrainSequence(_));
EXPECT_TRUE(manager_->removeListener("foo"));

EXPECT_CALL(*worker_, removeListener(_, _));
listener_foo_update1->drain_manager_->drain_sequence_completion_();

EXPECT_CALL(*listener_foo_update1, onDestroy());
worker_->callRemovalCompletion();

// The snapshot of the listener manager is
// 1) listener_foo is draining filter chain. The listen socket is open.
// 2) No listen is active. Note that listener_foo_update1 is stopped.
//
// The next step is to add a listener on the same socket address and the listen socket of
// listener_foo will be duplicated.
auto listener_foo_expect_reuse_socket = expectListenerCreate(true, true);
EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, _, _)).Times(0);
EXPECT_CALL(*listener_factory_.socket_, duplicate());
EXPECT_CALL(listener_foo_expect_reuse_socket->target_, initialize());
EXPECT_TRUE(
manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "version1", true));

EXPECT_CALL(*listener_foo, onDestroy());
EXPECT_CALL(*listener_foo_expect_reuse_socket, onDestroy());
}

TEST(ListenerMessageUtilTest, ListenerMessageSameAreEquivalent) {
envoy::config::listener::v3::Listener listener1;
envoy::config::listener::v3::Listener listener2;
Expand Down

0 comments on commit bc6bce8

Please sign in to comment.