Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: FakeUpstream threading fixes #14526

Merged
merged 18 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
512610f
test: FakeUpstream threading fixes
antoniovicente Dec 24, 2020
19a63f7
address review comments
antoniovicente Dec 29, 2020
5f767cf
fix flakiness in hds_integration_test
antoniovicente Dec 29, 2020
1bfa958
Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…
antoniovicente Dec 29, 2020
45b8864
Fix wait for disconnect. Previously disconnect condition happened un…
antoniovicente Jan 6, 2021
3b653f2
fix use after free due to connection possibly being deleted.
antoniovicente Jan 7, 2021
ee052d6
fix more use-after-free
antoniovicente Jan 7, 2021
e9f4597
fix flaky test
antoniovicente Jan 7, 2021
3c55b38
Merge remote-tracking branch 'upstream/master' into fake_upstream_thr…
antoniovicente Jan 8, 2021
495b19e
add logging to debug macos failure
antoniovicente Jan 9, 2021
6580555
Wait outside the upstream lock since holding that lock is not needed.
antoniovicente Jan 11, 2021
f90610f
Symbolize names of VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS test cases.
antoniovicente Jan 11, 2021
22211c9
also stringify arguments to other VERSIONED_GRPC_CLIENT_INTEGRATION_P…
antoniovicente Jan 11, 2021
0d4fc3d
Fix issue of timeouts waiting for disconnect on macos while also avoi…
antoniovicente Jan 13, 2021
d0565e5
address review comments
antoniovicente Jan 15, 2021
197362f
revert back to the original wait for a raw connection followed by che…
antoniovicente Jan 16, 2021
3aee6d7
add thread annotations to FakeConnectionBase::initialized_
antoniovicente Jan 19, 2021
8fccaf3
fix use-after-free in test framework
antoniovicente Jan 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions test/common/grpc/grpc_client_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ class VersionedGrpcClientIntegrationParamTest
return fmt::format("{}_{}_{}",
std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6",
std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc",
std::get<2>(p.param) == envoy::config::core::v3::ApiVersion::V3
? "V3"
: envoy::config::core::v3::ApiVersion::V2 ? "V2" : "AUTO");
ApiVersion_Name(std::get<2>(p.param)));
}
Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); }
ClientType clientType() const override { return std::get<1>(GetParam()); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class AccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class TcpGrpcAccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrat
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, ExtAuthzGrpcIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Verifies that the request body is included in the CheckRequest when the downstream protocol is
// HTTP/1.1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,18 @@ class RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFailureModeIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterHeadersEnabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType,
RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

TEST_P(RatelimitIntegrationTest, Ok) {
XDS_DEPRECATED_FEATURE_TEST_SKIP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, MetricsServiceIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic metric service flow.
TEST_P(MetricsServiceIntegrationTest, BasicFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ProxyProtocolIntegrationTest : public testing::TestWithParam<Network::Addr

// Releases the test fixtures in a fixed order: the test server first, then the fake upstream
// connection, and finally the fake upstreams themselves.
void TearDown() override {
test_server_.reset();
// NOTE(review): presumably the connection has to be released before the upstreams are
// cleared because it refers back to one of them — confirm against FakeUpstream.
fake_upstream_connection_.reset();
fake_upstreams_.clear();
}

Expand Down
3 changes: 1 addition & 2 deletions test/integration/autonomous_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ bool AutonomousUpstream::createNetworkFilterChain(Network::Connection& connectio
shared_connections_.emplace_back(new SharedConnectionWrapper(connection));
AutonomousHttpConnectionPtr http_connection(
new AutonomousHttpConnection(*this, *shared_connections_.back(), http_type_, *this));
testing::AssertionResult result = http_connection->initialize();
RELEASE_ASSERT(result, result.message());
http_connection->initialize();
http_connections_.push_back(std::move(http_connection));
return true;
}
Expand Down
90 changes: 60 additions & 30 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "test/test_common/utility.h"

#include "absl/strings/str_cat.h"
#include "absl/synchronization/notification.h"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -358,12 +359,6 @@ AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milli
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
}

AssertionResult FakeConnectionBase::enableHalfClose(bool enable,
std::chrono::milliseconds timeout) {
return shared_connection_.executeOnDispatcher(
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
}

Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
absl::MutexLock lock(&lock_);
new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_));
Expand Down Expand Up @@ -513,6 +508,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>&) {
absl::MutexLock lock(&lock_);
if (read_disable_on_new_connection_) {
// Disable early close detection to avoid closing the network connection before full
// initialization is complete.
connection.detectEarlyCloseWhenReadDisabled(false);
connection.readDisable(true);
}
auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);
Expand Down Expand Up @@ -552,16 +550,16 @@ AssertionResult FakeUpstream::waitForHttpConnection(
client_dispatcher, timeout)) {
return AssertionFailure() << "Timed out waiting for new connection.";
}
}

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeHttpConnection>(
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb,
max_request_headers_count, headers_with_underscores_action);
}
VERIFY_ASSERTION(connection->initialize());
if (read_disable_on_new_connection_) {
VERIFY_ASSERTION(connection->readDisable(false));
}
return AssertionSuccess();
connection->initialize();
return AssertionSuccess();
});
}

