-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CLI - cancel execution upon signal reception #367
Changes from 19 commits
b973b75
fb5c434
1bd1d31
fcba594
ce5ead6
3f99119
111ba5d
89dfdf6
c457af6
04eec56
a11050f
781df87
2c04d10
55f694f
d1a696b
3c35f86
9e0d9b2
57d71f0
1fddbec
20e6108
3198623
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,19 +126,33 @@ void ProcessImpl::shutdown() { | |
// Before we shut down the worker threads, stop threading. | ||
tls_.shutdownGlobalThreading(); | ||
store_root_.shutdownThreading(); | ||
// Before shutting down the cluster manager, stop the workers. | ||
for (auto& worker : workers_) { | ||
worker->shutdown(); | ||
|
||
{ | ||
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_); | ||
// Before shutting down the cluster manager, stop the workers. | ||
for (auto& worker : workers_) { | ||
worker->shutdown(); | ||
} | ||
workers_.clear(); | ||
} | ||
workers_.clear(); | ||
if (cluster_manager_ != nullptr) { | ||
cluster_manager_->shutdown(); | ||
} | ||
tls_.shutdownThread(); | ||
shutdown_ = true; | ||
} | ||
|
||
const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t concurrency) { | ||
bool ProcessImpl::requestExecutionCancellation() { | ||
ENVOY_LOG(debug, "Requesting workers to cancel execution"); | ||
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_); | ||
for (auto& worker : workers_) { | ||
worker->requestExecutionCancellation(); | ||
} | ||
cancelled_ = true; | ||
return true; | ||
} | ||
|
||
void ProcessImpl::createWorkers(const uint32_t concurrency) { | ||
// TODO(oschaaf): Expose kMinimalDelay in configuration. | ||
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms); | ||
ASSERT(workers_.empty()); | ||
|
@@ -168,7 +182,6 @@ const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t co | |
: ClientWorkerImpl::HardCodedWarmupStyle::OFF)); | ||
worker_number++; | ||
} | ||
return workers_; | ||
} | ||
|
||
void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) { | ||
|
@@ -381,44 +394,50 @@ void ProcessImpl::addRequestSourceCluster( | |
|
||
bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris, | ||
const UriPtr& request_source_uri, const UriPtr& tracing_uri) { | ||
int number_of_workers = determineConcurrency(); | ||
shutdown_ = false; | ||
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers); | ||
tls_.registerThread(*dispatcher_, true); | ||
store_root_.initializeThreading(*dispatcher_, tls_); | ||
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>( | ||
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl( | ||
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_, | ||
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)}); | ||
ssl_context_manager_ = | ||
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_); | ||
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>( | ||
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_, | ||
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_, | ||
secret_manager_, validation_context_, *api_, http_context_, grpc_context_, | ||
access_log_manager_, *singleton_manager_); | ||
cluster_manager_factory_->setConnectionReuseStrategy( | ||
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU | ||
? Http1PoolImpl::ConnectionReuseStrategy::LRU | ||
: Http1PoolImpl::ConnectionReuseStrategy::MRU); | ||
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections()); | ||
envoy::config::bootstrap::v3::Bootstrap bootstrap; | ||
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers); | ||
if (tracing_uri != nullptr) { | ||
setupTracingImplementation(bootstrap, *tracing_uri); | ||
addTracingCluster(bootstrap, *tracing_uri); | ||
} | ||
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString()); | ||
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap); | ||
maybeCreateTracingDriver(bootstrap.tracing()); | ||
cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); }); | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to reviewers: The "hide whitespace" toggle in Github makes this a lot more fun to review. |
||
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_); | ||
if (cancelled_) { | ||
return true; | ||
} | ||
int number_of_workers = determineConcurrency(); | ||
shutdown_ = false; | ||
createWorkers(number_of_workers); | ||
tls_.registerThread(*dispatcher_, true); | ||
store_root_.initializeThreading(*dispatcher_, tls_); | ||
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>( | ||
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl( | ||
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_, | ||
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)}); | ||
ssl_context_manager_ = | ||
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_); | ||
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>( | ||
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_, | ||
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, | ||
*local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_, | ||
access_log_manager_, *singleton_manager_); | ||
cluster_manager_factory_->setConnectionReuseStrategy( | ||
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU | ||
? Http1PoolImpl::ConnectionReuseStrategy::LRU | ||
: Http1PoolImpl::ConnectionReuseStrategy::MRU); | ||
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections()); | ||
envoy::config::bootstrap::v3::Bootstrap bootstrap; | ||
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers); | ||
if (tracing_uri != nullptr) { | ||
setupTracingImplementation(bootstrap, *tracing_uri); | ||
addTracingCluster(bootstrap, *tracing_uri); | ||
} | ||
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString()); | ||
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap); | ||
maybeCreateTracingDriver(bootstrap.tracing()); | ||
cluster_manager_->setInitializedCb( | ||
[this]() -> void { init_manager_.initialize(init_watcher_); }); | ||
|
||
Runtime::LoaderSingleton::get().initialize(*cluster_manager_); | ||
Runtime::LoaderSingleton::get().initialize(*cluster_manager_); | ||
|
||
for (auto& w : workers_) { | ||
w->start(); | ||
for (auto& w : workers_) { | ||
w->start(); | ||
} | ||
} | ||
|
||
for (auto& w : workers_) { | ||
w->waitForCompletion(); | ||
} | ||
|
@@ -447,7 +466,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP | |
const auto& counters = Utility().mapCountersFromStore( | ||
store_root_, [](absl::string_view, uint64_t value) { return value > 0; }); | ||
StatisticFactoryImpl statistic_factory(options_); | ||
collector.addResult("global", mergeWorkerStatistics(workers), counters, | ||
collector.addResult("global", mergeWorkerStatistics(workers_), counters, | ||
total_execution_duration / workers_.size()); | ||
return counters.find("sequencer.failed_terminations") == counters.end(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger | |
*/ | ||
void shutdown() override; | ||
|
||
bool requestExecutionCancellation() override; | ||
|
||
private: | ||
/** | ||
* @brief Creates a cluster for usage by a remote request source. | ||
|
@@ -99,7 +101,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger | |
void maybeCreateTracingDriver(const envoy::config::trace::v3::Tracing& configuration); | ||
|
||
void configureComponentLogLevels(spdlog::level::level_enum level); | ||
const std::vector<ClientWorkerPtr>& createWorkers(const uint32_t concurrency); | ||
void createWorkers(const uint32_t concurrency); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (optional) Is this a good time to document the method? |
||
std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const; | ||
std::vector<StatisticPtr> | ||
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const; | ||
|
@@ -145,6 +147,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger | |
Envoy::Server::ValidationAdmin admin_; | ||
Envoy::ProtobufMessage::ProdValidationContextImpl validation_context_; | ||
bool shutdown_{true}; | ||
Envoy::Thread::MutexBasicLockable workers_lock_; | ||
bool cancelled_{false}; | ||
}; | ||
|
||
} // namespace Client | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,5 +54,11 @@ bool RemoteProcessImpl::run(OutputCollector& collector) { | |
return false; | ||
} | ||
|
||
bool RemoteProcessImpl::requestExecutionCancellation() { | ||
ENVOY_LOG(error, "Remote process cancellation not supported yet"); | ||
// TODO(XXX): Send a cancel request to the gRPC service. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we want to replace XXX with a value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #380 (and updated |
||
return false; | ||
} | ||
|
||
} // namespace Client | ||
} // namespace Nighthawk |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,13 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import json | ||
import logging | ||
import os | ||
import subprocess | ||
import sys | ||
import pytest | ||
import time | ||
from threading import Thread | ||
|
||
from test.integration.common import IpVersion | ||
from test.integration.integration_test_fixtures import ( | ||
|
@@ -654,6 +658,34 @@ def test_http_request_release_timing(http_test_server_fixture, qps_parameterizat | |
assertCounterEqual(counters, "benchmark.http_2xx", (total_requests)) | ||
|
||
|
||
def send_sigterm(p): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we choose a more readable name than "p". |
||
# Sleep for a while, under tsan the client needs a lot of time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a Python style docstring, since the function is public. |
||
# to start up. | ||
time.sleep(10) | ||
p.terminate() | ||
|
||
|
||
def test_cancellation(http_test_server_fixture): | ||
""" | ||
That that we can use signals to cancel execution. | ||
""" | ||
args = [ | ||
http_test_server_fixture.nighthawk_client_path, "--concurrency", "2", | ||
http_test_server_fixture.getTestServerRootUri(), "--duration", "1000", "--output-format", | ||
"json" | ||
] | ||
client_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
Thread(target=(lambda: send_sigterm(client_process))).start() | ||
stdout, stderr = client_process.communicate() | ||
client_process.wait() | ||
output = stdout.decode('utf-8') | ||
assert (client_process.returncode == 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a custom error message giving some context? Or should this be a regular test assertion like below? |
||
parsed_json = json.loads(output) | ||
counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) | ||
assertCounterEqual(counters, "graceful_stop_requested", 2) | ||
oschaaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1) | ||
|
||
|
||
def _run_client_with_args(args): | ||
return run_binary_with_args("nighthawk_client", args) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) If we can, we should try to avoid abbreviations in variable names (res).