Skip to content

Commit

Permalink
test: FakeUpstream threading fixes (#14526)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente authored Jan 21, 2021
1 parent 42c75fa commit 8de5ad9
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 68 deletions.
4 changes: 1 addition & 3 deletions test/common/grpc/grpc_client_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ class VersionedGrpcClientIntegrationParamTest
return fmt::format("{}_{}_{}",
std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6",
std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc",
std::get<2>(p.param) == envoy::config::core::v3::ApiVersion::V3
? "V3"
: envoy::config::core::v3::ApiVersion::V2 ? "V2" : "AUTO");
ApiVersion_Name(std::get<2>(p.param)));
}
Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); }
ClientType clientType() const override { return std::get<1>(GetParam()); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class AccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class TcpGrpcAccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrat
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic full access logging flow.
TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
};

INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, ExtAuthzGrpcIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Verifies that the request body is included in the CheckRequest when the downstream protocol is
// HTTP/1.1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,18 @@ class RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFailureModeIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterHeadersEnabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType,
RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

TEST_P(RatelimitIntegrationTest, Ok) {
XDS_DEPRECATED_FEATURE_TEST_SKIP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, MetricsServiceIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Test a basic metric service flow.
TEST_P(MetricsServiceIntegrationTest, BasicFlow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ProxyProtocolIntegrationTest : public testing::TestWithParam<Network::Addr

void TearDown() override {
test_server_.reset();
fake_upstream_connection_.reset();
fake_upstreams_.clear();
}

Expand Down
3 changes: 1 addition & 2 deletions test/integration/autonomous_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ bool AutonomousUpstream::createNetworkFilterChain(Network::Connection& connectio
shared_connections_.emplace_back(new SharedConnectionWrapper(connection));
AutonomousHttpConnectionPtr http_connection(
new AutonomousHttpConnection(*this, *shared_connections_.back(), http_type_, *this));
testing::AssertionResult result = http_connection->initialize();
RELEASE_ASSERT(result, result.message());
http_connection->initialize();
http_connections_.push_back(std::move(http_connection));
return true;
}
Expand Down
90 changes: 60 additions & 30 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "test/test_common/utility.h"

#include "absl/strings/str_cat.h"
#include "absl/synchronization/notification.h"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -358,12 +359,6 @@ AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milli
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
}

AssertionResult FakeConnectionBase::enableHalfClose(bool enable,
std::chrono::milliseconds timeout) {
return shared_connection_.executeOnDispatcher(
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout);
}

Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
absl::MutexLock lock(&lock_);
new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_));
Expand Down Expand Up @@ -531,6 +526,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>&) {
absl::MutexLock lock(&lock_);
if (read_disable_on_new_connection_) {
// Disable early close detection to avoid closing the network connection before full
// initialization is complete.
connection.detectEarlyCloseWhenReadDisabled(false);
connection.readDisable(true);
}
auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);
Expand Down Expand Up @@ -570,16 +568,16 @@ AssertionResult FakeUpstream::waitForHttpConnection(
client_dispatcher, timeout)) {
return AssertionFailure() << "Timed out waiting for new connection.";
}
}

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeHttpConnection>(
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb,
max_request_headers_count, headers_with_underscores_action);
}
VERIFY_ASSERTION(connection->initialize());
if (read_disable_on_new_connection_) {
VERIFY_ASSERTION(connection->readDisable(false));
}
return AssertionSuccess();
connection->initialize();
return AssertionSuccess();
});
}

AssertionResult
Expand All @@ -603,14 +601,17 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
client_dispatcher, 5ms)) {
continue;
}
}

return upstream.runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&upstream.lock_);
connection = std::make_unique<FakeHttpConnection>(
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
envoy::config::core::v3::HttpProtocolOptions::ALLOW);
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
return AssertionSuccess();
connection->initialize();
return AssertionSuccess();
});
}
}
return AssertionFailure() << "Timed out waiting for HTTP connection.";
Expand All @@ -628,19 +629,35 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
return AssertionFailure() << "Timed out waiting for raw connection";
}
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
}
VERIFY_ASSERTION(connection->initialize());
VERIFY_ASSERTION(connection->readDisable(false));
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_));
return AssertionSuccess();

return runOnDispatcherThreadAndWait([&]() {
absl::MutexLock lock(&lock_);
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem());
connection->initialize();
// Skip enableHalfClose if the connection is already disconnected.
if (connection->connected()) {
connection->connection().enableHalfClose(enable_half_close_);
}
return AssertionSuccess();
});
}

SharedConnectionWrapper& FakeUpstream::consumeConnection() {
ASSERT(!new_connections_.empty());
auto* const connection_wrapper = new_connections_.front().get();
// Skip the thread safety check if the network connection has already been freed since there's no
// alternate way to get access to the dispatcher.
ASSERT(!connection_wrapper->connected() ||
connection_wrapper->connection().dispatcher().isThreadSafe());
connection_wrapper->setParented();
connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
if (read_disable_on_new_connection_ && connection_wrapper->connected()) {
// Re-enable read and early close detection.
auto& connection = connection_wrapper->connection();
connection.detectEarlyCloseWhenReadDisabled(true);
connection.readDisable(false);
}
return *connection_wrapper;
}

Expand All @@ -665,6 +682,20 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
received_datagrams_.emplace_back(std::move(data));
}

AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
std::chrono::milliseconds timeout) {
auto result = std::make_shared<AssertionResult>(AssertionSuccess());
auto done = std::make_shared<absl::Notification>();
ASSERT(!dispatcher_->isThreadSafe());
dispatcher_->post([&]() {
*result = cb();
done->Notify();
});
RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
"Timed out waiting for cb to run on dispatcher");
return *result;
}

void FakeUpstream::sendUdpDatagram(const std::string& buffer,
const Network::Address::InstanceConstSharedPtr& peer) {
dispatcher_->post([this, buffer, peer] {
Expand Down Expand Up @@ -700,17 +731,16 @@ FakeRawConnection::~FakeRawConnection() {
}
}

testing::AssertionResult FakeRawConnection::initialize() {
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)};
void FakeRawConnection::initialize() {
FakeConnectionBase::initialize();
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)};
read_filter_ = filter;
testing::AssertionResult result = shared_connection_.executeOnDispatcher(
[filter = std::move(filter)](Network::Connection& connection) {
connection.addReadFilter(filter);
});
if (!result) {
return result;
if (!shared_connection_.connected()) {
ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
return;
}
return FakeConnectionBase::initialize();
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe());
shared_connection_.connection().addReadFilter(filter);
}

AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
Expand Down
39 changes: 16 additions & 23 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
connection_.addConnectionCallbacks(*this);
}

Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) {
absl::MutexLock lock(&lock_);
return disconnect_callback_manager_.add(callback);
}

// Avoid directly removing by caller, since CallbackManager is not thread safe.
void removeDisconnectCallback(Common::CallbackHandle* handle) {
absl::MutexLock lock(&lock_);
handle->remove();
}

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
// Throughout this entire function, we know that the connection_ cannot disappear, since this
Expand All @@ -278,7 +267,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
disconnected_ = true;
disconnect_callback_manager_.runCallbacks();
}
}

Expand Down Expand Up @@ -315,6 +303,10 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,
if (disconnected_) {
return testing::AssertionSuccess();
}
// Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail
// immediately instead of deadlocking.
ASSERT(!connection_.dispatcher().isThreadSafe(),
"deadlock: executeOnDispatcher called from dispatcher thread.");
bool callback_ready_event = false;
bool unexpected_disconnect = false;
connection_.dispatcher().post(
Expand Down Expand Up @@ -345,13 +337,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks,

void setParented() {
absl::MutexLock lock(&lock_);
ASSERT(!parented_);
parented_ = true;
}

private:
Network::Connection& connection_;
absl::Mutex lock_;
Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_);
bool parented_ ABSL_GUARDED_BY(lock_){};
bool disconnected_ ABSL_GUARDED_BY(lock_){};
};
Expand All @@ -363,7 +355,10 @@ using SharedConnectionWrapperPtr = std::unique_ptr<SharedConnectionWrapper>;
*/
class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
public:
virtual ~FakeConnectionBase() { ASSERT(initialized_); }
virtual ~FakeConnectionBase() {
absl::MutexLock lock(&lock_);
ASSERT(initialized_);
}

ABSL_MUST_USE_RESULT
testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
Expand All @@ -380,14 +375,10 @@ class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
testing::AssertionResult
waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
virtual testing::AssertionResult initialize() {
virtual void initialize() {
absl::MutexLock lock(&lock_);
initialized_ = true;
return testing::AssertionSuccess();
}
ABSL_MUST_USE_RESULT
testing::AssertionResult
enableHalfClose(bool enabled, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
// The same caveats apply here as in SharedConnectionWrapper::connection().
Network::Connection& connection() const { return shared_connection_.connection(); }
bool connected() const { return shared_connection_.connected(); }
Expand All @@ -398,9 +389,9 @@ class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
time_system_(time_system) {}

SharedConnectionWrapper& shared_connection_;
bool initialized_{};
absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better
// guarded by annotations.
bool initialized_ ABSL_GUARDED_BY(lock_){};
bool half_closed_ ABSL_GUARDED_BY(lock_){};
Event::TestTimeSystem& time_system_;
};
Expand Down Expand Up @@ -499,8 +490,7 @@ class FakeRawConnection : public FakeConnectionBase {
testing::AssertionResult write(const std::string& data, bool end_stream = false,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
testing::AssertionResult initialize() override;
void initialize() override;

// Creates a ValidatorFunction which returns true when data_to_wait_for is
// contained in the incoming data string. Unlike many of Envoy waitFor functions,
Expand Down Expand Up @@ -742,6 +732,9 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
void threadRoutine();
SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
void onRecvDatagram(Network::UdpRecvData& data);
AssertionResult
runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

Network::SocketSharedPtr socket_;
Network::ListenSocketFactorySharedPtr socket_factory_;
Expand Down
3 changes: 2 additions & 1 deletion test/integration/hds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ class HdsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationParamTest,
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, HdsIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Tests Envoy HTTP health checking a single healthy endpoint and reporting that it is
// indeed healthy to the server.
Expand Down
3 changes: 2 additions & 1 deletion test/integration/load_stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ class LoadStatsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, LoadStatsIntegrationTest,
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS);
VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS,
Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString);

// Validate the load reports for successful requests as cluster membership
// changes.
Expand Down

0 comments on commit 8de5ad9

Please sign in to comment.