Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI - cancel execution upon signal reception #367

Merged
merged 21 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b973b75
Extract SignalHandler
oschaaf Jan 15, 2020
fb5c434
Remove accidentally left in comment
oschaaf Jan 15, 2020
1bd1d31
Fix format
oschaaf Jan 15, 2020
fcba594
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 15, 2020
ce5ead6
Amend according to clang-tidy's complaints
oschaaf Jan 15, 2020
3f99119
Pass const ref
oschaaf Jan 15, 2020
111ba5d
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 16, 2020
89dfdf6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 10, 2020
c457af6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 15, 2020
04eec56
CLI: Handle signals, allow cancellation of executions
oschaaf Jun 17, 2020
a11050f
Add lock guarding of the cancellation process + tests
oschaaf Jun 17, 2020
781df87
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 18, 2020
2c04d10
s/cancel_requests/graceful_stop_requested/
oschaaf Jun 18, 2020
55f694f
Eliminate the NullTerminationPredicate: dead code
oschaaf Jun 18, 2020
d1a696b
Remove debug print line
oschaaf Jun 18, 2020
3c35f86
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
9e0d9b2
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
57d71f0
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 22, 2020
1fddbec
Partially address review feedback
oschaaf Jun 22, 2020
20e6108
Review feedback pt II
oschaaf Jun 22, 2020
3198623
Review feedback
oschaaf Jun 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/nighthawk/client/client_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class ClientWorker : virtual public Worker {
* @return const Phase& associated to this worker.
*/
virtual const Phase& phase() const PURE;

/**
* Requests execution cancellation.
*/
virtual void requestExecutionCancellation() PURE;
};

using ClientWorkerPtr = std::unique_ptr<ClientWorker>;
Expand Down
5 changes: 5 additions & 0 deletions include/nighthawk/client/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Process {
* Shuts down the worker. Mandatory call before destructing.
*/
virtual void shutdown() PURE;

/**
* Requests all workers to cancel execution as soon as possible.
*/
virtual bool requestExecutionCancellation() PURE;
};

using ProcessPtr = std::unique_ptr<Process>;
Expand Down
12 changes: 9 additions & 3 deletions source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "api/client/service.grpc.pb.h"

#include "common/frequency.h"
#include "common/signal_handler.h"
#include "common/uri_impl.h"
#include "common/utility.h"

Expand Down Expand Up @@ -73,16 +74,21 @@ bool Main::run() {
}
OutputFormatterFactoryImpl output_formatter_factory;
OutputCollectorImpl output_collector(time_system, *options_);
const bool res = process->run(output_collector);
bool result;
{
auto signal_handler =
std::make_unique<SignalHandler>([&process]() { process->requestExecutionCancellation(); });
result = process->run(output_collector);
}
auto formatter = output_formatter_factory.create(options_->outputFormat());
std::cout << formatter->formatProto(output_collector.toProto());
process->shutdown();
if (!res) {
if (!result) {
ENVOY_LOG(error, "An error ocurred.");
} else {
ENVOY_LOG(info, "Done.");
}
return res;
return result;
}

} // namespace Client
Expand Down
8 changes: 8 additions & 0 deletions source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ void ClientWorkerImpl::work() {

void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); }

/**
 * Requests cancellation of this worker's execution.
 *
 * Cancellation is signalled by incrementing the "graceful_stop_requested"
 * counter in this worker's stats scope. The increment is not performed inline;
 * it is posted to dispatcher_.
 */
void ClientWorkerImpl::requestExecutionCancellation() {
  // Bumping this counter is all that is needed: a termination predicate watches
  // it. As a bonus the counter ends up in the output, which records that the
  // execution was subject to cancellation.
  auto bump_stop_counter = [this]() {
    auto& stop_counter = worker_number_scope_->counterFromString("graceful_stop_requested");
    stop_counter.inc();
  };
  dispatcher_->post(bump_stop_counter);
}

