Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

listener: listen socket factory can be cloned from listener draining … #18686

Merged
merged 2 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion source/server/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ void ListenerImpl::createUdpListenerFilterChain(Network::UdpListenerFilterManage

void ListenerImpl::debugLog(const std::string& message) {
UNREFERENCED_PARAMETER(message);
ENVOY_LOG(debug, "{}: name={}, hash={}, address={}", message, name_, hash_, address_->asString());
ENVOY_LOG(debug, "{}: name={}, hash={}, tag={}, address={}", message, name_, hash_, listener_tag_,
address_->asString());
}

void ListenerImpl::initialize() {
Expand Down
17 changes: 17 additions & 0 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,23 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory(

if (existing_draining_listener != draining_listeners_.cend()) {
draining_listen_socket_factory = &existing_draining_listener->listener_->getSocketFactory();
existing_draining_listener->listener_->debugLog("clones listener sockets");
} else {
auto existing_draining_filter_chain = std::find_if(
draining_filter_chains_manager_.cbegin(), draining_filter_chains_manager_.cend(),
[&listener](const DrainingFilterChainsManager& draining_filter_chain) {
return draining_filter_chain.getDrainingListener()
.listenSocketFactory()
.getListenSocket(0)
->isOpen() &&
listener.hasCompatibleAddress(draining_filter_chain.getDrainingListener());
});

if (existing_draining_filter_chain != draining_filter_chains_manager_.cend()) {
draining_listen_socket_factory =
&existing_draining_filter_chain->getDrainingListener().getSocketFactory();
existing_draining_filter_chain->getDrainingListener().debugLog("clones listener socket");
}
}

listener.setSocketFactory(draining_listen_socket_factory != nullptr
Expand Down
2 changes: 1 addition & 1 deletion source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class DrainingFilterChainsManager {
const std::list<const Network::FilterChain*>& getDrainingFilterChains() const {
return draining_filter_chains_;
}
ListenerImpl& getDrainingListener() { return *draining_listener_; }
ListenerImpl& getDrainingListener() const { return *draining_listener_; }
uint64_t decWorkersPendingRemoval() { return --workers_pending_removal_; }

// Schedule listener destroy.
Expand Down
95 changes: 95 additions & 0 deletions test/server/listener_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5034,6 +5034,101 @@ traffic_direction: INBOUND
EXPECT_CALL(*listener_foo_update1, onDestroy());
}

TEST_F(ListenerManagerImplTest, ListenSocketFactoryIsClonedFromListenerDrainingFilterChain) {
InSequence s;

EXPECT_CALL(*worker_, start(_, _));
manager_->startWorkers(guard_dog_, callback_.AsStdFunction());

// Add foo listener.
const std::string listener_foo_yaml = R"EOF(
name: foo
traffic_direction: INBOUND
address:
socket_address:
address: 127.0.0.1
port_value: 1234
filter_chains:
- filters: []
)EOF";

ListenerHandle* listener_foo = expectListenerCreate(true, true);
EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, default_bind_type, 0));
EXPECT_CALL(listener_foo->target_, initialize());
EXPECT_TRUE(manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "", true));
checkStats(__LINE__, 1, 0, 0, 1, 0, 0, 0);
EXPECT_CALL(*worker_, addListener(_, _, _));
listener_foo->target_.ready();
worker_->callAddCompletion();
EXPECT_EQ(1UL, manager_->listeners().size());

// Update foo into warming.
const std::string listener_foo_update1_yaml = R"EOF(
name: foo
traffic_direction: INBOUND
address:
socket_address:
address: 127.0.0.1
port_value: 1234
filter_chains:
- filters:
filter_chain_match:
destination_port: 1234
)EOF";

ListenerHandle* listener_foo_update1 = expectListenerOverridden(true, listener_foo);
EXPECT_CALL(*listener_factory_.socket_, duplicate());
EXPECT_CALL(listener_foo_update1->target_, initialize());
EXPECT_TRUE(
manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_update1_yaml), "", true));
EXPECT_EQ(1UL, manager_->listeners().size());
EXPECT_EQ(1, server_.stats_store_.counter("listener_manager.listener_in_place_updated").value());
checkStats(__LINE__, 1, 1, 0, 1, 1, 0, 0);

// The warmed up starts the drain timer.
EXPECT_CALL(*worker_, addListener(_, _, _));
EXPECT_CALL(server_.options_, drainTime()).WillOnce(Return(std::chrono::seconds(600)));
Event::MockTimer* filter_chain_drain_timer = new Event::MockTimer(&server_.dispatcher_);
EXPECT_CALL(*filter_chain_drain_timer, enableTimer(std::chrono::milliseconds(600000), _));
listener_foo_update1->target_.ready();
checkStats(__LINE__, 1, 1, 0, 0, 1, 0, 1);
EXPECT_CALL(*worker_, removeFilterChains(_, _, _));
filter_chain_drain_timer->invokeCallback();

// Stop the active listener listener_foo_update1.
std::function<void()> stop_completion;
EXPECT_CALL(*worker_, stopListener(_, _))
.WillOnce(Invoke(
[&stop_completion](Network::ListenerConfig&, std::function<void()> completion) -> void {
ASSERT_TRUE(completion != nullptr);
stop_completion = std::move(completion);
}));
EXPECT_CALL(*listener_foo_update1->drain_manager_, startDrainSequence(_));
EXPECT_TRUE(manager_->removeListener("foo"));

EXPECT_CALL(*worker_, removeListener(_, _));
listener_foo_update1->drain_manager_->drain_sequence_completion_();

EXPECT_CALL(*listener_foo_update1, onDestroy());
worker_->callRemovalCompletion();

// The snapshot of the listener manager is
// 1) listener_foo is draining filter chain. The listen socket is open.
// 2) No listen is active. Note that listener_foo_update1 is stopped.
//
// The next step is to add a listener on the same socket address and the listen socket of
// listener_foo will be duplicated.
auto listener_foo_expect_reuse_socket = expectListenerCreate(true, true);
EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, _, _)).Times(0);
EXPECT_CALL(*listener_factory_.socket_, duplicate());
EXPECT_CALL(listener_foo_expect_reuse_socket->target_, initialize());
EXPECT_TRUE(
manager_->addOrUpdateListener(parseListenerFromV3Yaml(listener_foo_yaml), "version1", true));

EXPECT_CALL(*listener_foo, onDestroy());
EXPECT_CALL(*listener_foo_expect_reuse_socket, onDestroy());
}

TEST(ListenerMessageUtilTest, ListenerMessageSameAreEquivalent) {
envoy::config::listener::v3::Listener listener1;
envoy::config::listener::v3::Listener listener2;
Expand Down