Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI - cancel execution upon signal reception #367

Merged
merged 21 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b973b75
Extract SignalHandler
oschaaf Jan 15, 2020
fb5c434
Remove accidentally left in comment
oschaaf Jan 15, 2020
1bd1d31
Fix format
oschaaf Jan 15, 2020
fcba594
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 15, 2020
ce5ead6
Amend according to clang-tidy's complaints
oschaaf Jan 15, 2020
3f99119
Pass const ref
oschaaf Jan 15, 2020
111ba5d
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jan 16, 2020
89dfdf6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 10, 2020
c457af6
Merge remote-tracking branch 'upstream/master' into extract-signal-ha…
oschaaf Jun 15, 2020
04eec56
CLI: Handle signals, allow cancellation of executions
oschaaf Jun 17, 2020
a11050f
Add lock guarding of the cancellation process + tests
oschaaf Jun 17, 2020
781df87
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 18, 2020
2c04d10
s/cancel_requests/graceful_stop_requested/
oschaaf Jun 18, 2020
55f694f
Eliminate the NullTerminationPredicate: dead code
oschaaf Jun 18, 2020
d1a696b
Remove debug print line
oschaaf Jun 18, 2020
3c35f86
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
9e0d9b2
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 19, 2020
57d71f0
Merge remote-tracking branch 'upstream/master' into execution-cancell…
oschaaf Jun 22, 2020
1fddbec
Partially address review feedback
oschaaf Jun 22, 2020
20e6108
Review feedback pt II
oschaaf Jun 22, 2020
3198623
Review feedback
oschaaf Jun 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 53 additions & 42 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,15 @@ void ProcessImpl::shutdown() {
// Before we shut down the worker threads, stop threading.
tls_.shutdownGlobalThreading();
store_root_.shutdownThreading();
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();

{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
// Before shutting down the cluster manager, stop the workers.
for (auto& worker : workers_) {
worker->shutdown();
}
workers_.clear();
}
workers_.clear();
if (cluster_manager_ != nullptr) {
cluster_manager_->shutdown();
}
Expand All @@ -137,13 +141,15 @@ void ProcessImpl::shutdown() {

bool ProcessImpl::requestExecutionCancellation() {
ENVOY_LOG(debug, "Requesting workers to cancel execution");
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
for (auto& worker : workers_) {
worker->requestExecutionCancellation();
}
cancelled_ = true;
return true;
}

const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t concurrency) {
void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Expose kMinimalDelay in configuration.
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);
ASSERT(workers_.empty());
Expand Down Expand Up @@ -173,7 +179,6 @@ const std::vector<ClientWorkerPtr>& ProcessImpl::createWorkers(const uint32_t co
: ClientWorkerImpl::HardCodedWarmupStyle::OFF));
worker_number++;
}
return workers_;
}

