Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: FakeUpstream threading fixes #14526

Merged
merged 18 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
512610f
test: FakeUpstream threading fixes
antoniovicente Dec 24, 2020
19a63f7
address review comments
antoniovicente Dec 29, 2020
5f767cf
fix flakiness in hds_integration_test
antoniovicente Dec 29, 2020
1bfa958
Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…
antoniovicente Dec 29, 2020
45b8864
Fix wait for disconnect. Previously disconnect condition happened un…
antoniovicente Jan 6, 2021
3b653f2
fix use after free due to connection possibly being deleted.
antoniovicente Jan 7, 2021
ee052d6
fix more use-after-free
antoniovicente Jan 7, 2021
e9f4597
fix flaky test
antoniovicente Jan 7, 2021
3c55b38
Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…
antoniovicente Jan 8, 2021
495b19e
add logging to debug macos failure
antoniovicente Jan 9, 2021
6580555
Wait outside the upstream lock since holding that lock is not needed.
antoniovicente Jan 11, 2021
f90610f
Symbolize names of VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS test cases.
antoniovicente Jan 11, 2021
22211c9
also stringify arguments to other VERSIONED_GRPC_CLIENT_INTEGRATION_P…
antoniovicente Jan 11, 2021
0d4fc3d
Fix issue of timeouts waiting for disconnect on macos while also avoi…
antoniovicente Jan 13, 2021
d0565e5
address review comments
antoniovicente Jan 15, 2021
197362f
revert back to the original wait for a raw connection followed by che…
antoniovicente Jan 16, 2021
3aee6d7
add thread annotations to FakeConnectionBase::initialized_
antoniovicente Jan 19, 2021
8fccaf3
fix use-after-free in test framework
antoniovicente Jan 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions test/common/grpc/grpc_client_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ class VersionedGrpcClientIntegrationParamTest
return fmt::format("{}_{}_{}",
std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6",
std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc",
std::get<2>(p.param) == envoy::config::core::v3::ApiVersion::V3
? "V3"
: envoy::config::core::v3::ApiVersion::V2 ? "V2" : "AUTO");
ApiVersion_Name(std::get<2>(p.param)));
}
Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); }
ClientType clientType() const override { return std::get<1>(GetParam()); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class AccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class TcpGrpcAccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrat
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, ExtAuthzGrpcIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Verifies that the request body is included in the CheckRequest when the downstream protocol is
// HTTP/1.1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,18 @@ class RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFailureModeIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterHeadersEnabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType,
RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

TEST_P(RatelimitIntegrationTest, Ok) {
XDS_DEPRECATED_FEATURE_TEST_SKIP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, MetricsServiceIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic metric service flow.
TEST_P(MetricsServiceIntegrationTest, BasicFlow) {
Expand Down
130 changes: 105 additions & 25 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "test/test_common/utility.h"

#include "absl/strings/str_cat.h"
#include "absl/synchronization/notification.h"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -354,14 +355,28 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
}

AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) {
return shared_connection_.executeOnDispatcher(
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called
// from outside the dispatcher thread.
if (shared_connection_.connection().dispatcher().isThreadSafe()) {
shared_connection_.connection().readDisable(disable);
return AssertionSuccess();
} else {
return shared_connection_.executeOnDispatcher(
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to special-case same-thread calls? Is that because when calling from the same thread, we are assuming the operation is blocking? (add code comments and we are good to go)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeOnDispatcher blocks on until the posted callback finishes. Calling post and waiting from the dispatcher thread results in a deadlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense; adding a code comment may help the reader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea: Instead of dealing this in all call sites, could executeOnDispatcher() figure this out and just run the lambda versus posting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I undid this branching and reverted this function.

FakeUpstream now does readDisable and enableHalfClose directly on the network connection instead of going through this wrapper in the cases where the operation happens from the dispatcher thread. Some tests still call this method and that requires going down the executeOnDispatcher version.

}
}

AssertionResult FakeConnectionBase::enableHalfClose(bool enable,
std::chrono::milliseconds timeout) {
return shared_connection_.executeOnDispatcher(
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called
// from outside the dispatcher thread.
if (shared_connection_.connection().dispatcher().isThreadSafe()) {
shared_connection_.connection().enableHalfClose(enable);
return AssertionSuccess();
} else {
return shared_connection_.executeOnDispatcher(
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
}
}

Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
Expand Down Expand Up @@ -513,6 +528,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>&) {
absl::MutexLock lock(&lock_);
if (read_disable_on_new_connection_) {
// Disable early close detection to avoid closing the network connection before full
// initialization is complete.
connection.detectEarlyCloseWhenReadDisabled(false);
connection.readDisable(true);
}
auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);
Expand Down Expand Up @@ -552,16 +570,16 @@ AssertionResult FakeUpstream::waitForHttpConnection(
client_dispatcher, timeout)) {
return AssertionFailure() << "Timed out waiting for new connection.";
}
}

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeHttpConnection>(
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb,
max_request_headers_count, headers_with_underscores_action);
}
VERIFY_ASSERTION(connection->initialize());
if (read_disable_on_new_connection_) {
VERIFY_ASSERTION(connection->readDisable(false));
}
return AssertionSuccess();
VERIFY_ASSERTION(connection->initialize());
return AssertionSuccess();
});
}