StatisticPtrMap ClientWorkerImpl::statistics() const {
StatisticPtrMap statistics;
StatisticPtrMap s1 = benchmark_client_->statistics();
Expand Down
2 changes: 2 additions & 0 deletions source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {

void shutdownThread() override;

void requestExecutionCancellation() override;

protected:
void work() override;

Expand Down
18 changes: 11 additions & 7 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,18 @@ TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options&
TerminationPredicatePtr
TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const {
TerminationPredicatePtr root_predicate;
if (options_.noDuration()) {
root_predicate = std::make_unique<NullTerminationPredicateImpl>();
} else {
root_predicate = std::make_unique<DurationTerminationPredicateImpl>(
time_source, options_.duration(), scheduled_starting_time);
}
// We'll always link a predicate which checks for requests to cancel.
TerminationPredicatePtr root_predicate =
std::make_unique<StatsCounterAbsoluteThresholdTerminationPredicateImpl>(
scope.counterFromString("graceful_stop_requested"), 0,
TerminationPredicate::Status::TERMINATE);

TerminationPredicate* current_predicate = root_predicate.get();
if (!options_.noDuration()) {
current_predicate = &current_predicate->link(std::make_unique<DurationTerminationPredicateImpl>(
time_source, options_.duration(), scheduled_starting_time));
}

current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(),
TerminationPredicate::Status::FAIL, scope);
linkConfiguredPredicates(*current_predicate, options_.terminationPredicates(),
Expand Down
103 changes: 61 additions & 42 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,33 @@ void ProcessImpl::shutdown() {
// Before we shut down the worker threads, stop threading.
tls_.shutdownGlobalThreading();
store_root_.shutdownThreading();
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();

{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();
}
workers_.clear();
}
workers_.clear();
if (cluster_manager_ != nullptr) {
cluster_manager_->shutdown();
}
tls_.shutdownThread();
shutdown_ = true;
}

const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t concurrency) {
/**
 * Asks every worker currently held in workers_ to cancel execution, and records
 * that cancellation was requested.
 *
 * workers_lock_ is held for the duration of the call, which covers both the
 * iteration over workers_ and the write to cancelled_.
 *
 * @return bool always true.
 */
bool ProcessImpl::requestExecutionCancellation() {
  ENVOY_LOG(debug, "Requesting workers to cancel execution");
  // A scoped guard on the stack is sufficient here; there is no need to
  // heap-allocate it. The lock is released when the guard goes out of scope.
  Envoy::Thread::LockGuard guard(workers_lock_);
  for (auto& worker : workers_) {
    worker->requestExecutionCancellation();
  }
  // Remember the request so that it is visible to code that checks cancelled_
  // under the same lock.
  cancelled_ = true;
  return true;
}

void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Expose kMinimalDelay in configuration.
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);
ASSERT(workers_.empty());
Expand Down Expand Up @@ -168,7 +182,6 @@ const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t co
: ClientWorkerImpl::HardCodedWarmupStyle::OFF));
worker_number++;
}
return workers_;
}

void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
Expand Down Expand Up @@ -381,44 +394,50 @@ void ProcessImpl::addRequestSourceCluster(

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
int number_of_workers = determineConcurrency();
shutdown_ = false;
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: The "hide whitespace" toggle in Github makes this a lot more fun to review.
(The scope introduced for the lock guard introduces a lot of whitespace noise).

auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;
createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
*local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

for (auto& w : workers_) {
w->start();
for (auto& w : workers_) {
w->start();
}
}

for (auto& w : workers_) {
w->waitForCompletion();
}
Expand Down Expand Up @@ -447,7 +466,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
const auto& counters = Utility().mapCountersFromStore(
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
collector.addResult("global", mergeWorkerStatistics(workers), counters,
collector.addResult("global", mergeWorkerStatistics(workers_), counters,
total_execution_duration / workers_.size());
return counters.find("sequencer.failed_terminations") == counters.end();
}
Expand Down
12 changes: 11 additions & 1 deletion source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
*/
void shutdown() override;

bool requestExecutionCancellation() override;

private:
/**
* @brief Creates a cluster for usage by a remote request source.
Expand All @@ -99,7 +101,13 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
void maybeCreateTracingDriver(const envoy::config::trace::v3::Tracing& configuration);

void configureComponentLogLevels(spdlog::level::level_enum level);
const std::vector<ClientWorkerPtr>& createWorkers(const uint32_t concurrency);
/**
* Prepare the ProcessImpl instance by creating and configuring the workers it needs for execution
* of the load test.
*
* @param concurrency the amount of workers that should be created.
*/
void createWorkers(const uint32_t concurrency);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) Is this a good time to document the method?

std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const;
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
Expand Down Expand Up @@ -145,6 +153,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
Envoy::Server::ValidationAdmin admin_;
Envoy::ProtobufMessage::ProdValidationContextImpl validation_context_;
bool shutdown_{true};
Envoy::Thread::MutexBasicLockable workers_lock_;
bool cancelled_{false};
};

} // namespace Client
Expand Down
6 changes: 6 additions & 0 deletions source/client/remote_process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,11 @@ bool RemoteProcessImpl::run(OutputCollector& collector) {
return false;
}

