Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI - cancel execution upon signal reception #367

Merged
merged 21 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b973b75
Extract SignalHandler
oschaaf Jan 15, 2020
fb5c434
Remove accidentally left in comment
oschaaf Jan 15, 2020
1bd1d31
Fix format
oschaaf Jan 15, 2020
fcba594
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 15, 2020
ce5ead6
Amend according to clang-tidy's complaints
oschaaf Jan 15, 2020
3f99119
Pass const ref
oschaaf Jan 15, 2020
111ba5d
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 16, 2020
89dfdf6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 10, 2020
c457af6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 15, 2020
04eec56
CLI: Handle signals, allow cancellation of executions
oschaaf Jun 17, 2020
a11050f
Add lock guarding of the cancellation process + tests
oschaaf Jun 17, 2020
781df87
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 18, 2020
2c04d10
s/cancel_requests/graceful_stop_requested/
oschaaf Jun 18, 2020
55f694f
Eliminate the NullTerminationPredicate: dead code
oschaaf Jun 18, 2020
d1a696b
Remove debug print line
oschaaf Jun 18, 2020
3c35f86
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
9e0d9b2
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
57d71f0
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 22, 2020
1fddbec
Partially address review feedback
oschaaf Jun 22, 2020
20e6108
Review feedback pt II
oschaaf Jun 22, 2020
3198623
Review feedback
oschaaf Jun 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/nighthawk/client/client_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class ClientWorker : virtual public Worker {
* @return const Phase& associated to this worker.
*/
virtual const Phase& phase() const PURE;

/**
* Requests execution cancellation.
*/
virtual void requestExecutionCancellation() PURE;
};

using ClientWorkerPtr = std::unique_ptr<ClientWorker>;
Expand Down
5 changes: 5 additions & 0 deletions include/nighthawk/client/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Process {
* Shuts down the worker. Mandatory call before destructing.
*/
virtual void shutdown() PURE;

/**
* Requests all workers to cancel execution as soon as possible.
*/
virtual bool requestExecutionCancellation() PURE;
};

using ProcessPtr = std::unique_ptr<Process>;
Expand Down
8 changes: 7 additions & 1 deletion source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "api/client/service.grpc.pb.h"

#include "common/frequency.h"
#include "common/signal_handler.h"
#include "common/uri_impl.h"
#include "common/utility.h"

Expand Down Expand Up @@ -73,7 +74,12 @@ bool Main::run() {
}
OutputFormatterFactoryImpl output_formatter_factory;
OutputCollectorImpl output_collector(time_system, *options_);
const bool res = process->run(output_collector);
bool res;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) If we can, we should try to avoid abbreviations in variable names (res).

{
auto signal_handler =
std::make_unique<SignalHandler>([&process]() { process->requestExecutionCancellation(); });
res = process->run(output_collector);
}
auto formatter = output_formatter_factory.create(options_->outputFormat());
std::cout << formatter->formatProto(output_collector.toProto());
process->shutdown();
Expand Down
7 changes: 7 additions & 0 deletions source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ void ClientWorkerImpl::work() {

void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); }

void ClientWorkerImpl::requestExecutionCancellation() {
  // Cancellation is signalled by incrementing the "cancel_requests" counter, which a
  // termination predicate watches. As a useful side effect the counter propagates to the
  // output, leaving a note that the execution was subject to cancellation.
  auto bump_cancel_counter = [this]() {
    worker_number_scope_->counterFromString("cancel_requests").inc();
  };
  // The increment itself is handed to the worker's dispatcher rather than done inline.
  dispatcher_->post(bump_cancel_counter);
}