AssertionResult
Expand All @@ -585,14 +603,17 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
client_dispatcher, 5ms)) {
continue;
}
}

return upstream.runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&upstream.lock_);
connection = std::make_unique<FakeHttpConnection>(
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
envoy::config::core::v3::HttpProtocolOptions::ALLOW);
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
return AssertionSuccess();
VERIFY_ASSERTION(connection->initialize());
return AssertionSuccess();
});
}
}
return AssertionFailure() << "Timed out waiting for HTTP connection.";
Expand All @@ -610,19 +631,57 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
return AssertionFailure() << "Timed out waiting for raw connection";
}
}

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
return AssertionSuccess();
});
}

testing::AssertionResult
FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) {
ASSERT(!read_disable_on_new_connection_);
SharedConnectionWrapper* connection;
{
absl::MutexLock lock(&lock_);
ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? delete or change log level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I thought I had removed this debug comment from the time I was trying to track down macos issues via CI.

const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
return !new_connections_.empty();
};

if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
return AssertionFailure() << "Timed out waiting for raw connection";
}
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
return AssertionSuccess();

VERIFY_ASSERTION(runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = &consumeConnection();
return AssertionSuccess();
}));

return connection->waitForDisconnect(time_system_, timeout);
}

SharedConnectionWrapper& FakeUpstream::consumeConnection() {
ASSERT(!new_connections_.empty());
auto* const connection_wrapper = new_connections_.front().get();
// Skip the thread safety check if the network connection has already been freed since there's no
// alternate way to get access to the dispatcher.
ASSERT(!connection_wrapper->connected() ||
connection_wrapper->connection().dispatcher().isThreadSafe());
connection_wrapper->setParented();
connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
if (read_disable_on_new_connection_) {
// Re-enable read and early close detection.
auto& connection = connection_wrapper->connection();
connection.detectEarlyCloseWhenReadDisabled(true);
connection.readDisable(false);
}
return *connection_wrapper;
}

Expand All @@ -647,6 +706,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
received_datagrams_.emplace_back(std::move(data));
}

AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a timeout to avoid test hard timeout? It could be hard coded to 15s if that is easier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a timeout which results in a RELEASE_ASSERT if hit. The callbacks run on the dispatcher should be really fast, so it taking an extended period of time indicates that something has gone really wrong. Cancelling the work is not an option since it is hard to tell how far the callback got before the timeout happens.

Returning AssertionResult is not a good option because there are FakeUpstream functions that wait for an HTTP upstream connection with short timeout in a loop and interpret the returned AssertionResult as a retryable failure.

