Skip to content

Commit

Permalink
CLI - cancel execution upon signal reception (envoyproxy#367)
Browse files Browse the repository at this point in the history
Teach the CLI to handle SIGTERM/SIGINT, and handle those as a request to
gracefully cancel execution.

Partially resolves envoyproxy#280
  • Loading branch information
oschaaf authored and wjuan-AFK committed Jul 14, 2020
1 parent 73d1d1e commit 0fe6bdf
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 66 deletions.
5 changes: 5 additions & 0 deletions include/nighthawk/client/client_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class ClientWorker : virtual public Worker {
* @return const Phase& associated to this worker.
*/
virtual const Phase& phase() const PURE;

/**
* Requests execution cancellation.
*/
virtual void requestExecutionCancellation() PURE;
};

using ClientWorkerPtr = std::unique_ptr<ClientWorker>;
Expand Down
5 changes: 5 additions & 0 deletions include/nighthawk/client/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Process {
* Shuts down the worker. Mandatory call before destructing.
*/
virtual void shutdown() PURE;

/**
* Will request all workers to cancel execution asap.
*/
virtual bool requestExecutionCancellation() PURE;
};

using ProcessPtr = std::unique_ptr<Process>;
Expand Down
12 changes: 9 additions & 3 deletions source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "api/client/service.grpc.pb.h"

#include "common/frequency.h"
#include "common/signal_handler.h"
#include "common/uri_impl.h"
#include "common/utility.h"

Expand Down Expand Up @@ -73,16 +74,21 @@ bool Main::run() {
}
OutputFormatterFactoryImpl output_formatter_factory;
OutputCollectorImpl output_collector(time_system, *options_);
const bool res = process->run(output_collector);
bool result;
{
auto signal_handler =
std::make_unique<SignalHandler>([&process]() { process->requestExecutionCancellation(); });
result = process->run(output_collector);
}
auto formatter = output_formatter_factory.create(options_->outputFormat());
std::cout << formatter->formatProto(output_collector.toProto());
process->shutdown();
if (!res) {
if (!result) {
ENVOY_LOG(error, "An error ocurred.");
} else {
ENVOY_LOG(info, "Done.");
}
return res;
return result;
}

} // namespace Client
Expand Down
8 changes: 8 additions & 0 deletions source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ void ClientWorkerImpl::work() {

void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); }

void ClientWorkerImpl::requestExecutionCancellation() {
// We just bump a counter, which is watched by a static termination predicate.
// A useful side effect is that this counter will propagate to the output, which leaves
// a note about that execution was subject to cancellation.
dispatcher_->post(
[this]() { worker_number_scope_->counterFromString("graceful_stop_requested").inc(); });
}

StatisticPtrMap ClientWorkerImpl::statistics() const {
StatisticPtrMap statistics;
StatisticPtrMap s1 = benchmark_client_->statistics();
Expand Down
2 changes: 2 additions & 0 deletions source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {

void shutdownThread() override;

void requestExecutionCancellation() override;

protected:
void work() override;

Expand Down
18 changes: 11 additions & 7 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,18 @@ TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options&
TerminationPredicatePtr
TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const {
TerminationPredicatePtr root_predicate;
if (options_.noDuration()) {
root_predicate = std::make_unique<NullTerminationPredicateImpl>();
} else {
root_predicate = std::make_unique<DurationTerminationPredicateImpl>(
time_source, options_.duration(), scheduled_starting_time);
}
// We'll always link a predicate which checks for requests to cancel.
TerminationPredicatePtr root_predicate =
std::make_unique<StatsCounterAbsoluteThresholdTerminationPredicateImpl>(
scope.counterFromString("graceful_stop_requested"), 0,
TerminationPredicate::Status::TERMINATE);

TerminationPredicate* current_predicate = root_predicate.get();
if (!options_.noDuration()) {
current_predicate = &current_predicate->link(std::make_unique<DurationTerminationPredicateImpl>(
time_source, options_.duration(), scheduled_starting_time));
}

current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(),
TerminationPredicate::Status::FAIL, scope);
linkConfiguredPredicates(*current_predicate, options_.terminationPredicates(),
Expand Down
103 changes: 61 additions & 42 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,33 @@ void ProcessImpl::shutdown() {
// Before we shut down the worker threads, stop threading.
tls_.shutdownGlobalThreading();
store_root_.shutdownThreading();
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();

{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();
}
workers_.clear();
}
workers_.clear();
if (cluster_manager_ != nullptr) {
cluster_manager_->shutdown();
}
tls_.shutdownThread();
shutdown_ = true;
}

const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t concurrency) {
bool ProcessImpl::requestExecutionCancellation() {
ENVOY_LOG(debug, "Requesting workers to cancel execution");
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
for (auto& worker : workers_) {
worker->requestExecutionCancellation();
}
cancelled_ = true;
return true;
}

void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Expose kMinimalDelay in configuration.
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);
ASSERT(workers_.empty());
Expand Down Expand Up @@ -168,7 +182,6 @@ const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t co
: ClientWorkerImpl::HardCodedWarmupStyle::OFF));
worker_number++;
}
return workers_;
}