StatisticPtrMap ClientWorkerImpl::statistics() const {
StatisticPtrMap statistics;
StatisticPtrMap s1 = benchmark_client_->statistics();
Expand Down
2 changes: 2 additions & 0 deletions source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {

void shutdownThread() override;

void requestExecutionCancellation() override;

protected:
void work() override;

Expand Down
6 changes: 6 additions & 0 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::S
TerminationPredicatePtr duration_predicate = std::make_unique<DurationTerminationPredicateImpl>(
time_source, options_.duration(), scheduled_starting_time);
TerminationPredicate* current_predicate = duration_predicate.get();

// We'll always link a predicate which checks for requests to cancel.
current_predicate = &current_predicate->link(
std::make_unique<StatsCounterAbsoluteThresholdTerminationPredicateImpl>(
scope.counterFromString("cancel_requests"), 0, TerminationPredicate::Status::TERMINATE));

current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(),
TerminationPredicate::Status::FAIL, scope);
linkConfiguredPredicates(*current_predicate, options_.terminationPredicates(),
Expand Down
103 changes: 61 additions & 42 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,33 @@ void ProcessImpl::shutdown() {
// Before we shut down the worker threads, stop threading.
tls_.shutdownGlobalThreading();
store_root_.shutdownThreading();
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();

{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();
}
workers_.clear();
}
workers_.clear();
if (cluster_manager_ != nullptr) {
cluster_manager_->shutdown();
}
tls_.shutdownThread();
shutdown_ = true;
}

const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t concurrency) {
/**
 * Asks every worker to cancel its execution and records that cancellation was requested.
 *
 * @return bool always true.
 */
bool ProcessImpl::requestExecutionCancellation() {
  ENVOY_LOG(debug, "Requesting workers to cancel execution");
  // workers_lock_ is also taken by shutdown() and runInternal() around their use of workers_,
  // so hold it while iterating. The guard is a plain scoped object: heap-allocating an RAII
  // lock guard through std::make_unique adds an allocation without any benefit.
  Envoy::Thread::LockGuard guard(workers_lock_);
  for (auto& worker : workers_) {
    worker->requestExecutionCancellation();
  }
  // Set while still holding the lock; runInternal() checks this flag under the same lock and
  // returns early when it is set.
  cancelled_ = true;
  return true;
}

void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Expose kMinimalDelay in configuration.
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);
ASSERT(workers_.empty());
Expand Down Expand Up @@ -165,7 +179,6 @@ const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t co
: ClientWorkerImpl::HardCodedWarmupStyle::OFF));
worker_number++;
}
return workers_;
}