AssertionResult result = AssertionSuccess();
absl::Notification done;
ASSERT(!dispatcher_->isThreadSafe());
dispatcher_->post([&]() {
result = cb();
done.Notify();
});
done.WaitForNotification();
return result;
}

void FakeUpstream::sendUdpDatagram(const std::string& buffer,
const Network::Address::InstanceConstSharedPtr& peer) {
dispatcher_->post([this, buffer, peer] {
Expand Down Expand Up @@ -683,14 +754,23 @@ FakeRawConnection::~FakeRawConnection() {
}

testing::AssertionResult FakeRawConnection::initialize() {
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)};
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)};
read_filter_ = filter;
testing::AssertionResult result = shared_connection_.executeOnDispatcher(
[filter = std::move(filter)](Network::Connection& connection) {
connection.addReadFilter(filter);
});
if (!result) {
return result;
if (!shared_connection_.connected()) {
VERIFY_ASSERTION(FakeConnectionBase::initialize());
return AssertionFailure() << "initialize failed, connection is disconnected.";
}
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe());
if (shared_connection_.connection().dispatcher().isThreadSafe()) {
shared_connection_.connection().addReadFilter(filter);
} else {
testing::AssertionResult result = shared_connection_.executeOnDispatcher(
[filter = std::move(filter)](Network::Connection& connection) {
connection.addReadFilter(filter);
});
if (!result) {
return result;
}
}
return FakeConnectionBase::initialize();
}
Expand Down
34 changes: 21 additions & 13 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
connection_.addConnectionCallbacks(*this);
}

Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) {
absl::MutexLock lock(&lock_);
return disconnect_callback_manager_.add(callback);
}

// Avoid directly removing by caller, since CallbackManager is not thread safe.
void removeDisconnectCallback(Common::CallbackHandle* handle) {
absl::MutexLock lock(&lock_);
handle->remove();
}

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
// Throughout this entire function, we know that the connection_ cannot disappear, since this
Expand All @@ -278,7 +267,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
disconnected_ = true;
disconnect_callback_manager_.runCallbacks();
}
}

Expand All @@ -297,6 +285,18 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
return !disconnected_;
}

ABSL_MUST_USE_RESULT
testing::AssertionResult
waitForDisconnect(Event::TestTimeSystem& time_system,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
absl::MutexLock lock(&lock_);
if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) {
ENVOY_LOG_MISC(critical, "timeout");
return AssertionFailure() << "Timed out waiting for disconnect";
}
return AssertionSuccess();
}

// This provides direct access to the underlying connection, but only to const methods.
// Stateful connection related methods should happen on the connection's dispatcher via
// executeOnDispatcher.
Expand All @@ -315,6 +315,8 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (disconnected_) {
return testing::AssertionSuccess();
}
ASSERT(!connection_.dispatcher().isThreadSafe(),
"deadlock: executeOnDispatcher called from dispatcher thread.");
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
bool callback_ready_event = false;
bool unexpected_disconnect = false;
connection_.dispatcher().post(
Expand Down Expand Up @@ -345,13 +347,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,

void setParented() {
absl::MutexLock lock(&lock_);
ASSERT(!parented_);
parented_ = true;
}

private:
Network::Connection& connection_;
absl::Mutex lock_;
Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_);
bool parented_ ABSL_GUARDED_BY(lock_){};
bool disconnected_ ABSL_GUARDED_BY(lock_){};
};
Expand Down Expand Up @@ -579,6 +581,11 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
testing::AssertionResult
waitForRawConnection(FakeRawConnectionPtr& connection,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
testing::AssertionResult waitForAndConsumeDisconnectedConnection(
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

Network::Address::InstanceConstSharedPtr localAddress() const {
return socket_->addressProvider().localAddress();
}
Expand Down Expand Up @@ -736,6 +743,7 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
void threadRoutine();
SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void onRecvDatagram(Network::UdpRecvData& data);
AssertionResult runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb);

Network::SocketSharedPtr socket_;
Network::ListenSocketFactorySharedPtr socket_factory_;
Expand Down
Loading