Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
fix: avoid deadlocks during shutdown (#1397)
Browse files Browse the repository at this point in the history
Avoid deadlocks caused by the SessionPool both controlling the
lifetime of the background threads in its destructor, and being also
destructed by those threads. The background threads are now owned by
the ConnectionImpl.
  • Loading branch information
coryan authored Mar 17, 2020
1 parent 4099652 commit 6c07bb0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 38 deletions.
Binary file modified ci/test-api/spanner_client.expected.abi.dump.gz
Binary file not shown.
10 changes: 5 additions & 5 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ ConnectionImpl::ConnectionImpl(Database db,
: db_(std::move(db)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
session_pool_(MakeSessionPool(db_, std::move(stubs),
std::move(session_pool_options),
options.background_threads_factory()(),
retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone())),
background_threads_(options.background_threads_factory()()),
session_pool_(MakeSessionPool(
db_, std::move(stubs), std::move(session_pool_options),
background_threads_->cq(), retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone())),
rpc_stream_tracing_enabled_(options.tracing_enabled("rpc-streams")),
tracing_options_(options.tracing_options()) {}

Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "google/cloud/spanner/retry_policy.h"
#include "google/cloud/spanner/tracing_options.h"
#include "google/cloud/spanner/version.h"
#include "google/cloud/background_threads.h"
#include "google/cloud/status.h"
#include "google/cloud/status_or.h"
#include <google/spanner/v1/spanner.pb.h>
Expand Down Expand Up @@ -167,6 +168,7 @@ class ConnectionImpl : public Connection {
Database db_;
std::shared_ptr<RetryPolicy const> retry_policy_prototype_;
std::shared_ptr<BackoffPolicy const> backoff_policy_prototype_;
std::unique_ptr<BackgroundThreads> background_threads_;
std::shared_ptr<SessionPool> session_pool_;
bool rpc_stream_tracing_enabled_ = false;
TracingOptions tracing_options_;
Expand Down
16 changes: 6 additions & 10 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "google/cloud/spanner/internal/connection_impl.h"
#include "google/cloud/spanner/internal/retry_loop.h"
#include "google/cloud/spanner/internal/session.h"
#include "google/cloud/background_threads.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/internal/make_unique.h"
#include "google/cloud/log.h"
Expand All @@ -38,28 +37,26 @@ namespace spanner_proto = ::google::spanner::v1;

std::shared_ptr<SessionPool> MakeSessionPool(
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
SessionPoolOptions options,
std::unique_ptr<BackgroundThreads> background_threads,
SessionPoolOptions options, google::cloud::CompletionQueue cq,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy) {
auto pool = std::make_shared<SessionPool>(
std::move(db), std::move(stubs), std::move(options),
std::move(background_threads), std::move(retry_policy),
std::move(backoff_policy));
std::move(db), std::move(stubs), std::move(options), std::move(cq),
std::move(retry_policy), std::move(backoff_policy));
pool->Initialize();
return pool;
}

SessionPool::SessionPool(Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
SessionPoolOptions options,
std::unique_ptr<BackgroundThreads> background_threads,
google::cloud::CompletionQueue cq,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
options_(std::move(
options.EnforceConstraints(static_cast<int>(stubs.size())))),
background_threads_(std::move(background_threads)),
cq_(std::move(cq)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
max_pool_size_(options_.max_sessions_per_channel() *
Expand Down Expand Up @@ -103,8 +100,7 @@ void SessionPool::ScheduleBackgroundWork(std::chrono::seconds relative_time) {
// See the comment in the destructor about the thread safety of this method.
std::weak_ptr<SessionPool> pool = shared_from_this();
current_timer_ =
background_threads_->cq()
.MakeRelativeTimer(relative_time)
cq_.MakeRelativeTimer(relative_time)
.then([pool](future<StatusOr<std::chrono::system_clock::time_point>>
result) {
if (result.get().ok()) {
Expand Down
10 changes: 4 additions & 6 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "google/cloud/spanner/retry_policy.h"
#include "google/cloud/spanner/session_pool_options.h"
#include "google/cloud/spanner/version.h"
#include "google/cloud/background_threads.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/status_or.h"
#include <google/spanner/v1/spanner.pb.h>
Expand Down Expand Up @@ -65,8 +65,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
* than calling the constructor and `Initialize()` directly.
*/
SessionPool(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
SessionPoolOptions options,
std::unique_ptr<BackgroundThreads> background_threads,
SessionPoolOptions options, google::cloud::CompletionQueue cq,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy);

Expand Down Expand Up @@ -142,7 +141,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {

Database const db_;
SessionPoolOptions const options_;
std::unique_ptr<BackgroundThreads> const background_threads_;
google::cloud::CompletionQueue cq_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
int const max_pool_size_;
Expand Down Expand Up @@ -172,8 +171,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
std::shared_ptr<SessionPool> MakeSessionPool(
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
SessionPoolOptions options,
std::unique_ptr<BackgroundThreads> background_threads,
SessionPoolOptions options, google::cloud::CompletionQueue cq,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy);

Expand Down
45 changes: 28 additions & 17 deletions google/cloud/spanner/internal/session_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ spanner_proto::BatchCreateSessionsResponse MakeSessionsResponse(

std::shared_ptr<SessionPool> MakeSessionPool(
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
SessionPoolOptions options) {
SessionPoolOptions options, CompletionQueue cq) {
return MakeSessionPool(
std::move(db), std::move(stubs), std::move(options),
google::cloud::internal::make_unique<
google::cloud::internal::AutomaticallyCreatedBackgroundThreads>(),
std::move(db), std::move(stubs), std::move(options), std::move(cq),
google::cloud::internal::make_unique<LimitedTimeRetryPolicy>(
std::chrono::minutes(10)),
google::cloud::internal::make_unique<ExponentialBackoffPolicy>(
Expand All @@ -88,7 +86,8 @@ TEST(SessionPool, Allocate) {
return MakeSessionsResponse({"session1"});
});

auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
Expand All @@ -114,7 +113,8 @@ TEST(SessionPool, ReleaseBadSession) {
return MakeSessionsResponse({"session2"});
});

auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
{
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
Expand All @@ -139,7 +139,8 @@ TEST(SessionPool, CreateError) {
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(Status(StatusCode::kInternal, "some failure"))));

auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
auto session = pool->Allocate();
EXPECT_EQ(session.status().code(), StatusCode::kInternal);
EXPECT_THAT(session.status().message(), HasSubstr("some failure"));
Expand All @@ -151,7 +152,8 @@ TEST(SessionPool, ReuseSession) {
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(MakeSessionsResponse({"session1"}))));

auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
Expand All @@ -169,7 +171,8 @@ TEST(SessionPool, Lifo) {
.WillOnce(Return(ByMove(MakeSessionsResponse({"session1"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"session2"}))));

auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
Expand Down Expand Up @@ -201,7 +204,8 @@ TEST(SessionPool, MinSessionsEagerAllocation) {

SessionPoolOptions options;
options.set_min_sessions(min_sessions);
auto pool = MakeSessionPool(db, {mock}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, options, threads.cq());
auto session = pool->Allocate();
}

Expand All @@ -215,7 +219,8 @@ TEST(SessionPool, MinSessionsMultipleAllocations) {

SessionPoolOptions options;
options.set_min_sessions(min_sessions);
auto pool = MakeSessionPool(db, {mock}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, options, threads.cq());

// When we run out of sessions it will make this call.
EXPECT_CALL(*mock, BatchCreateSessions(_, SessionCountIs(min_sessions + 1)))
Expand Down Expand Up @@ -244,7 +249,8 @@ TEST(SessionPool, MaxSessionsFailOnExhaustion) {
SessionPoolOptions options;
options.set_max_sessions_per_channel(max_sessions_per_channel)
.set_action_on_exhaustion(ActionOnExhaustion::kFail);
auto pool = MakeSessionPool(db, {mock}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, options, threads.cq());
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 3; ++i) {
Expand All @@ -269,7 +275,8 @@ TEST(SessionPool, MaxSessionsBlockUntilRelease) {
SessionPoolOptions options;
options.set_max_sessions_per_channel(max_sessions_per_channel)
.set_action_on_exhaustion(ActionOnExhaustion::kBlock);
auto pool = MakeSessionPool(db, {mock}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, options, threads.cq());
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "s1");
Expand All @@ -295,7 +302,8 @@ TEST(SessionPool, Labels) {

SessionPoolOptions options;
options.set_labels(std::move(labels));
auto pool = MakeSessionPool(db, {mock}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, options, threads.cq());
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
Expand All @@ -314,7 +322,8 @@ TEST(SessionPool, MultipleChannels) {
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s2"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s3"}))));

auto pool = MakeSessionPool(db, {mock1, mock2}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock1, mock2}, {}, threads.cq());
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 6; ++i) {
Expand Down Expand Up @@ -345,7 +354,8 @@ TEST(SessionPool, MultipleChannelsPreAllocation) {
options.set_min_sessions(20)
.set_max_sessions_per_channel(3)
.set_action_on_exhaustion(ActionOnExhaustion::kFail);
auto pool = MakeSessionPool(db, {mock1, mock2, mock3}, options);
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock1, mock2, mock3}, options, threads.cq());
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 9; ++i) {
Expand All @@ -365,7 +375,8 @@ TEST(SessionPool, MultipleChannelsPreAllocation) {
TEST(SessionPool, GetStubForStublessSession) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");
auto pool = MakeSessionPool(db, {mock}, {});
google::cloud::internal::AutomaticallyCreatedBackgroundThreads threads;
auto pool = MakeSessionPool(db, {mock}, {}, threads.cq());
// ensure we get a stub even if we didn't allocate from the pool.
auto session = MakeDissociatedSessionHolder("session_id");
EXPECT_EQ(pool->GetStub(*session), mock);
Expand Down

0 comments on commit 6c07bb0

Please sign in to comment.