void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
Expand Down Expand Up @@ -381,44 +394,50 @@ void ProcessImpl::addRequestSourceCluster(

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
int number_of_workers = determineConcurrency();
shutdown_ = false;
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;
createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
*local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

for (auto& w : workers_) {
w->start();
for (auto& w : workers_) {
w->start();
}
}

for (auto& w : workers_) {
w->waitForCompletion();
}
Expand Down Expand Up @@ -447,7 +466,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
const auto& counters = Utility().mapCountersFromStore(
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
collector.addResult("global", mergeWorkerStatistics(workers), counters,
collector.addResult("global", mergeWorkerStatistics(workers_), counters,
total_execution_duration / workers_.size());
return counters.find("sequencer.failed_terminations") == counters.end();
}
Expand Down
12 changes: 11 additions & 1 deletion source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
*/
void shutdown() override;

bool requestExecutionCancellation() override;

private:
/**
* @brief Creates a cluster for usage by a remote request source.
Expand All @@ -99,7 +101,13 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
void maybeCreateTracingDriver(const envoy::config::trace::v3::Tracing& configuration);

void configureComponentLogLevels(spdlog::level::level_enum level);
const std::vector<ClientWorkerPtr>& createWorkers(const uint32_t concurrency);
/**
* Prepare the ProcessImpl instance by creating and configuring the workers it needs for execution
* of the load test.
*
* @param concurrency the amount of workers that should be created.
*/
void createWorkers(const uint32_t concurrency);
std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const;
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
Expand Down Expand Up @@ -145,6 +153,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
Envoy::Server::ValidationAdmin admin_;
Envoy::ProtobufMessage::ProdValidationContextImpl validation_context_;
bool shutdown_{true};
Envoy::Thread::MutexBasicLockable workers_lock_;
bool cancelled_{false};
};

} // namespace Client
Expand Down
6 changes: 6 additions & 0 deletions source/client/remote_process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,11 @@ bool RemoteProcessImpl::run(OutputCollector& collector) {
return false;
}

bool RemoteProcessImpl::requestExecutionCancellation() {
ENVOY_LOG(error, "Remote process cancellation not supported yet");
// TODO(#380): Send a cancel request to the gRPC service.
return false;
}

} // namespace Client
} // namespace Nighthawk
2 changes: 2 additions & 0 deletions source/client/remote_process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class RemoteProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::
*/
void shutdown() override{};

bool requestExecutionCancellation() override;

private:
const Options& options_;
nighthawk::client::NighthawkService::Stub& stub_;
Expand Down
4 changes: 0 additions & 4 deletions source/common/termination_predicate_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ TerminationPredicate::Status TerminationPredicateBaseImpl::evaluateChain() {
return status;
}

TerminationPredicate::Status NullTerminationPredicateImpl::evaluate() {
return TerminationPredicate::Status::PROCEED;
}

TerminationPredicate::Status DurationTerminationPredicateImpl::evaluate() {
return time_source_.monotonicTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE
: TerminationPredicate::Status::PROCEED;
Expand Down
8 changes: 0 additions & 8 deletions source/common/termination_predicate_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ class TerminationPredicateBaseImpl : public TerminationPredicate {
TerminationPredicatePtr linked_child_;
};

/**
* Predicate which always returns TerminationPredicate::Status::PROCEED.
*/
class NullTerminationPredicateImpl : public TerminationPredicateBaseImpl {
public:
TerminationPredicate::Status evaluate() override;
};

/**
* Predicate which indicates termination iff the passed in duration has expired.
* time tracking starts at the first call to evaluate().
Expand Down
33 changes: 33 additions & 0 deletions test/integration/test_integration_basics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#!/usr/bin/env python3

import json
import logging
import os
import subprocess
import sys
import pytest
import time
from threading import Thread

from test.integration.common import IpVersion
from test.integration.integration_test_fixtures import (http_test_server_fixture,
Expand Down Expand Up @@ -655,6 +659,35 @@ def test_http_request_release_timing(http_test_server_fixture, qps_parameterizat
assertCounterEqual(counters, "benchmark.http_2xx", (total_requests))


def _send_sigterm(process):
# Sleep for a while, under tsan the client needs a lot of time
# to start up. 10 seconds has been determined to work through
# emperical observation.
time.sleep(10)
process.terminate()


def test_cancellation(http_test_server_fixture):
"""
Make sure that we can use signals to cancel execution.
"""
args = [
http_test_server_fixture.nighthawk_client_path, "--concurrency", "2",
http_test_server_fixture.getTestServerRootUri(), "--duration", "1000", "--output-format",
"json"
]
client_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
Thread(target=(lambda: _send_sigterm(client_process))).start()
stdout, stderr = client_process.communicate()
client_process.wait()
output = stdout.decode('utf-8')
assertEqual(client_process.returncode, 0)
parsed_json = json.loads(output)
counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
assertCounterEqual(counters, "graceful_stop_requested", 2)
assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1)


def _run_client_with_args(args):
return run_binary_with_args("nighthawk_client", args)

Expand Down
Loading

0 comments on commit 0fe6bdf

Please sign in to comment.