Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDS: destroy cluster info on master thread #14089

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ class Dispatcher {
*/
virtual void post(PostCb callback) PURE;

/**
* Similar to `post()`, but returns false if the dispatcher rejects the callback.
*/
virtual bool tryPost(PostCb callback) PURE;

/**
* Runs the event loop. This will not return until exit() is called either from within a callback
* or from a different thread.
Expand Down
54 changes: 52 additions & 2 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,20 @@ void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
}
}

void DispatcherImpl::exit() { base_scheduler_.loopExit(); }
void DispatcherImpl::exit() {
{
Thread::LockGuard lock(post_lock_);
exited_ = true;
// No more post!
}
base_scheduler_.loopExit();
{
FANCY_LOG(debug, "lambdai: running postcallbacks after exit");
// TODO(lambdai): runtime key.
runPostCallbacks();
FANCY_LOG(debug, "lambdai: complete postcallbacks after exit");
}
}

SignalEventPtr DispatcherImpl::listenForSignal(int signal_num, SignalCb cb) {
ASSERT(isThreadSafe());
Expand All @@ -236,15 +249,52 @@ void DispatcherImpl::post(std::function<void()> callback) {
}
}

/**
 * Queues `callback` for execution by this dispatcher unless the dispatcher rejects it.
 *
 * @param callback the callback to queue; it is moved into the queue when accepted.
 * @return false, without queuing anything, when isThreadSafe() holds for the calling thread or
 *         when the dispatcher has been marked as exited; true once the callback is queued.
 */
bool DispatcherImpl::tryPost(std::function<void()> callback) {
  // Post master to master. Skip.
  if (isThreadSafe()) {
    return false;
  }

  bool queue_was_empty = false;
  {
    Thread::LockGuard lock(post_lock_);
    // TODO(lambdai): For compatibility we should allow blind post.
    if (exited_) {
      return false;
    }
    queue_was_empty = post_callbacks_.empty();
    // The parameter is taken by value, so move it instead of copying the std::function again.
    post_callbacks_.push_back(std::move(callback));
  }

  // Only the post that makes the queue non-empty schedules the drain callback; it is scheduled
  // after post_lock_ has been released.
  if (queue_was_empty) {
    post_cb_->scheduleCallbackCurrentIteration();
  }
  return true;
}

// Runs the event loop through base_scheduler_ and records the calling thread's id in run_tid_.
// For RunType::Block the exited_ flag is cleared first so that tryPost() accepts callbacks; once
// the scheduler call returns, exited_ is set again so that later tryPost() calls are rejected.
// Queued post callbacks are flushed both before the scheduler runs and after it returns.
// NOTE(review): exited_ is cleared only for RunType::Block but is set after every run type, so
// after a non-Block run tryPost() keeps returning false until the next Block run — confirm that
// this asymmetry is intended.
void DispatcherImpl::run(RunType type) {
run_tid_ = api_.threadFactory().currentThreadId();

{
// Allows tryPost.
FANCY_LOG(debug, "lambdai: run {}", type);
// exited_ is written under post_lock_, the same lock tryPost() takes before reading it.
Thread::LockGuard lock(post_lock_);
if (type == RunType::Block) {
exited_ = false;
}
}
// Flush all post callbacks before we run the event loop. We do this because there are post
// callbacks that have to get run before the initial event loop starts running. libevent does
// not guarantee that events are run in any particular order. So even if we post() and call
// event_base_once() before some other event, the other event might get called first.
runPostCallbacks();
base_scheduler_.run(type);
{
FANCY_LOG(debug, "lambdai: run return {}", type);
// Reject all the follow up tryPost.
Thread::LockGuard lock(post_lock_);
exited_ = true;
}
// Drain whatever was queued before exited_ was set above.
// TODO(lambdai): reconsider this.
runPostCallbacks();
}

MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
Expand Down
3 changes: 2 additions & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
void exit() override;
SignalEventPtr listenForSignal(int signal_num, SignalCb cb) override;
void post(std::function<void()> callback) override;
bool tryPost(std::function<void()> callback) override;
void run(RunType type) override;
// Returns a reference to the watermark factory that buffer_factory_ points to.
Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; }
const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override {
Expand Down Expand Up @@ -135,7 +136,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
// Returns true when no run thread id has been recorded yet (run_tid_ is empty) or when the
// calling thread's id equals the recorded run_tid_.
bool isThreadSafe() const override {
  if (run_tid_.isEmpty()) {
    return true;
  }
  return run_tid_ == api_.threadFactory().currentThreadId();
}

std::atomic<bool> exited_{};
const std::string name_;
Api::Api& api_;
std::string stats_prefix_;
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ envoy_cc_library(
":strict_dns_cluster_lib",
":upstream_includes",
":transport_socket_match_lib",
"//source/common/event:deferred_task",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/common/assert.h"
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"
#include "common/common/utility.h"
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/utility.h"
Expand Down
28 changes: 25 additions & 3 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,31 @@ ClusterImplBase::ClusterImplBase(
auto socket_factory = createTransportSocketFactory(cluster, factory_context);
auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope);
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
runtime, std::move(socket_matcher),
std::move(stats_scope), added_via_api, factory_context);
auto& dispatcher = factory_context.dispatcher();
info_ = std::shared_ptr<const ClusterInfoImpl>(
new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime,
std::move(socket_matcher), std::move(stats_scope), added_via_api,
factory_context),
[&dispatcher](const ClusterInfoImpl* self) {
FANCY_LOG(debug, "lambdai: schedule destroy cluster info {} on this thread", self->name());
if (!dispatcher.tryPost([self]() {
// TODO(lambdai): There is still a risk that the master dispatcher accepts the function but
// never executes it during shutdown. We can either 1) introduce folly::function, which
// supports unique_ptr captures, and destroy the cluster info by RAII, or 2) run the post
// callbacks on the master thread once no worker can post back.
FANCY_LOG(debug,
"lambdai: execute destroy cluster info {} on this thread. Master thread is "
"expected.",
self->name());
delete self;
})) {
FANCY_LOG(debug,
"lambdai: cannot post. Has the master thread exited? Executing destroy cluster "
"info {} on this thread.",
self->name());
delete self;
}
});
// Create the default (empty) priority set before registering callbacks to
// avoid getting an update the first time it is accessed.
priority_set_.getOrCreateHostSet(0);
Expand Down
2 changes: 2 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,10 @@ TEST_F(ClusterManagerImplTest, HttpHealthChecker) {
createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _))
.WillOnce(Return(connection));
EXPECT_CALL(factory_.dispatcher_, tryPost(_)).Times(1);
create(parseBootstrapFromV3Yaml(yaml));
factory_.tls_.shutdownThread();
factory_.dispatcher_.to_delete_.clear();
}

