test: FakeUpstream threading fixes #14526

Status: Merged (18 commits), Jan 21, 2021

Changes from 3 commits

Commits (18):
512610f  test: FakeUpstream threading fixes  (antoniovicente, Dec 24, 2020)
19a63f7  address review comments  (antoniovicente, Dec 29, 2020)
5f767cf  fix flakiness in hds_integration_test  (antoniovicente, Dec 29, 2020)
1bfa958  Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…  (antoniovicente, Dec 29, 2020)
45b8864  Fix wait for disconnect. Previously disconnect condition happened un…  (antoniovicente, Jan 6, 2021)
3b653f2  fix use after free due to connection possibly being deleted.  (antoniovicente, Jan 7, 2021)
ee052d6  fix more use-after-free  (antoniovicente, Jan 7, 2021)
e9f4597  fix flaky test  (antoniovicente, Jan 7, 2021)
3c55b38  Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…  (antoniovicente, Jan 8, 2021)
495b19e  add logging to debug macos failure  (antoniovicente, Jan 9, 2021)
6580555  Wait outside the upstream lock since holding that lock is not needed.  (antoniovicente, Jan 11, 2021)
f90610f  Symbolize names of VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS test cases.  (antoniovicente, Jan 11, 2021)
22211c9  also stringify arguments to other VERSIONED_GRPC_CLIENT_INTEGRATION_P…  (antoniovicente, Jan 11, 2021)
0d4fc3d  Fix issue of timeouts waiting for disconnect on macos while also avoi…  (antoniovicente, Jan 13, 2021)
d0565e5  address review comments  (antoniovicente, Jan 15, 2021)
197362f  revert back to the original wait for a raw connection followed by che…  (antoniovicente, Jan 16, 2021)
3aee6d7  add thread annotations to FakeConnectionBase::initialized_  (antoniovicente, Jan 19, 2021)
8fccaf3  fix use-after-free in test framework  (antoniovicente, Jan 20, 2021)
@@ -413,8 +413,7 @@ TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClosePartialRequest) {
  ASSERT_TRUE(tcp_client->write(partial_request));
  tcp_client->close();

-  FakeRawConnectionPtr fake_upstream_connection;
-  ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
+  ASSERT_TRUE(expected_upstream->waitForAndConsumeDisconnectedConnection());

  test_server_->waitForCounterGe("thrift.thrift_stats.cx_destroy_remote_with_active_rq", 1);
test/integration/fake_upstream.cc (81 additions, 24 deletions)

@@ -20,6 +20,7 @@
#include "test/test_common/utility.h"

#include "absl/strings/str_cat.h"
+ #include "absl/synchronization/notification.h"

using namespace std::chrono_literals;

@@ -354,14 +355,28 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
}

AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) {
-  return shared_connection_.executeOnDispatcher(
-      [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
+  // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called
+  // from outside the dispatcher thread.
+  if (shared_connection_.connection().dispatcher().isThreadSafe()) {
+    shared_connection_.connection().readDisable(disable);
+    return AssertionSuccess();
+  } else {
+    return shared_connection_.executeOnDispatcher(
+        [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
Review thread:

Contributor: Why do we need to special-case same-thread calls? Is that because when calling from the same thread, we are assuming the operation is blocking? (Add code comments and we are good to go.)

Author (antoniovicente): executeOnDispatcher blocks until the posted callback finishes. Calling post and then waiting from the dispatcher thread results in a deadlock.

Contributor: Makes sense; adding a code comment may help the reader.

Member: Idea: instead of dealing with this at all call sites, could executeOnDispatcher() figure this out and just run the lambda instead of posting?

Author (antoniovicente): I undid this branching and reverted this function. FakeUpstream now calls readDisable and enableHalfClose directly on the network connection instead of going through this wrapper in the cases where the operation happens on the dispatcher thread. Some tests still call this method, and those calls still go through the executeOnDispatcher path.
+  }
}

AssertionResult FakeConnectionBase::enableHalfClose(bool enable,
                                                    std::chrono::milliseconds timeout) {
-  return shared_connection_.executeOnDispatcher(
-      [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
+  // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called
+  // from outside the dispatcher thread.
+  if (shared_connection_.connection().dispatcher().isThreadSafe()) {
+    shared_connection_.connection().enableHalfClose(enable);
+    return AssertionSuccess();
+  } else {
+    return shared_connection_.executeOnDispatcher(
+        [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
+  }
}

Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
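The deadlock described in the review thread above can be reproduced outside Envoy. The following is a minimal, self-contained sketch (hypothetical illustration, not code from this PR); SingleThreadQueue stands in for the dispatcher's event loop:

#include <functional>
#include <queue>

#include "absl/synchronization/notification.h"

// Toy stand-in for a dispatcher: callbacks run one at a time on the thread that
// calls run(), and a queued callback cannot start until the current one returns.
class SingleThreadQueue {
public:
  void post(std::function<void()> cb) { queue_.push(std::move(cb)); }
  void run() {
    while (!queue_.empty()) {
      auto cb = std::move(queue_.front());
      queue_.pop();
      cb();
    }
  }

private:
  std::queue<std::function<void()>> queue_;
};

int main() {
  SingleThreadQueue dispatcher;
  dispatcher.post([&dispatcher]() {
    // Same shape as executeOnDispatcher: post work, then block until it finishes.
    absl::Notification done;
    dispatcher.post([&done]() { done.Notify(); });
    done.WaitForNotification(); // Deadlock: the posted lambda can only run after
                                // this callback returns, which never happens.
  });
  dispatcher.run();
  return 0;
}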
@@ -552,16 +567,19 @@ AssertionResult FakeUpstream::waitForHttpConnection(
                                    client_dispatcher, timeout)) {
      return AssertionFailure() << "Timed out waiting for new connection.";
    }
+  }
+
+  return runOnDispatcherThreadAndWait([&]() {
+    absl::MutexLock lock(&lock_);
    connection = std::make_unique<FakeHttpConnection>(
        *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb,
        max_request_headers_count, headers_with_underscores_action);
-  }
-  VERIFY_ASSERTION(connection->initialize());
-  if (read_disable_on_new_connection_) {
-    VERIFY_ASSERTION(connection->readDisable(false));
-  }
-  return AssertionSuccess();
+    VERIFY_ASSERTION(connection->initialize());
+    if (read_disable_on_new_connection_) {
+      VERIFY_ASSERTION(connection->readDisable(false));
+    }
+    return AssertionSuccess();
+  });
}

AssertionResult
@@ -585,14 +603,18 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
                                      client_dispatcher, 5ms)) {
        continue;
      }
+    }
+
+    return upstream.runOnDispatcherThreadAndWait([&]() {
+      absl::MutexLock lock(&upstream.lock_);
      connection = std::make_unique<FakeHttpConnection>(
          upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
          Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
          envoy::config::core::v3::HttpProtocolOptions::ALLOW);
-    }
-    VERIFY_ASSERTION(connection->initialize());
-    VERIFY_ASSERTION(connection->readDisable(false));
-    return AssertionSuccess();
+      VERIFY_ASSERTION(connection->initialize());
+      VERIFY_ASSERTION(connection->readDisable(false));
+      return AssertionSuccess();
+    });
    }
  }
  return AssertionFailure() << "Timed out waiting for HTTP connection.";
@@ -610,11 +632,29 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection,
    if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
      return AssertionFailure() << "Timed out waiting for raw connection";
    }
+  }
+
+  return runOnDispatcherThreadAndWait([&]() {
+    absl::MutexLock lock(&lock_);
    connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
-  }
-  VERIFY_ASSERTION(connection->initialize());
-  VERIFY_ASSERTION(connection->readDisable(false));
-  VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
-  return AssertionSuccess();
+    VERIFY_ASSERTION(connection->initialize());
+    VERIFY_ASSERTION(connection->readDisable(false));
+    VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
+    return AssertionSuccess();
+  });
}

+ testing::AssertionResult
+ FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) {
+   absl::MutexLock lock(&lock_);
+   const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
+     return !new_connections_.empty() && !new_connections_.front()->connected();
+   };
+
+   if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
+     return AssertionFailure() << "Timed out waiting for raw connection";
+   }
+   consumeConnection();
+   return AssertionSuccess();
+ }

@@ -647,6 +687,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
received_datagrams_.emplace_back(std::move(data));
}

AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) {
Review thread:

Member: Should this have a timeout to avoid hitting the test's hard timeout? It could be hard-coded to 15s if that is easier.

Author (antoniovicente): Added a timeout which results in a RELEASE_ASSERT if hit. The callbacks run on the dispatcher should complete quickly, so taking an extended period of time indicates that something has gone really wrong. Cancelling the work is not an option since it is hard to tell how far the callback got before the timeout happens.

Returning AssertionResult is not a good option because there are FakeUpstream functions that wait for an HTTP upstream connection with a short timeout in a loop and interpret the returned AssertionResult as a retryable failure.

AssertionResult result = AssertionSuccess();
absl::Notification done;
ASSERT(!dispatcher_->isThreadSafe());
dispatcher_->post([&]() {
result = cb();
done.Notify();
});
done.WaitForNotification();
return result;
}
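The timeout and RELEASE_ASSERT behavior the author describes above was added in a later commit and is not part of this 3-commit view. A sketch of what such a variant could look like, reusing the structure above (the exact signature, default timeout, and assert message are assumptions, not the PR's final code):

// Hypothetical timeout variant: abort the test instead of returning a retryable
// AssertionResult, since a callback stuck on the dispatcher thread means the
// test environment is already broken.
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
                                                           std::chrono::milliseconds timeout) {
  AssertionResult result = AssertionSuccess();
  absl::Notification done;
  ASSERT(!dispatcher_->isThreadSafe());
  dispatcher_->post([&]() {
    result = cb();
    done.Notify();
  });
  RELEASE_ASSERT(done.WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
                 "Timed out waiting for callback to run on the dispatcher thread.");
  return result;
}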

void FakeUpstream::sendUdpDatagram(const std::string& buffer,
const Network::Address::InstanceConstSharedPtr& peer) {
dispatcher_->post([this, buffer, peer] {
@@ -683,14 +735,19 @@ FakeRawConnection::~FakeRawConnection() {
}

testing::AssertionResult FakeRawConnection::initialize() {
-  auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)};
+  ASSERT(shared_connection_.connection().dispatcher().isThreadSafe());
+  Network::ReadFilterSharedPtr filter{new ReadFilter(*this)};
  read_filter_ = filter;
-  testing::AssertionResult result = shared_connection_.executeOnDispatcher(
-      [filter = std::move(filter)](Network::Connection& connection) {
-        connection.addReadFilter(filter);
-      });
-  if (!result) {
-    return result;
+  if (shared_connection_.connection().dispatcher().isThreadSafe()) {
+    shared_connection_.connection().addReadFilter(filter);
+  } else {
+    testing::AssertionResult result = shared_connection_.executeOnDispatcher(
+        [filter = std::move(filter)](Network::Connection& connection) {
+          connection.addReadFilter(filter);
+        });
+    if (!result) {
+      return result;
+    }
  }
  return FakeConnectionBase::initialize();
}
test/integration/fake_upstream.h (9 additions, 13 deletions)

@@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
connection_.addConnectionCallbacks(*this);
}

-  Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) {
-    absl::MutexLock lock(&lock_);
-    return disconnect_callback_manager_.add(callback);
-  }
-
-  // Avoid directly removing by caller, since CallbackManager is not thread safe.
-  void removeDisconnectCallback(Common::CallbackHandle* handle) {
-    absl::MutexLock lock(&lock_);
-    handle->remove();
-  }

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
// Throughout this entire function, we know that the connection_ cannot disappear, since this
@@ -278,7 +267,6 @@
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
disconnected_ = true;
-      disconnect_callback_manager_.runCallbacks();
}
}

@@ -315,6 +303,8 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (disconnected_) {
return testing::AssertionSuccess();
}
+    ASSERT(!connection_.dispatcher().isThreadSafe(),
+           "deadlock: executeOnDispatcher called from dispatcher thread.");
bool callback_ready_event = false;
bool unexpected_disconnect = false;
connection_.dispatcher().post(
@@ -345,13 +335,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,

void setParented() {
absl::MutexLock lock(&lock_);
+    ASSERT(!parented_);
parented_ = true;
}

private:
Network::Connection& connection_;
absl::Mutex lock_;
-  Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_);
bool parented_ ABSL_GUARDED_BY(lock_){};
bool disconnected_ ABSL_GUARDED_BY(lock_){};
};
@@ -579,6 +569,11 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
testing::AssertionResult
waitForRawConnection(FakeRawConnectionPtr& connection,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

+  ABSL_MUST_USE_RESULT
+  testing::AssertionResult waitForAndConsumeDisconnectedConnection(
+      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

Network::Address::InstanceConstSharedPtr localAddress() const { return socket_->localAddress(); }

// Wait for one of the upstreams to receive a connection
@@ -734,6 +729,7 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
void threadRoutine();
SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void onRecvDatagram(Network::UdpRecvData& data);
+  AssertionResult runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb);

Network::SocketSharedPtr socket_;
Network::ListenSocketFactorySharedPtr socket_factory_;
test/integration/hds_integration_test.cc (11 additions, 15 deletions)

@@ -438,11 +438,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) {
hds_stream_->sendGrpcMessage(server_health_check_specifier_);
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_);

-  // Envoy sends a health check message to an endpoint
-  ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_));
-
-  // Endpoint doesn't respond to the health check
-  ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect());
+  // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the
+  // health check
+  ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection());

// Receive updates until the one we expect arrives
waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT);
@@ -515,11 +513,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) {
hds_stream_->sendGrpcMessage(server_health_check_specifier_);
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_);

-  // Envoys asks the endpoint if it's healthy
-  ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_));
-
-  // No response from the endpoint
-  ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect());
+  // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the
+  // health check
+  ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection());

// Receive updates until the one we expect arrives
waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT);
@@ -1031,6 +1027,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) {
tls_hosts_ = true;

initialize();
+  // Allow the fake upstreams to detect an error and disconnect during the TLS handshake.
+  host_upstream_->setReadDisableOnNewConnection(false);
Review thread:

Author (antoniovicente): The readDisable on the connection does not prevent the delivery of EPOLLOUT events to the connection. The delivery of a write event triggers the start of the SSL handshake, which can perform reads and writes. Handling of the out event sometimes results in detection of the plain HTTP request sent by the proxy to the fake upstream, which can trigger termination of the upstream connection in the middle of creating a raw connection, resulting in a use-after-free crash.

By fully enabling events on the TLS upstream and waiting for disconnect we avoid flaky failures.

Member: Maybe add all this color to the comment for the next person? (Or, since you have the same fix below, maybe add a well-named helper function with a bunch of comments?) Up to you.

Author (antoniovicente): I'm going to go back to see if I can fix this properly.

I think the original waitForRawConnection followed by waitForDisconnect API was more intuitive than the setReadDisableOnNewConnection + waitForAndConsumeDisconnectedConnection combination that I need to avoid the use-after-free in the TLS handshake failure cases. I think the solution is to avoid creating the network connection object until later, thus preventing events from being handled while the filter chain and related objects are not fully set up.

/wait

Author (antoniovicente): I was able to make tests work with the old API by allowing raw connection creation to succeed in cases where the network connection is already disconnected. Not having to call different methods in tests that expect disconnects feels easier.

PTAL.

// Server <--> Envoy
waitForHdsStream();
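For readers following the thread above, the two test-side patterns under discussion look like this side by side (a condensed sketch of call sequences that appear elsewhere in this diff, not new test code):

// Old API: obtain the raw connection, then wait for the peer to drop it.
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(host_upstream_->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());

// API used in this revision: leave read events enabled so the TLS handshake can
// fail and close the socket, then wait for an already-disconnected connection
// and consume it in one step.
host_upstream_->setReadDisableOnNewConnection(false);
ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection());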
@@ -1046,11 +1044,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) {
hds_stream_->sendGrpcMessage(server_health_check_specifier_);
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_);

-  // Envoy sends a health check message to an endpoint
-  ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_));
-
-  // Endpoint doesn't respond to the health check
-  ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect());
+  // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the
+  // health check
+  ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection());

// Receive updates until the one we expect arrives. This should be UNHEALTHY and not TIMEOUT,
// because TIMEOUT occurs in the situation where there is no response from the endpoint. In this
test/integration/sds_dynamic_integration_test.cc (1 addition, 3 deletions)

@@ -654,9 +654,7 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) {
EXPECT_EQ("503", response->headers().getStatusValue());

// To flush out the reset connection from the first request in upstream.
-  FakeRawConnectionPtr fake_upstream_connection;
-  ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
-  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
+  ASSERT_TRUE(fake_upstreams_[0]->waitForAndConsumeDisconnectedConnection());

// Failure
EXPECT_EQ(0, test_server_->counter("sds.client_cert.update_success")->value());