void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
Expand Down Expand Up @@ -377,44 +390,50 @@ void ProcessImpl::addRequestSourceCluster(

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
int number_of_workers = determineConcurrency();
shutdown_ = false;
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: The "hide whitespace" toggle in Github makes this a lot more fun to review.
(The scope introduced for the lock guard introduces a lot of whitespace noise).

auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;
createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
*local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

for (auto& w : workers_) {
w->start();
for (auto& w : workers_) {
w->start();
}
}

for (auto& w : workers_) {
w->waitForCompletion();
}
Expand Down Expand Up @@ -443,7 +462,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
const auto& counters = Utility().mapCountersFromStore(
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
collector.addResult("global", mergeWorkerStatistics(workers), counters,
collector.addResult("global", mergeWorkerStatistics(workers_), counters,
total_execution_duration / workers_.size());
return counters.find("sequencer.failed_terminations") == counters.end();
}
Expand Down
6 changes: 5 additions & 1 deletion source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
*/
void shutdown() override;

bool requestExecutionCancellation() override;

private:
/**
* @brief Creates a cluster for usage by a remote request source.
Expand All @@ -88,7 +90,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
void maybeCreateTracingDriver(const envoy::config::trace::v3::Tracing& configuration);

void configureComponentLogLevels(spdlog::level::level_enum level);
const std::vector<ClientWorkerPtr>& createWorkers(const uint32_t concurrency);
void createWorkers(const uint32_t concurrency);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) Is this a good time to document the method?

std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const;
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
Expand Down Expand Up @@ -134,6 +136,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
Envoy::Server::ValidationAdmin admin_;
Envoy::ProtobufMessage::ProdValidationContextImpl validation_context_;
bool shutdown_{true};
Envoy::Thread::MutexBasicLockable workers_lock_;
bool cancelled_{false};
};

} // namespace Client
Expand Down
6 changes: 6 additions & 0 deletions source/client/remote_process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,11 @@ bool RemoteProcessImpl::run(OutputCollector& collector) {
return false;
}

bool RemoteProcessImpl::requestExecutionCancellation() {
ENVOY_LOG(error, "Remote process cancellation not supported yet");
// TODO(XXX): Send a cancel request to the gRPC service.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to replace XXX with a value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #380 (and updated XXX to #380) in the changes I'm about to push.

return false;
}

} // namespace Client
} // namespace Nighthawk
2 changes: 2 additions & 0 deletions source/client/remote_process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class RemoteProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::
*/
void shutdown() override{};

bool requestExecutionCancellation() override;

private:
const Options& options_;
nighthawk::client::NighthawkService::Stub& stub_;
Expand Down
38 changes: 2 additions & 36 deletions source/client/service_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
namespace Nighthawk {
namespace Client {

namespace {
std::function<void(int)> signal_handler_delegate;
void signal_handler(int signal) { signal_handler_delegate(signal); }
} // namespace

ServiceMain::ServiceMain(int argc, const char** argv) {
const char* descr = "L7 (HTTP/HTTPS/HTTP2) performance characterization tool.";
TCLAP::CmdLine cmd(descr, ' ', VersionInfo::version()); // NOLINT
Expand Down Expand Up @@ -87,44 +82,15 @@ void ServiceMain::start() {
}
channel_ = grpc::CreateChannel(listener_bound_address_, grpc::InsecureChannelCredentials());
stub_ = std::make_unique<nighthawk::client::NighthawkService::Stub>(channel_);
pipe_fds_.resize(2);
// The shutdown thread will be notified by our signal handler and take it from there.
RELEASE_ASSERT(pipe(pipe_fds_.data()) == 0, "pipe failed");

shutdown_thread_ = std::thread([this]() {
int tmp;
RELEASE_ASSERT(read(pipe_fds_[0], &tmp, sizeof(int)) >= 0, "read failed");
RELEASE_ASSERT(close(pipe_fds_[0]) == 0, "read side close failed");
RELEASE_ASSERT(close(pipe_fds_[1]) == 0, "write side close failed");
pipe_fds_.clear();
server_->Shutdown();
});
signal_handler_ = std::make_unique<SignalHandler>([this]() { server_->Shutdown(); });
}

void ServiceMain::wait() {
signal_handler_delegate = [this](int) { onSignal(); };
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
server_->Wait();
shutdown();
}

void ServiceMain::onSignal() { initiateShutdown(); }

void ServiceMain::initiateShutdown() {
if (pipe_fds_.size() == 2) {
const int tmp = 0;
RELEASE_ASSERT(write(pipe_fds_[1], &tmp, sizeof(int)) == sizeof(int), "write failed");
}
}

void ServiceMain::shutdown() {
initiateShutdown();
if (shutdown_thread_.joinable()) {
shutdown_thread_.join();
}
ENVOY_LOG(info, "Nighthawk grpc service exits");
}
// Logs that the gRPC service is exiting; no other work is done here.
void ServiceMain::shutdown() {
  ENVOY_LOG(info, "Nighthawk grpc service exits");
}

} // namespace Client
} // namespace Nighthawk
20 changes: 3 additions & 17 deletions source/client/service_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include "api/client/service.pb.h"

#include "common/signal_handler.h"

#include "client/service_impl.h"

#include "tclap/CmdLine.h"
Expand All @@ -37,17 +39,6 @@ class ServiceMain : public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
static std::string appendDefaultPortIfNeeded(absl::string_view host_and_maybe_port);

private:
/**
* Notifies the thread responsible for shutting down the server that it is time to do so, if
* needed. Safe to use in signal handling, and non-blocking.
*/
void initiateShutdown();

/**
* Fires on signal reception.
*/
void onSignal();

grpc::ServerBuilder builder_;
std::unique_ptr<grpc::Service> service_;
std::unique_ptr<grpc::Server> server_;
Expand All @@ -56,12 +47,7 @@ class ServiceMain : public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
int listener_port_{-1};
std::string listener_bound_address_;
std::string listener_output_path_;
// Signal handling needs to be lean so we can't directly initiate shutdown while handling a
// signal. Therefore we write a byte to this pipe to propagate signal reception. Subsequently,
// the read side will handle the actual shut down of the gRPC service without having to worry
// about signal-safety.
std::vector<int> pipe_fds_;
std::thread shutdown_thread_;
SignalHandlerPtr signal_handler_;
};

} // namespace Client
Expand Down
2 changes: 2 additions & 0 deletions source/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ envoy_cc_library(
"phase_impl.cc",
"rate_limiter_impl.cc",
"sequencer_impl.cc",
"signal_handler.cc",
"statistic_impl.cc",
"termination_predicate_impl.cc",
"uri_impl.cc",
Expand All @@ -81,6 +82,7 @@ envoy_cc_library(
"platform_util_impl.h",
"rate_limiter_impl.h",
"sequencer_impl.h",
"signal_handler.h",
"statistic_impl.h",
"termination_predicate_impl.h",
"uri_impl.h",
Expand Down
Loading