/**
 * Cancellation is not implemented for remote execution: this only logs an
 * error.
 *
 * @return bool always false.
 */
bool RemoteProcessImpl::requestExecutionCancellation() {
  // TODO(#380): Send a cancel request to the gRPC service.
  const bool cancellation_requested = false;
  ENVOY_LOG(error, "Remote process cancellation not supported yet");
  return cancellation_requested;
}

} // namespace Client
} // namespace Nighthawk
2 changes: 2 additions & 0 deletions source/client/remote_process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class RemoteProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::
*/
void shutdown() override{};

bool requestExecutionCancellation() override;

private:
const Options& options_;
nighthawk::client::NighthawkService::Stub& stub_;
Expand Down
4 changes: 0 additions & 4 deletions source/common/termination_predicate_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ TerminationPredicate::Status TerminationPredicateBaseImpl::evaluateChain() {
return status;
}

TerminationPredicate::Status NullTerminationPredicateImpl::evaluate() {
return TerminationPredicate::Status::PROCEED;
}

TerminationPredicate::Status DurationTerminationPredicateImpl::evaluate() {
return time_source_.monotonicTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE
: TerminationPredicate::Status::PROCEED;
Expand Down
8 changes: 0 additions & 8 deletions source/common/termination_predicate_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ class TerminationPredicateBaseImpl : public TerminationPredicate {
TerminationPredicatePtr linked_child_;
};

/**
* Predicate which always returns TerminationPredicate::Status::PROCEED.
*/
class NullTerminationPredicateImpl : public TerminationPredicateBaseImpl {
public:
TerminationPredicate::Status evaluate() override;
};

/**
* Predicate which indicates termination iff the passed in duration has expired.
* time tracking starts at the first call to evaluate().
Expand Down
33 changes: 33 additions & 0 deletions test/integration/test_integration_basics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#!/usr/bin/env python3

import json
import logging
import os
import subprocess
import sys
import pytest
import time
from threading import Thread

from test.integration.common import IpVersion
from test.integration.integration_test_fixtures import (
Expand Down Expand Up @@ -654,6 +658,35 @@ def test_http_request_release_timing(http_test_server_fixture, qps_parameterizat
assertCounterEqual(counters, "benchmark.http_2xx", (total_requests))


def _send_sigterm(process):
# Sleep for a while, under tsan the client needs a lot of time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a Python style docstring, since the function is public.

# to start up. 10 seconds has been determined to work through
# empirical observation.
time.sleep(10)
process.terminate()


def test_cancellation(http_test_server_fixture):
  """Verify that a running client can be cancelled by sending it a signal.

  The client is started with two workers and a long configured duration. A
  helper thread runs _send_sigterm against the client process. The test then
  expects a zero exit code, JSON output in which "graceful_stop_requested"
  equals 2, and at least one successful (2xx) response.
  """
  command = [
      http_test_server_fixture.nighthawk_client_path,
      "--concurrency",
      "2",
      http_test_server_fixture.getTestServerRootUri(),
      "--duration",
      "1000",
      "--output-format",
      "json",
  ]
  client_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  # Deliver the signal from a separate thread while this thread blocks on the client's output.
  signal_thread = Thread(target=_send_sigterm, args=(client_process,))
  signal_thread.start()
  raw_stdout, _ = client_process.communicate()
  client_process.wait()
  json_text = raw_stdout.decode('utf-8')
  assertEqual(client_process.returncode, 0)
  result = json.loads(json_text)
  counter_map = http_test_server_fixture.getNighthawkCounterMapFromJson(result)
  assertCounterEqual(counter_map, "graceful_stop_requested", 2)
  assertCounterGreaterEqual(counter_map, "benchmark.http_2xx", 1)


def _run_client_with_args(args):
return run_binary_with_args("nighthawk_client", args)

Expand Down
Loading