TEST_F(ClusterManagerImplTest, UnknownCluster) {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class MockDispatcher : public Dispatcher {
MOCK_METHOD(void, exit, ());
MOCK_METHOD(SignalEvent*, listenForSignal_, (int signal_num, SignalCb cb));
MOCK_METHOD(void, post, (std::function<void()> callback));
MOCK_METHOD(bool, tryPost, (std::function<void()> callback));
MOCK_METHOD(void, run, (RunType type));
MOCK_METHOD(const ScopeTrackedObject*, setTrackedObject, (const ScopeTrackedObject* object));
MOCK_METHOD(bool, isThreadSafe, (), (const));
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/event/wrapped_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class WrappedDispatcher : public Dispatcher {

// Forwards the callback to the wrapped dispatcher's post().
void post(std::function<void()> callback) override { impl_.post(std::move(callback)); }

// Forwards the callback to the wrapped dispatcher's tryPost() and returns its result unchanged.
bool tryPost(std::function<void()> callback) override {
return impl_.tryPost(std::move(callback));
}

// Forwards to the wrapped dispatcher's run() with the same run type.
void run(RunType type) override { impl_.run(type); }

// Returns the wrapped dispatcher's watermark factory.
Buffer::WatermarkFactory& getWatermarkFactory() override { return impl_.getWatermarkFactory(); }
Expand Down