From b973b75a2527a0c89e4eaafc04887211a27bfe31 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 15 Jan 2020 16:21:15 +0100 Subject: [PATCH 01/13] Extract SignalHandler In preparation of sharing functionality for signal handling, extract what we have right now into SignalHandler. Status: draft For fixing #280 Signed-off-by: Otto van der Schaaf --- source/client/service_main.cc | 38 ++----------------------- source/client/service_main.h | 19 ++----------- source/common/BUILD | 2 ++ source/common/signal_handler.cc | 50 +++++++++++++++++++++++++++++++++ source/common/signal_handler.h | 44 +++++++++++++++++++++++++++++ 5 files changed, 101 insertions(+), 52 deletions(-) create mode 100644 source/common/signal_handler.cc create mode 100644 source/common/signal_handler.h diff --git a/source/client/service_main.cc b/source/client/service_main.cc index ee27b36b7..378e0ba59 100644 --- a/source/client/service_main.cc +++ b/source/client/service_main.cc @@ -5,22 +5,16 @@ #include "nighthawk/common/exception.h" +#include "client/service_impl.h" #include "common/utility.h" #include "common/version_info.h" -#include "client/service_impl.h" - #include "absl/strings/strip.h" #include "tclap/CmdLine.h" namespace Nighthawk { namespace Client { -namespace { -std::function signal_handler_delegate; -void signal_handler(int signal) { signal_handler_delegate(signal); } -} // namespace - ServiceMain::ServiceMain(int argc, const char** argv) { const char* descr = "L7 (HTTP/HTTPS/HTTP2) performance characterization tool."; TCLAP::CmdLine cmd(descr, ' ', VersionInfo::version()); // NOLINT @@ -76,43 +70,17 @@ void ServiceMain::start() { } channel_ = grpc::CreateChannel(listener_bound_address_, grpc::InsecureChannelCredentials()); stub_ = std::make_unique(channel_); - pipe_fds_.resize(2); - // The shutdown thread will be notified of by our signal handler and take it from there. - RELEASE_ASSERT(pipe(pipe_fds_.data()) == 0, "pipe failed"); - - shutdown_thread_ = std::thread([this]() { - int tmp; - RELEASE_ASSERT(read(pipe_fds_[0], &tmp, sizeof(int)) >= 0, "read failed"); - RELEASE_ASSERT(close(pipe_fds_[0]) == 0, "read side close failed"); - RELEASE_ASSERT(close(pipe_fds_[1]) == 0, "write side close failed"); - pipe_fds_.clear(); - server_->Shutdown(); - }); + signal_handler_ = std::make_unique([this]() { server_->Shutdown(); }); } void ServiceMain::wait() { - signal_handler_delegate = [this](int) { onSignal(); }; - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); server_->Wait(); shutdown(); } -void ServiceMain::onSignal() { initiateShutdown(); } - -void ServiceMain::initiateShutdown() { - if (pipe_fds_.size() == 2) { - const int tmp = 0; - RELEASE_ASSERT(write(pipe_fds_[1], &tmp, sizeof(int)) == sizeof(int), "write failed"); - } -} - void ServiceMain::shutdown() { - initiateShutdown(); - if (shutdown_thread_.joinable()) { - shutdown_thread_.join(); - } ENVOY_LOG(info, "Nighthawk grpc service exits"); + std::cerr << "3" << std::endl; } } // namespace Client diff --git a/source/client/service_main.h b/source/client/service_main.h index e4a814069..f08504e86 100644 --- a/source/client/service_main.h +++ b/source/client/service_main.h @@ -13,6 +13,7 @@ #include "api/client/service.pb.h" #include "client/service_impl.h" +#include "common/signal_handler.h" #include "tclap/CmdLine.h" @@ -37,17 +38,6 @@ class ServiceMain : public Envoy::Logger::Loggable { static std::string appendDefaultPortIfNeeded(absl::string_view host_and_maybe_port); private: - /** - * Notifies the thread responsible for shutting down the server that it is time to do so, if - * needed. Safe to use in signal handling, and non-blocking. - */ - void initiateShutdown(); - - /** - * Fires on signal reception. - */ - void onSignal(); - grpc::ServerBuilder builder_; ServiceImpl service_; std::unique_ptr server_; @@ -56,12 +46,7 @@ class ServiceMain : public Envoy::Logger::Loggable { int listener_port_{-1}; std::string listener_bound_address_; std::string listener_output_path_; - // Signal handling needs to be lean so we can't directly initiate shutdown while handling a - // signal. Therefore we write a bite to a this pipe to propagate signal reception. Subsequently, - // the read side will handle the actual shut down of the gRPC service without having to worry - // about signal-safety. - std::vector pipe_fds_; - std::thread shutdown_thread_; + SignalHandlerPtr signal_handler_; }; } // namespace Client diff --git a/source/common/BUILD b/source/common/BUILD index 8fad24e18..60a356bb1 100644 --- a/source/common/BUILD +++ b/source/common/BUILD @@ -43,6 +43,7 @@ envoy_cc_library( srcs = [ "rate_limiter_impl.cc", "sequencer_impl.cc", + "signal_handler.cc", "statistic_impl.cc", "termination_predicate_impl.cc", "uri_impl.cc", @@ -55,6 +56,7 @@ envoy_cc_library( "platform_util_impl.h", "rate_limiter_impl.h", "sequencer_impl.h", + "signal_handler.h", "statistic_impl.h", "termination_predicate_impl.h", "uri_impl.h", diff --git a/source/common/signal_handler.cc b/source/common/signal_handler.cc new file mode 100644 index 000000000..b7361d2a3 --- /dev/null +++ b/source/common/signal_handler.cc @@ -0,0 +1,50 @@ +#include "common/signal_handler.h" + +#include + +#include "external/envoy/source/common/common/assert.h" +#include "external/envoy/source/common/common/macros.h" + +namespace Nighthawk { + +namespace { +std::function signal_handler_delegate; +void signal_handler(int signal) { signal_handler_delegate(signal); } +} // namespace + +SignalHandler::SignalHandler(std::function signal_callback) { + pipe_fds_.resize(2); + // The shutdown thread will be notified of by our signal handler and take it from there. + RELEASE_ASSERT(pipe(pipe_fds_.data()) == 0, "pipe failed"); + + shutdown_thread_ = std::thread([this, signal_callback]() { + int tmp; + RELEASE_ASSERT(read(pipe_fds_[0], &tmp, sizeof(int)) >= 0, "read failed"); + RELEASE_ASSERT(close(pipe_fds_[0]) == 0, "read side close failed"); + RELEASE_ASSERT(close(pipe_fds_[1]) == 0, "write side close failed"); + pipe_fds_.clear(); + signal_callback(); + }); + + signal_handler_delegate = [this](int) { onSignal(); }; + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); +} + +SignalHandler::~SignalHandler() { + initiateShutdown(); + if (shutdown_thread_.joinable()) { + shutdown_thread_.join(); + } +} + +void SignalHandler::initiateShutdown() { + if (pipe_fds_.size() == 2) { + const int tmp = 0; + RELEASE_ASSERT(write(pipe_fds_[1], &tmp, sizeof(int)) == sizeof(int), "write failed"); + } +} + +void SignalHandler::onSignal() { initiateShutdown(); } + +} // namespace Nighthawk \ No newline at end of file diff --git a/source/common/signal_handler.h b/source/common/signal_handler.h new file mode 100644 index 000000000..988e444b3 --- /dev/null +++ b/source/common/signal_handler.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include +#include + +#include "external/envoy/source/common/common/logger.h" + +namespace Nighthawk { + +using SignalCallback = std::function; + +class SignalHandler final : public Envoy::Logger::Loggable { +public: + SignalHandler(SignalCallback signal_callback); + SignalHandler(SignalHandler const&) = delete; + void operator=(SignalHandler const&) = delete; + ~SignalHandler(); + +private: + /** + * Fires on signal reception. + */ + void onSignal(); + + /** + * Notifies the thread responsible for shutting down the server that it is time to do so, if + * needed. Safe to use in signal handling, and non-blocking. + */ + void initiateShutdown(); + + std::thread shutdown_thread_; + + // Signal handling needs to be lean so we can't directly initiate shutdown while handling a + // signal. Therefore we write a bite to a this pipe to propagate signal reception. Subsequently, + // the read side will handle the actual shut down of the gRPC service without having to worry + // about signal-safety. + std::vector pipe_fds_; +}; + +using SignalHandlerPtr = std::unique_ptr; + +} // namespace Nighthawk \ No newline at end of file From fb5c4347cf4f4294d3cf4d71ddf563efb80df7be Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 15 Jan 2020 16:38:12 +0100 Subject: [PATCH 02/13] Remove accidentally left in comment Signed-off-by: Otto van der Schaaf --- source/client/service_main.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/client/service_main.cc b/source/client/service_main.cc index 378e0ba59..a60155c75 100644 --- a/source/client/service_main.cc +++ b/source/client/service_main.cc @@ -78,10 +78,7 @@ void ServiceMain::wait() { shutdown(); } -void ServiceMain::shutdown() { - ENVOY_LOG(info, "Nighthawk grpc service exits"); - std::cerr << "3" << std::endl; -} +void ServiceMain::shutdown() { ENVOY_LOG(info, "Nighthawk grpc service exits"); } } // namespace Client } // namespace Nighthawk From 1bd1d31a2053b3017f8f8b05265e0d9b7f35d5b9 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 15 Jan 2020 16:56:49 +0100 Subject: [PATCH 03/13] Fix format Signed-off-by: Otto van der Schaaf --- source/client/service_main.cc | 3 ++- source/client/service_main.h | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/client/service_main.cc b/source/client/service_main.cc index a60155c75..a357d09c2 100644 --- a/source/client/service_main.cc +++ b/source/client/service_main.cc @@ -5,10 +5,11 @@ #include "nighthawk/common/exception.h" -#include "client/service_impl.h" #include "common/utility.h" #include "common/version_info.h" +#include "client/service_impl.h" + #include "absl/strings/strip.h" #include "tclap/CmdLine.h" diff --git a/source/client/service_main.h b/source/client/service_main.h index f08504e86..97ae86109 100644 --- a/source/client/service_main.h +++ b/source/client/service_main.h @@ -12,9 +12,10 @@ #include "api/client/service.pb.h" -#include "client/service_impl.h" #include "common/signal_handler.h" +#include "client/service_impl.h" + #include "tclap/CmdLine.h" namespace Nighthawk { From ce5ead6a60328526234090a76df8fe23a69449ee Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 15 Jan 2020 20:50:40 +0100 Subject: [PATCH 04/13] Amend according to clang-tidy's complaints Signed-off-by: Otto van der Schaaf --- source/common/signal_handler.cc | 4 ++-- source/common/signal_handler.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/signal_handler.cc b/source/common/signal_handler.cc index b7361d2a3..8c8a0c4a2 100644 --- a/source/common/signal_handler.cc +++ b/source/common/signal_handler.cc @@ -1,6 +1,6 @@ #include "common/signal_handler.h" -#include +#include #include "external/envoy/source/common/common/assert.h" #include "external/envoy/source/common/common/macros.h" @@ -12,7 +12,7 @@ std::function signal_handler_delegate; void signal_handler(int signal) { signal_handler_delegate(signal); } } // namespace -SignalHandler::SignalHandler(std::function signal_callback) { +SignalHandler::SignalHandler(const std::function signal_callback) { pipe_fds_.resize(2); // The shutdown thread will be notified of by our signal handler and take it from there. RELEASE_ASSERT(pipe(pipe_fds_.data()) == 0, "pipe failed"); diff --git a/source/common/signal_handler.h b/source/common/signal_handler.h index 988e444b3..ae989566b 100644 --- a/source/common/signal_handler.h +++ b/source/common/signal_handler.h @@ -13,7 +13,7 @@ using SignalCallback = std::function; class SignalHandler final : public Envoy::Logger::Loggable { public: - SignalHandler(SignalCallback signal_callback); + SignalHandler(const SignalCallback signal_callback); SignalHandler(SignalHandler const&) = delete; void operator=(SignalHandler const&) = delete; ~SignalHandler(); From 3f991199ee82bd0c5c87dfa6e3e59c9723443b2e Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 15 Jan 2020 21:18:02 +0100 Subject: [PATCH 05/13] Pass const ref Signed-off-by: Otto van der Schaaf --- source/common/signal_handler.cc | 2 +- source/common/signal_handler.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/signal_handler.cc b/source/common/signal_handler.cc index 8c8a0c4a2..aa9316e66 100644 --- a/source/common/signal_handler.cc +++ b/source/common/signal_handler.cc @@ -12,7 +12,7 @@ std::function signal_handler_delegate; void signal_handler(int signal) { signal_handler_delegate(signal); } } // namespace -SignalHandler::SignalHandler(const std::function signal_callback) { +SignalHandler::SignalHandler(const std::function& signal_callback) { pipe_fds_.resize(2); // The shutdown thread will be notified of by our signal handler and take it from there. RELEASE_ASSERT(pipe(pipe_fds_.data()) == 0, "pipe failed"); diff --git a/source/common/signal_handler.h b/source/common/signal_handler.h index ae989566b..38bce2531 100644 --- a/source/common/signal_handler.h +++ b/source/common/signal_handler.h @@ -13,7 +13,7 @@ using SignalCallback = std::function; class SignalHandler final : public Envoy::Logger::Loggable { public: - SignalHandler(const SignalCallback signal_callback); + SignalHandler(const SignalCallback& signal_callback); SignalHandler(SignalHandler const&) = delete; void operator=(SignalHandler const&) = delete; ~SignalHandler(); From 04eec560a5c2d075662c0d4142a097431891e974 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 17 Jun 2020 12:18:24 +0200 Subject: [PATCH 06/13] CLI: Handle signals, allow cancellation of executions Signed-off-by: Otto van der Schaaf --- include/nighthawk/client/client_worker.h | 5 ++++ include/nighthawk/client/process.h | 5 ++++ source/client/client.cc | 8 +++++- source/client/client_worker_impl.cc | 7 +++++ source/client/client_worker_impl.h | 2 ++ source/client/factories_impl.cc | 6 +++++ source/client/process_impl.cc | 8 ++++++ source/client/process_impl.h | 2 ++ source/client/remote_process_impl.cc | 5 ++++ source/client/remote_process_impl.h | 2 ++ test/integration/test_integration_basics.py | 29 +++++++++++++++++++++ 11 files changed, 78 insertions(+), 1 deletion(-) diff --git a/include/nighthawk/client/client_worker.h b/include/nighthawk/client/client_worker.h index ecb651dca..ad234aa72 100644 --- a/include/nighthawk/client/client_worker.h +++ b/include/nighthawk/client/client_worker.h @@ -34,6 +34,11 @@ class ClientWorker : virtual public Worker { * @return const Phase& associated to this worker. */ virtual const Phase& phase() const PURE; + + /** + * Requests execution cancellation. + */ + virtual void requestExecutionCancellation() PURE; }; using ClientWorkerPtr = std::unique_ptr; diff --git a/include/nighthawk/client/process.h b/include/nighthawk/client/process.h index 39c214e4c..943517726 100644 --- a/include/nighthawk/client/process.h +++ b/include/nighthawk/client/process.h @@ -23,6 +23,11 @@ class Process { * Shuts down the worker. Mandatory call before destructing. */ virtual void shutdown() PURE; + + /** + * Will request all workers to cancel execution asap. + */ + virtual bool requestExecutionCancellation() PURE; }; using ProcessPtr = std::unique_ptr; diff --git a/source/client/client.cc b/source/client/client.cc index cd3b46743..19ef1ff7b 100644 --- a/source/client/client.cc +++ b/source/client/client.cc @@ -23,6 +23,7 @@ #include "api/client/service.grpc.pb.h" #include "common/frequency.h" +#include "common/signal_handler.h" #include "common/uri_impl.h" #include "common/utility.h" @@ -73,7 +74,12 @@ bool Main::run() { } OutputFormatterFactoryImpl output_formatter_factory; OutputCollectorImpl output_collector(time_system, *options_); - const bool res = process->run(output_collector); + bool res; + { + auto signal_handler = + std::make_unique([&process]() { process->requestExecutionCancellation(); }); + res = process->run(output_collector); + } auto formatter = output_formatter_factory.create(options_->outputFormat()); std::cout << formatter->formatProto(output_collector.toProto()); process->shutdown(); diff --git a/source/client/client_worker_impl.cc b/source/client/client_worker_impl.cc index e4231bc87..d1a670895 100644 --- a/source/client/client_worker_impl.cc +++ b/source/client/client_worker_impl.cc @@ -87,6 +87,13 @@ void ClientWorkerImpl::work() { void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); } +void ClientWorkerImpl::requestExecutionCancellation() { + // We just bump a counter, which is watched by a static termination predicate. + // A useful side effect is that this counter will propagate to the output, which leaves + // a note about that execution was subject to cancellation. + dispatcher_->post([this]() { worker_number_scope_->counterFromString("cancel_requests").inc(); }); +} + StatisticPtrMap ClientWorkerImpl::statistics() const { StatisticPtrMap statistics; StatisticPtrMap s1 = benchmark_client_->statistics(); diff --git a/source/client/client_worker_impl.h b/source/client/client_worker_impl.h index 05f2fcb35..41b2660bc 100644 --- a/source/client/client_worker_impl.h +++ b/source/client/client_worker_impl.h @@ -46,6 +46,8 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker { void shutdownThread() override; + void requestExecutionCancellation() override; + protected: void work() override; diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index e65111f01..5cf330bd9 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -175,6 +175,12 @@ TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::S TerminationPredicatePtr duration_predicate = std::make_unique( time_source, options_.duration(), scheduled_starting_time); TerminationPredicate* current_predicate = duration_predicate.get(); + + // We'll always link a predicate which checks for requests to cancel. + current_predicate = ¤t_predicate->link( + std::make_unique( + scope.counterFromString("cancel_requests"), 0, TerminationPredicate::Status::TERMINATE)); + current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(), TerminationPredicate::Status::FAIL, scope); linkConfiguredPredicates(*current_predicate, options_.terminationPredicates(), diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc index 1baada4aa..352ab1981 100644 --- a/source/client/process_impl.cc +++ b/source/client/process_impl.cc @@ -135,6 +135,14 @@ void ProcessImpl::shutdown() { shutdown_ = true; } +bool ProcessImpl::requestExecutionCancellation() { + ENVOY_LOG(debug, "Requesting workers to cancel execution"); + for (auto& worker : workers_) { + worker->requestExecutionCancellation(); + } + return true; +} + const std::vector& ProcessImpl::createWorkers(const uint32_t concurrency) { // TODO(oschaaf): Expose kMinimalDelay in configuration. const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms); diff --git a/source/client/process_impl.h b/source/client/process_impl.h index 967ee7c59..fc5b2b138 100644 --- a/source/client/process_impl.h +++ b/source/client/process_impl.h @@ -69,6 +69,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable Date: Wed, 17 Jun 2020 16:26:48 +0200 Subject: [PATCH 07/13] Add lock guarding of the cancellation process + tests Signed-off-by: Otto van der Schaaf --- source/client/process_impl.cc | 95 ++++++++++++--------- source/client/process_impl.h | 4 +- source/client/remote_process_impl.cc | 1 + test/integration/test_integration_basics.py | 4 +- test/process_test.cc | 33 ++++++- 5 files changed, 92 insertions(+), 45 deletions(-) diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc index 352ab1981..15ee6611d 100644 --- a/source/client/process_impl.cc +++ b/source/client/process_impl.cc @@ -123,11 +123,15 @@ void ProcessImpl::shutdown() { // Before we shut down the worker threads, stop threading. tls_.shutdownGlobalThreading(); store_root_.shutdownThreading(); - // Before shutting down the cluster manager, stop the workers. - for (auto& worker : workers_) { - worker->shutdown(); + + { + auto guard = std::make_unique(workers_lock_); + // Before shutting down the cluster manager, stop the workers. + for (auto& worker : workers_) { + worker->shutdown(); + } + workers_.clear(); } - workers_.clear(); if (cluster_manager_ != nullptr) { cluster_manager_->shutdown(); } @@ -137,13 +141,15 @@ void ProcessImpl::shutdown() { bool ProcessImpl::requestExecutionCancellation() { ENVOY_LOG(debug, "Requesting workers to cancel execution"); + auto guard = std::make_unique(workers_lock_); for (auto& worker : workers_) { worker->requestExecutionCancellation(); } + cancelled_ = true; return true; } -const std::vector& ProcessImpl::createWorkers(const uint32_t concurrency) { +void ProcessImpl::createWorkers(const uint32_t concurrency) { // TODO(oschaaf): Expose kMinimalDelay in configuration. const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms); ASSERT(workers_.empty()); @@ -173,7 +179,6 @@ const std::vector& ProcessImpl::createWorkers(const uint32_t co : ClientWorkerImpl::HardCodedWarmupStyle::OFF)); worker_number++; } - return workers_; } void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) { @@ -385,44 +390,50 @@ void ProcessImpl::addRequestSourceCluster( bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector& uris, const UriPtr& request_source_uri, const UriPtr& tracing_uri) { - int number_of_workers = determineConcurrency(); - shutdown_ = false; - const std::vector& workers = createWorkers(number_of_workers); - tls_.registerThread(*dispatcher_, true); - store_root_.initializeThreading(*dispatcher_, tls_); - runtime_singleton_ = std::make_unique( - Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl( - *dispatcher_, tls_, {}, *local_info_, store_root_, generator_, - Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)}); - ssl_context_manager_ = - std::make_unique(time_system_); - cluster_manager_factory_ = std::make_unique( - admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_, - dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_, - secret_manager_, validation_context_, *api_, http_context_, grpc_context_, - access_log_manager_, *singleton_manager_); - cluster_manager_factory_->setConnectionReuseStrategy( - options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU - ? Http1PoolImpl::ConnectionReuseStrategy::LRU - : Http1PoolImpl::ConnectionReuseStrategy::MRU); - cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections()); - envoy::config::bootstrap::v3::Bootstrap bootstrap; - createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers); - if (tracing_uri != nullptr) { - setupTracingImplementation(bootstrap, *tracing_uri); - addTracingCluster(bootstrap, *tracing_uri); - } - ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString()); - cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap); - maybeCreateTracingDriver(bootstrap.tracing()); - cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); }); + { + auto guard = std::make_unique(workers_lock_); + if (cancelled_) { + return true; + } + int number_of_workers = determineConcurrency(); + shutdown_ = false; + createWorkers(number_of_workers); + tls_.registerThread(*dispatcher_, true); + store_root_.initializeThreading(*dispatcher_, tls_); + runtime_singleton_ = std::make_unique( + Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl( + *dispatcher_, tls_, {}, *local_info_, store_root_, generator_, + Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)}); + ssl_context_manager_ = + std::make_unique(time_system_); + cluster_manager_factory_ = std::make_unique( + admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_, + dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, + *local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_, + access_log_manager_, *singleton_manager_); + cluster_manager_factory_->setConnectionReuseStrategy( + options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU + ? Http1PoolImpl::ConnectionReuseStrategy::LRU + : Http1PoolImpl::ConnectionReuseStrategy::MRU); + cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections()); + envoy::config::bootstrap::v3::Bootstrap bootstrap; + createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers); + if (tracing_uri != nullptr) { + setupTracingImplementation(bootstrap, *tracing_uri); + addTracingCluster(bootstrap, *tracing_uri); + } + ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString()); + cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap); + maybeCreateTracingDriver(bootstrap.tracing()); + cluster_manager_->setInitializedCb( + [this]() -> void { init_manager_.initialize(init_watcher_); }); - Runtime::LoaderSingleton::get().initialize(*cluster_manager_); + Runtime::LoaderSingleton::get().initialize(*cluster_manager_); - for (auto& w : workers_) { - w->start(); + for (auto& w : workers_) { + w->start(); + } } - for (auto& w : workers_) { w->waitForCompletion(); } @@ -451,7 +462,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector 0; }); StatisticFactoryImpl statistic_factory(options_); - collector.addResult("global", mergeWorkerStatistics(workers), counters, + collector.addResult("global", mergeWorkerStatistics(workers_), counters, total_execution_duration / workers_.size()); return counters.find("sequencer.failed_terminations") == counters.end(); } diff --git a/source/client/process_impl.h b/source/client/process_impl.h index fc5b2b138..fd8f57a8a 100644 --- a/source/client/process_impl.h +++ b/source/client/process_impl.h @@ -90,7 +90,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable& createWorkers(const uint32_t concurrency); + void createWorkers(const uint32_t concurrency); std::vector vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const; std::vector mergeWorkerStatistics(const std::vector& workers) const; @@ -136,6 +136,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable #include #include "nighthawk/common/exception.h" @@ -32,12 +33,35 @@ class ProcessTest : public TestWithParam { : loopback_address_(Envoy::Network::Test::getLoopbackAddressUrlString(GetParam())), options_(TestUtility::createOptionsImpl( fmt::format("foo --duration 1 -v error --rps 10 https://{}/", loopback_address_))){}; - void runProcess(RunExpectation expectation) { + void runProcess(RunExpectation expectation, bool do_cancel = false) { ProcessPtr process = std::make_unique(*options_, time_system_); OutputCollectorImpl collector(time_system_, *options_); + std::thread cancel_thread; + if (do_cancel) { + cancel_thread = std::thread([&process] { + sleep(5); + std::cerr << "request cancel" << std::endl; + process->requestExecutionCancellation(); + }); + } const auto result = process->run(collector) ? RunExpectation::EXPECT_SUCCESS : RunExpectation::EXPECT_FAILURE; EXPECT_EQ(result, expectation); + if (cancel_thread.joinable()) { + cancel_thread.join(); + } + if (do_cancel) { + auto proto = collector.toProto(); + int cancel_requests = 0; + for (const auto& result : proto.results()) { + for (const auto& counter : result.counters()) { + if (counter.name() == "cancel_requests") { + cancel_requests++; + } + } + } + EXPECT_EQ(3, cancel_requests); // global results + two workers + } process->shutdown(); } @@ -64,5 +88,12 @@ TEST_P(ProcessTest, BadTracerSpec) { runProcess(RunExpectation::EXPECT_FAILURE); } +TEST_P(ProcessTest, CancelExecution) { + options_ = TestUtility::createOptionsImpl( + fmt::format("foo --duration 10 --failure-predicate foo:0 --concurrency 2 https://{}/", + loopback_address_)); + runProcess(RunExpectation::EXPECT_SUCCESS, true); +} + } // namespace Client } // namespace Nighthawk From 2c04d1012199b0e7d9de447c2f662c90eaf87fd6 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Fri, 19 Jun 2020 00:36:00 +0200 Subject: [PATCH 08/13] s/cancel_requests/graceful_stop_requested/ Signed-off-by: Otto van der Schaaf --- source/client/client_worker_impl.cc | 3 ++- source/client/factories_impl.cc | 3 ++- test/integration/test_integration_basics.py | 2 +- test/process_test.cc | 8 ++++---- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/client/client_worker_impl.cc b/source/client/client_worker_impl.cc index d1a670895..09aaba66a 100644 --- a/source/client/client_worker_impl.cc +++ b/source/client/client_worker_impl.cc @@ -91,7 +91,8 @@ void ClientWorkerImpl::requestExecutionCancellation() { // We just bump a counter, which is watched by a static termination predicate. // A useful side effect is that this counter will propagate to the output, which leaves // a note about that execution was subject to cancellation. - dispatcher_->post([this]() { worker_number_scope_->counterFromString("cancel_requests").inc(); }); + dispatcher_->post( + [this]() { worker_number_scope_->counterFromString("graceful_stop_requested").inc(); }); } StatisticPtrMap ClientWorkerImpl::statistics() const { diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index d18bce4db..f681166bc 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -177,7 +177,8 @@ TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::S // We'll always link a predicate which checks for requests to cancel. TerminationPredicatePtr root_predicate = std::make_unique( - scope.counterFromString("cancel_requests"), 0, TerminationPredicate::Status::TERMINATE); + scope.counterFromString("graceful_stop_requested"), 0, + TerminationPredicate::Status::TERMINATE); TerminationPredicate* current_predicate = root_predicate.get(); if (!options_.noDuration()) { diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py index 3e557b051..6c08d7b69 100644 --- a/test/integration/test_integration_basics.py +++ b/test/integration/test_integration_basics.py @@ -682,4 +682,4 @@ def test_cancellation(http_test_server_fixture): assert (client_process.returncode == 0) parsed_json = json.loads(output) counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) - assertCounterEqual(counters, "cancel_requests", 2) + assertCounterEqual(counters, "graceful_stop_requested", 2) diff --git a/test/process_test.cc b/test/process_test.cc index f0ffabe2d..ea82b9b06 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -52,15 +52,15 @@ class ProcessTest : public TestWithParam { } if (do_cancel) { auto proto = collector.toProto(); - int cancel_requests = 0; + int graceful_stop_requested = 0; for (const auto& result : proto.results()) { for (const auto& counter : result.counters()) { - if (counter.name() == "cancel_requests") { - cancel_requests++; + if (counter.name() == "graceful_stop_requested") { + graceful_stop_requested++; } } } - EXPECT_EQ(3, cancel_requests); // global results + two workers + EXPECT_EQ(3, graceful_stop_requested); // global results + two workers } process->shutdown(); } From 55f694f200b6dfc8f6e018c74ec8b7459b9750ea Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Fri, 19 Jun 2020 00:37:30 +0200 Subject: [PATCH 09/13] Eliminate the NullTerminationPredicate: dead code Signed-off-by: Otto van der Schaaf --- source/common/termination_predicate_impl.h | 8 -------- 1 file changed, 8 deletions(-) diff --git a/source/common/termination_predicate_impl.h b/source/common/termination_predicate_impl.h index 80e4c67a9..c1c761345 100644 --- a/source/common/termination_predicate_impl.h +++ b/source/common/termination_predicate_impl.h @@ -27,14 +27,6 @@ class TerminationPredicateBaseImpl : public TerminationPredicate { TerminationPredicatePtr linked_child_; }; -/** - * Predicate which always returns TerminationPredicate::Status::PROCEED. - */ -class NullTerminationPredicateImpl : public TerminationPredicateBaseImpl { -public: - TerminationPredicate::Status evaluate() override; -}; - /** * Predicate which indicates termination iff the passed in duration has expired. * time tracking starts at the first call to evaluate(). From d1a696b38b4739d93ca05656ba243585a92f3a55 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Fri, 19 Jun 2020 00:40:58 +0200 Subject: [PATCH 10/13] Remove debug print line Signed-off-by: Otto van der Schaaf --- source/common/termination_predicate_impl.cc | 4 ---- test/process_test.cc | 1 - 2 files changed, 5 deletions(-) diff --git a/source/common/termination_predicate_impl.cc b/source/common/termination_predicate_impl.cc index 2ad96c5a6..d32f2006b 100644 --- a/source/common/termination_predicate_impl.cc +++ b/source/common/termination_predicate_impl.cc @@ -15,10 +15,6 @@ TerminationPredicate::Status TerminationPredicateBaseImpl::evaluateChain() { return status; } -TerminationPredicate::Status NullTerminationPredicateImpl::evaluate() { - return TerminationPredicate::Status::PROCEED; -} - TerminationPredicate::Status DurationTerminationPredicateImpl::evaluate() { return time_source_.monotonicTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE : TerminationPredicate::Status::PROCEED; diff --git a/test/process_test.cc b/test/process_test.cc index ea82b9b06..c7f461967 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -40,7 +40,6 @@ class ProcessTest : public TestWithParam { if (do_cancel) { cancel_thread = std::thread([&process] { sleep(5); - std::cerr << "request cancel" << std::endl; process->requestExecutionCancellation(); }); } From 1fddbec526a464606577a569cf8a8a464d175025 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Mon, 22 Jun 2020 20:47:51 +0200 Subject: [PATCH 11/13] Partially address review feedback Signed-off-by: Otto van der Schaaf --- test/integration/test_integration_basics.py | 1 + test/process_test.cc | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py index 6cf1b5120..4fd2d7aa8 100644 --- a/test/integration/test_integration_basics.py +++ b/test/integration/test_integration_basics.py @@ -683,6 +683,7 @@ def test_cancellation(http_test_server_fixture): parsed_json = json.loads(output) counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) assertCounterEqual(counters, "graceful_stop_requested", 2) + assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1) def _run_client_with_args(args): diff --git a/test/process_test.cc b/test/process_test.cc index c7f461967..f440bd28a 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -33,6 +33,7 @@ class ProcessTest : public TestWithParam { : loopback_address_(Envoy::Network::Test::getLoopbackAddressUrlString(GetParam())), options_(TestUtility::createOptionsImpl( fmt::format("foo --duration 1 -v error --rps 10 https://{}/", loopback_address_))){}; + void runProcess(RunExpectation expectation, bool do_cancel = false) { ProcessPtr process = std::make_unique(*options_, time_system_); OutputCollectorImpl collector(time_system_, *options_); @@ -88,8 +89,11 @@ TEST_P(ProcessTest, BadTracerSpec) { } TEST_P(ProcessTest, CancelExecution) { + // The failure predicate below is there to wipe out any stock ones. We want this to run for a long + // time, even if the upstream fails (there is no live upstream in this test, we send traffic into + // the void), so we can check cancellation works. options_ = TestUtility::createOptionsImpl( - fmt::format("foo --duration 10 --failure-predicate foo:0 --concurrency 2 https://{}/", + fmt::format("foo --duration 300 --failure-predicate foo:0 --concurrency 2 https://{}/", loopback_address_)); runProcess(RunExpectation::EXPECT_SUCCESS, true); } From 20e610867dbbbeb48d07952fa0de45b1bad62d52 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Mon, 22 Jun 2020 21:29:09 +0200 Subject: [PATCH 12/13] Review feedback pt II Signed-off-by: Otto van der Schaaf --- test/process_test.cc | 43 ++++++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/test/process_test.cc b/test/process_test.cc index f440bd28a..29065590f 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -34,33 +34,43 @@ class ProcessTest : public TestWithParam { options_(TestUtility::createOptionsImpl( fmt::format("foo --duration 1 -v error --rps 10 https://{}/", loopback_address_))){}; - void runProcess(RunExpectation expectation, bool do_cancel = false) { + void runProcess(RunExpectation expectation, bool do_cancel = false, + bool terminate_right_away = false) { ProcessPtr process = std::make_unique(*options_, time_system_); OutputCollectorImpl collector(time_system_, *options_); std::thread cancel_thread; if (do_cancel) { - cancel_thread = std::thread([&process] { - sleep(5); + cancel_thread = std::thread([&process, terminate_right_away] { + if (!terminate_right_away) { + sleep(5); + } process->requestExecutionCancellation(); }); + if (terminate_right_away) { + cancel_thread.join(); + } } const auto result = process->run(collector) ? RunExpectation::EXPECT_SUCCESS : RunExpectation::EXPECT_FAILURE; EXPECT_EQ(result, expectation); - if (cancel_thread.joinable()) { - cancel_thread.join(); - } if (do_cancel) { + if (cancel_thread.joinable()) { + cancel_thread.join(); + } auto proto = collector.toProto(); - int graceful_stop_requested = 0; - for (const auto& result : proto.results()) { - for (const auto& counter : result.counters()) { - if (counter.name() == "graceful_stop_requested") { - graceful_stop_requested++; + if (terminate_right_away) { + EXPECT_EQ(0, proto.results().size()); + } else { + int graceful_stop_requested = 0; + for (const auto& result : proto.results()) { + for (const auto& counter : result.counters()) { + if (counter.name() == "graceful_stop_requested") { + graceful_stop_requested++; + } } } + EXPECT_EQ(3, graceful_stop_requested); // global results + two workers } - EXPECT_EQ(3, graceful_stop_requested); // global results + two workers } process->shutdown(); } @@ -88,7 +98,7 @@ TEST_P(ProcessTest, BadTracerSpec) { runProcess(RunExpectation::EXPECT_FAILURE); } -TEST_P(ProcessTest, CancelExecution) { +TEST_P(ProcessTest, CancelDuringLoadTest) { // The failure predicate below is there to wipe out any stock ones. We want this to run for a long // time, even if the upstream fails (there is no live upstream in this test, we send traffic into // the void), so we can check cancellation works. @@ -98,5 +108,12 @@ TEST_P(ProcessTest, CancelExecution) { runProcess(RunExpectation::EXPECT_SUCCESS, true); } +TEST_P(ProcessTest, CancelExecutionBeforeBeginLoadTest) { + options_ = TestUtility::createOptionsImpl( + fmt::format("foo --duration 300 --failure-predicate foo:0 --concurrency 2 https://{}/", + loopback_address_)); + runProcess(RunExpectation::EXPECT_SUCCESS, true, true); +} + } // namespace Client } // namespace Nighthawk From 31986235bfd50c70aad4c7e5ade9d94c7030c5ef Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Tue, 23 Jun 2020 00:39:03 +0200 Subject: [PATCH 13/13] Review feedback Signed-off-by: Otto van der Schaaf --- source/client/client.cc | 8 ++++---- source/client/process_impl.h | 6 ++++++ source/client/remote_process_impl.cc | 2 +- test/integration/test_integration_basics.py | 13 +++++++------ test/process_test.cc | 3 +++ 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/source/client/client.cc b/source/client/client.cc index 19ef1ff7b..8c08eda00 100644 --- a/source/client/client.cc +++ b/source/client/client.cc @@ -74,21 +74,21 @@ bool Main::run() { } OutputFormatterFactoryImpl output_formatter_factory; OutputCollectorImpl output_collector(time_system, *options_); - bool res; + bool result; { auto signal_handler = std::make_unique([&process]() { process->requestExecutionCancellation(); }); - res = process->run(output_collector); + result = process->run(output_collector); } auto formatter = output_formatter_factory.create(options_->outputFormat()); std::cout << formatter->formatProto(output_collector.toProto()); process->shutdown(); - if (!res) { + if (!result) { ENVOY_LOG(error, "An error ocurred."); } else { ENVOY_LOG(info, "Done."); } - return res; + return result; } } // namespace Client diff --git a/source/client/process_impl.h b/source/client/process_impl.h index 343c52abf..11d265fb8 100644 --- a/source/client/process_impl.h +++ b/source/client/process_impl.h @@ -101,6 +101,12 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const; std::vector diff --git a/source/client/remote_process_impl.cc b/source/client/remote_process_impl.cc index fea08ddd3..05e0b859f 100644 --- a/source/client/remote_process_impl.cc +++ b/source/client/remote_process_impl.cc @@ -56,7 +56,7 @@ bool RemoteProcessImpl::run(OutputCollector& collector) { bool RemoteProcessImpl::requestExecutionCancellation() { ENVOY_LOG(error, "Remote process cancellation not supported yet"); - // TODO(XXX): Send a cancel request to the gRPC service. + // TODO(#380): Send a cancel request to the gRPC service. return false; } diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py index 4fd2d7aa8..d82fdb478 100644 --- a/test/integration/test_integration_basics.py +++ b/test/integration/test_integration_basics.py @@ -658,16 +658,17 @@ def test_http_request_release_timing(http_test_server_fixture, qps_parameterizat assertCounterEqual(counters, "benchmark.http_2xx", (total_requests)) -def send_sigterm(p): +def _send_sigterm(process): # Sleep for a while, under tsan the client needs a lot of time - # to start up. + # to start up. 10 seconds has been determined to work through + # emperical observation. time.sleep(10) - p.terminate() + process.terminate() def test_cancellation(http_test_server_fixture): """ - That that we can use signals to cancel execution. + Make sure that we can use signals to cancel execution. """ args = [ http_test_server_fixture.nighthawk_client_path, "--concurrency", "2", @@ -675,11 +676,11 @@ def test_cancellation(http_test_server_fixture): "json" ] client_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - Thread(target=(lambda: send_sigterm(client_process))).start() + Thread(target=(lambda: _send_sigterm(client_process))).start() stdout, stderr = client_process.communicate() client_process.wait() output = stdout.decode('utf-8') - assert (client_process.returncode == 0) + assertEqual(client_process.returncode, 0) parsed_json = json.loads(output) counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) assertCounterEqual(counters, "graceful_stop_requested", 2) diff --git a/test/process_test.cc b/test/process_test.cc index 29065590f..b2b977175 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -42,6 +42,9 @@ class ProcessTest : public TestWithParam { if (do_cancel) { cancel_thread = std::thread([&process, terminate_right_away] { if (!terminate_right_away) { + // We sleep to give the the load test execution in the other thread a change to get + // started before we request cancellation. Five seconds has been determined to work with + // the sanitizer runs in CI through emperical observation. sleep(5); } process->requestExecutionCancellation();