void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
Expand Down Expand Up @@ -385,44 +390,50 @@ void ProcessImpl::addRequestSourceCluster(

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
int number_of_workers = determineConcurrency();
shutdown_ = false;
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: The "hide whitespace" toggle in Github makes this a lot more fun to review.
(The scope introduced for the lock guard introduces a lot of whitespace noise).

auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;
createWorkers(number_of_workers);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
*dispatcher_, tls_, {}, *local_info_, store_root_, generator_,
Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
ssl_context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(time_system_);
cluster_manager_factory_ = std::make_unique<ClusterManagerFactory>(
admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
*local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
access_log_manager_, *singleton_manager_);
cluster_manager_factory_->setConnectionReuseStrategy(
options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
? Http1PoolImpl::ConnectionReuseStrategy::LRU
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
envoy::config::bootstrap::v3::Bootstrap bootstrap;
createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
if (tracing_uri != nullptr) {
setupTracingImplementation(bootstrap, *tracing_uri);
addTracingCluster(bootstrap, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
maybeCreateTracingDriver(bootstrap.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

for (auto& w : workers_) {
w->start();
for (auto& w : workers_) {
w->start();
}
}

for (auto& w : workers_) {
w->waitForCompletion();
}
Expand Down Expand Up @@ -451,7 +462,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
const auto& counters = Utility().mapCountersFromStore(
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
collector.addResult("global", mergeWorkerStatistics(workers), counters,
collector.addResult("global", mergeWorkerStatistics(workers_), counters,
total_execution_duration / workers_.size());
return counters.find("sequencer.failed_terminations") == counters.end();
}
Expand Down
4 changes: 3 additions & 1 deletion source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
void maybeCreateTracingDriver(const envoy::config::trace::v3::Tracing& configuration);

void configureComponentLogLevels(spdlog::level::level_enum level);
const std::vector<ClientWorkerPtr>& createWorkers(const uint32_t concurrency);
void createWorkers(const uint32_t concurrency);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) Is this a good time to document the method?

std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const;
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
Expand Down Expand Up @@ -136,6 +136,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
Envoy::Server::ValidationAdmin admin_;
Envoy::ProtobufMessage::ProdValidationContextImpl validation_context_;
bool shutdown_{true};
Envoy::Thread::MutexBasicLockable workers_lock_;
bool cancelled_{false};
};

} // namespace Client
Expand Down
1 change: 1 addition & 0 deletions source/client/remote_process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ bool RemoteProcessImpl::run(OutputCollector& collector) {

bool RemoteProcessImpl::requestExecutionCancellation() {
ENVOY_LOG(error, "Remote process cancellation not supported yet");
// TODO(XXX): Send a cancel request to the gRPC service.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to replace XXX with a value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #380 (and updated XXX to #380) in the changes I'm about to push.

return false;
}

Expand Down
4 changes: 3 additions & 1 deletion test/integration/test_integration_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,9 @@ def test_http_request_release_timing(http_test_server_fixture, qps_parameterizat


def send_sigterm(p):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we choose a more readable name than "p".

time.sleep(2)
# Sleep for a while, under tsan the client needs a lot of time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a Python style docstring, since the function is public.

# to start up.
time.sleep(10)
p.terminate()


Expand Down
33 changes: 32 additions & 1 deletion test/process_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <thread>
#include <vector>

#include "nighthawk/common/exception.h"
Expand Down Expand Up @@ -32,12 +33,35 @@ class ProcessTest : public TestWithParam<Envoy::Network::Address::IpVersion> {
: loopback_address_(Envoy::Network::Test::getLoopbackAddressUrlString(GetParam())),
options_(TestUtility::createOptionsImpl(
fmt::format("foo --duration 1 -v error --rps 10 https://{}/", loopback_address_))){};
void runProcess(RunExpectation expectation) {
void runProcess(RunExpectation expectation, bool do_cancel = false) {
ProcessPtr process = std::make_unique<ProcessImpl>(*options_, time_system_);
OutputCollectorImpl collector(time_system_, *options_);
std::thread cancel_thread;
if (do_cancel) {
cancel_thread = std::thread([&process] {
sleep(5);
std::cerr << "request cancel" << std::endl;
process->requestExecutionCancellation();
});
}
const auto result =
process->run(collector) ? RunExpectation::EXPECT_SUCCESS : RunExpectation::EXPECT_FAILURE;
EXPECT_EQ(result, expectation);
if (cancel_thread.joinable()) {
cancel_thread.join();
}
if (do_cancel) {
auto proto = collector.toProto();
int cancel_requests = 0;
for (const auto& result : proto.results()) {
for (const auto& counter : result.counters()) {
if (counter.name() == "cancel_requests") {
cancel_requests++;
}
}
}
EXPECT_EQ(3, cancel_requests); // global results + two workers
}
process->shutdown();
}

Expand All @@ -64,5 +88,12 @@ TEST_P(ProcessTest, BadTracerSpec) {
runProcess(RunExpectation::EXPECT_FAILURE);
}

TEST_P(ProcessTest, CancelExecution) {
options_ = TestUtility::createOptionsImpl(
fmt::format("foo --duration 10 --failure-predicate foo:0 --concurrency 2 https://{}/",
loopback_address_));
runProcess(RunExpectation::EXPECT_SUCCESS, true);
}

} // namespace Client
} // namespace Nighthawk