AssertionResult
Expand All @@ -585,14 +583,17 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
client_dispatcher, 5ms)) {
continue;
}
}

return upstream.runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&upstream.lock_);
connection = std::make_unique<FakeHttpConnection>(
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
envoy::config::core::v3::HttpProtocolOptions::ALLOW);
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
return AssertionSuccess();
connection->initialize();
return AssertionSuccess();
});
}
}
return AssertionFailure() << "Timed out waiting for HTTP connection.";
Expand All @@ -610,19 +611,35 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
return AssertionFailure() << "Timed out waiting for raw connection";
}
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
return AssertionSuccess();

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
connection->initialize();
// Skip enableHalfClose if the connection is already disconnected.
if (connection->connected()) {
connection->connection().enableHalfClose(enable_half_close_);
}
return AssertionSuccess();
});
}

// Takes the wrapper at the front of new_connections_, marks it as parented, moves it to
// consumed_connections_ and returns a reference to it. new_connections_ must not be empty.
//
// When read_disable_on_new_connection_ is set and the wrapper still reports connected(), reads
// and early close detection are turned back on for the underlying network connection before
// returning.
SharedConnectionWrapper& FakeUpstream::consumeConnection() {
ASSERT(!new_connections_.empty());
auto* const connection_wrapper = new_connections_.front().get();
// Skip the thread safety check if the network connection has already been freed since there's no
// alternate way to get access to the dispatcher.
ASSERT(!connection_wrapper->connected() ||
connection_wrapper->connection().dispatcher().isThreadSafe());
connection_wrapper->setParented();
connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
if (read_disable_on_new_connection_ && connection_wrapper->connected()) {
// Re-enable read and early close detection.
auto& connection = connection_wrapper->connection();
connection.detectEarlyCloseWhenReadDisabled(true);
connection.readDisable(false);
}
return *connection_wrapper;
}

Expand All @@ -647,6 +664,20 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
received_datagrams_.emplace_back(std::move(data));
}

// Posts `cb` to this upstream's dispatcher, blocks the calling thread until `cb` has run, and
// returns the AssertionResult that `cb` produced.
//
// Must be called from a thread other than the dispatcher thread (checked by the ASSERT below);
// the caller blocks until the posted work has run. If `cb` has not completed within `timeout`,
// the RELEASE_ASSERT fires.
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
                                                           std::chrono::milliseconds timeout) {
  auto result = std::make_shared<AssertionResult>(AssertionSuccess());
  auto done = std::make_shared<absl::Notification>();
  ASSERT(!dispatcher_->isThreadSafe());
  // The closure runs on another thread, so it owns everything it touches: the shared result and
  // notification are captured by value (keeping them alive until the closure is destroyed) and
  // the callback is moved in, rather than referring to this function's stack frame.
  dispatcher_->post([result, done, cb = std::move(cb)]() {
    *result = cb();
    done->Notify();
  });
  RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
                 "Timed out waiting for cb to run on dispatcher");
  return *result;
}

void FakeUpstream::sendUdpDatagram(const std::string& buffer,
const Network::Address::InstanceConstSharedPtr& peer) {
dispatcher_->post([this, buffer, peer] {
Expand Down Expand Up @@ -682,17 +713,16 @@ FakeRawConnection::~FakeRawConnection() {
}
}

testing::AssertionResult FakeRawConnection::initialize() {
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)};
void FakeRawConnection::initialize() {
FakeConnectionBase::initialize();
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)};
read_filter_ = filter;
testing::AssertionResult result = shared_connection_.executeOnDispatcher(
[filter = std::move(filter)](Network::Connection& connection) {
connection.addReadFilter(filter);
});
if (!result) {
return result;
if (!shared_connection_.connected()) {
ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
return;
}
return FakeConnectionBase::initialize();
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe());
shared_connection_.connection().addReadFilter(filter);
}

AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
Expand Down
39 changes: 16 additions & 23 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
connection_.addConnectionCallbacks(*this);
}

Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) {
absl::MutexLock lock(&lock_);
return disconnect_callback_manager_.add(callback);
}

// Avoid directly removing by caller, since CallbackManager is not thread safe.
void removeDisconnectCallback(Common::CallbackHandle* handle) {
absl::MutexLock lock(&lock_);
handle->remove();
}

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
// Throughout this entire function, we know that the connection_ cannot disappear, since this
Expand All @@ -278,7 +267,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
disconnected_ = true;
disconnect_callback_manager_.runCallbacks();
}
}

Expand Down Expand Up @@ -315,6 +303,10 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (disconnected_) {
return testing::AssertionSuccess();
}
// Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail
// immediately instead of deadlocking.
ASSERT(!connection_.dispatcher().isThreadSafe(),
"deadlock: executeOnDispatcher called from dispatcher thread.");
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
bool callback_ready_event = false;
bool unexpected_disconnect = false;
connection_.dispatcher().post(
Expand Down Expand Up @@ -345,13 +337,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,

// Marks this wrapper as parented. Takes lock_ for the duration of the call and asserts that
// the wrapper had not already been marked as parented.
void setParented() {
absl::MutexLock lock(&lock_);
ASSERT(!parented_);
parented_ = true;
}

private:
Network::Connection& connection_;
absl::Mutex lock_;
Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_);
bool parented_ ABSL_GUARDED_BY(lock_){};
bool disconnected_ ABSL_GUARDED_BY(lock_){};
};
Expand All @@ -363,7 +355,10 @@ using SharedConnectionWrapperPtr = std::unique_ptr<SharedConnectionWrapper>;
*/
class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
public:
virtual ~FakeConnectionBase() { ASSERT(initialized_); }
virtual ~FakeConnectionBase() {
absl::MutexLock lock(&lock_);
ASSERT(initialized_);
}

ABSL_MUST_USE_RESULT
testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
Expand All @@ -380,14 +375,10 @@ class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
testing::AssertionResult
waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
virtual testing::AssertionResult initialize() {
virtual void initialize() {
absl::MutexLock lock(&lock_);
initialized_ = true;
return testing::AssertionSuccess();
}
ABSL_MUST_USE_RESULT
testing::AssertionResult
enableHalfClose(bool enabled, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
// The same caveats apply here as in SharedConnectionWrapper::connection().
Network::Connection& connection() const { return shared_connection_.connection(); }
bool connected() const { return shared_connection_.connected(); }
Expand All @@ -398,9 +389,9 @@ class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
time_system_(time_system) {}

SharedConnectionWrapper& shared_connection_;
bool initialized_{};
absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better
// guarded by annotations.
bool initialized_ ABSL_GUARDED_BY(lock_){};
bool half_closed_ ABSL_GUARDED_BY(lock_){};
Event::TestTimeSystem& time_system_;
};
Expand Down Expand Up @@ -493,8 +484,7 @@ class FakeRawConnection : public FakeConnectionBase {
testing::AssertionResult write(const std::string& data, bool end_stream = false,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
testing::AssertionResult initialize() override;
void initialize() override;

// Creates a ValidatorFunction which returns true when data_to_wait_for is
// contained in the incoming data string. Unlike many of Envoy waitFor functions,
Expand Down Expand Up @@ -736,6 +726,9 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
void threadRoutine();
SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void onRecvDatagram(Network::UdpRecvData& data);
AssertionResult
runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

Network::SocketSharedPtr socket_;
Network::ListenSocketFactorySharedPtr socket_factory_;
Expand Down
3 changes: 2 additions & 1 deletion test/integration/hds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ class HdsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationParamTest,
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, HdsIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Tests Envoy HTTP health checking a single healthy endpoint and reporting that it is
// indeed healthy to the server.
Expand Down
3 changes: 2 additions & 1 deletion test/integration/load_stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ class LoadStatsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, LoadStatsIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Validate the load reports for successful requests as cluster membership
// changes.
Expand Down