Skip to content

Commit

Permalink
Factor SessionPool out of ConnectionImpl. (googleapis/google-cloud-cp…
Browse files Browse the repository at this point in the history
…p-spanner#916)

* Factor SessionPool out of ConnectionImpl.

Make `SessionPool` its own class to avoid clutter in `ConnectionImpl`
and to make it easier to test.

Since `SessionPool` is a member of `ConnectionImpl`, which guarantees
the former cannot outlive the latter; only `ConnectionImpl` is required
to be reference counted. This simplifies lifetime management, avoiding
some subtle issues and race conditions, at the cost of requiring some
ostensibly pool-related private methods in `ConnectionImpl` for the
benefit of `SessionHolder`

There are no functional changes compared to the current implementation.

Part of googleapis/google-cloud-cpp-spanner#307

* Apparently I failed to learn my lesson about make_unique.

* address review comments

* fix build and add some comments to SessionPool

* fix the build for real this time and remove an extra {}

* Argh - I knew those {} were there for a reason (else cxx17 complains)
  • Loading branch information
mr-salty authored Oct 3, 2019
1 parent 9fab76b commit e6ff9ac
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 61 deletions.
3 changes: 3 additions & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ add_library(spanner_client
internal/retry_loop.cc
internal/retry_loop.h
internal/session.h
internal/session_pool.cc
internal/session_pool.h
internal/spanner_stub.cc
internal/spanner_stub.h
internal/time.cc
Expand Down Expand Up @@ -313,6 +315,7 @@ function (spanner_client_define_tests)
internal/partial_result_set_resume_test.cc
internal/polling_loop_test.cc
internal/retry_loop_test.cc
internal/session_pool_test.cc
internal/spanner_stub_test.cc
internal/time_format_test.cc
internal/time_test.cc
Expand Down
92 changes: 44 additions & 48 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ std::shared_ptr<ConnectionImpl> MakeConnection(
std::move(retry_policy), std::move(backoff_policy)));
}

ConnectionImpl::ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
stub_(std::move(stub)),
retry_policy_(std::move(retry_policy)),
backoff_policy_(std::move(backoff_policy)),
session_pool_(this) {}

StatusOr<ResultSet> ConnectionImpl::Read(ReadParams rp) {
return internal::Visit(
std::move(rp.transaction),
Expand Down Expand Up @@ -185,7 +194,7 @@ StatusOr<ResultSet> ConnectionImpl::ReadImpl(
SessionHolder& session, spanner_proto::TransactionSelector& s,
ReadParams rp) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -241,7 +250,7 @@ StatusOr<std::vector<ReadPartition>> ConnectionImpl::PartitionReadImpl(
if (!session) {
// Since the session may be sent to other machines, it should not be
// returned to the pool when the Transaction is destroyed.
auto session_or = GetSession(/*dissociate_from_pool=*/true);
auto session_or = AllocateSession(/*dissociate_from_pool=*/true);
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -289,7 +298,7 @@ StatusOr<ResultSet> ConnectionImpl::ExecuteSqlImpl(
SessionHolder& session, spanner_proto::TransactionSelector& s,
std::int64_t seqno, ExecuteSqlParams esp) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -333,7 +342,7 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
SessionHolder& session, spanner_proto::TransactionSelector& s,
std::int64_t seqno, ExecutePartitionedDmlParams epdp) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -379,7 +388,7 @@ StatusOr<std::vector<QueryPartition>> ConnectionImpl::PartitionQueryImpl(
if (!session) {
// Since the session may be sent to other machines, it should not be
// returned to the pool when the Transaction is destroyed.
auto session_or = GetSession(/*dissociate_from_pool=*/true);
auto session_or = AllocateSession(/*dissociate_from_pool=*/true);
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -425,7 +434,7 @@ StatusOr<BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
SessionHolder& session, spanner_proto::TransactionSelector& s,
std::int64_t seqno, ExecuteBatchDmlParams params) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -468,7 +477,7 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
SessionHolder& session, spanner_proto::TransactionSelector& s,
CommitParams cp) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -508,7 +517,7 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
Status ConnectionImpl::RollbackImpl(SessionHolder& session,
spanner_proto::TransactionSelector& s) {
if (!session) {
auto session_or = GetSession();
auto session_or = AllocateSession();
if (!session_or) {
return std::move(session_or).status();
}
Expand Down Expand Up @@ -536,50 +545,20 @@ Status ConnectionImpl::RollbackImpl(SessionHolder& session,
request, __func__);
}

/**
* Get a session from the pool, or create one if the pool is empty.
* @returns an error if session creation fails; always returns a valid
* `SessionHolder` (never `nullptr`) on success.
*
* The `SessionHolder` usually returns the session to the pool when it is
* destroyed. However, if `dissociate_from_pool` is true the session will not
* be returned to the session pool. This is used in partitioned operations,
* since we don't know when all parties are done using the session.
*/
StatusOr<SessionHolder> ConnectionImpl::GetSession(bool dissociate_from_pool) {
std::unique_ptr<Session> session;
std::unique_lock<std::mutex> lk(mu_);
if (!sessions_.empty()) {
session = std::move(sessions_.back());
sessions_.pop_back();
} else {
// Release the mutex because we won't be doing any more changes to
// `sessions_` in this function and holding mutexes while making RPCs is
// (generally) a bad practice.
lk.unlock();
spanner_proto::CreateSessionRequest request;
request.set_database(db_.FullName());
auto response = RetryLoop(
retry_policy_->clone(), backoff_policy_->clone(), true,
[this](grpc::ClientContext& context,
spanner_proto::CreateSessionRequest const& request) {
return stub_->CreateSession(context, request);
},
request, __func__);
if (!response) {
return response.status();
}
session = google::cloud::internal::make_unique<Session>(
std::move(*response->mutable_name()));
StatusOr<SessionHolder> ConnectionImpl::AllocateSession(
bool dissociate_from_pool) {
auto session = session_pool_.Allocate();
if (!session.ok()) {
return std::move(session).status();
}

if (dissociate_from_pool) {
// Uses the default deleter; the Session is not returned to the pool.
return {std::move(session)};
return {*std::move(session)};
}

std::weak_ptr<ConnectionImpl> connection = shared_from_this();
return SessionHolder(session.release(), [connection](Session* session) {
return SessionHolder(session->release(), [connection](Session* session) {
auto shared_connection = connection.lock();
// If `connection` is still alive, release the `Session` to its pool;
// otherwise just delete the `Session`.
Expand All @@ -591,9 +570,26 @@ StatusOr<SessionHolder> ConnectionImpl::GetSession(bool dissociate_from_pool) {
});
}

void ConnectionImpl::ReleaseSession(Session* session) {
std::lock_guard<std::mutex> lk(mu_);
sessions_.emplace_back(session);
StatusOr<std::vector<std::unique_ptr<Session>>> ConnectionImpl::CreateSessions(
size_t /*num_sessions*/) {
// TODO(#307) use BatchCreateSessions. For now, `num_sessions` is ignored.
spanner_proto::CreateSessionRequest request;
request.set_database(db_.FullName());
auto response = RetryLoop(
retry_policy_->clone(), backoff_policy_->clone(), true,
[this](grpc::ClientContext& context,
spanner_proto::CreateSessionRequest const& request) {
return stub_->CreateSession(context, request);
},
request, __func__);
if (!response) {
return response.status();
}
std::vector<std::unique_ptr<Session>> ret;
ret.reserve(1);
ret.push_back(google::cloud::internal::make_unique<Session>(
std::move(*response->mutable_name())));
return {std::move(ret)};
}

} // namespace internal
Expand Down
42 changes: 29 additions & 13 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "google/cloud/spanner/connection.h"
#include "google/cloud/spanner/database.h"
#include "google/cloud/spanner/internal/session.h"
#include "google/cloud/spanner/internal/session_pool.h"
#include "google/cloud/spanner/internal/spanner_stub.h"
#include "google/cloud/spanner/retry_policy.h"
#include "google/cloud/spanner/version.h"
Expand All @@ -28,6 +29,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace google {
namespace cloud {
Expand Down Expand Up @@ -62,6 +64,7 @@ std::shared_ptr<ConnectionImpl> MakeConnection(
* and returns instances of this class.
*/
class ConnectionImpl : public Connection,
public SessionManager,
public std::enable_shared_from_this<ConnectionImpl> {
public:
StatusOr<ResultSet> Read(ReadParams) override;
Expand All @@ -83,11 +86,7 @@ class ConnectionImpl : public Connection,
std::unique_ptr<BackoffPolicy>);
ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
stub_(std::move(stub)),
retry_policy_(std::move(retry_policy)),
backoff_policy_(std::move(backoff_policy)) {}
std::unique_ptr<BackoffPolicy> backoff_policy);

StatusOr<ResultSet> ReadImpl(SessionHolder& session,
google::spanner::v1::TransactionSelector& s,
Expand Down Expand Up @@ -120,19 +119,36 @@ class ConnectionImpl : public Connection,
Status RollbackImpl(SessionHolder& session,
google::spanner::v1::TransactionSelector& s);

StatusOr<SessionHolder> GetSession(bool dissociate_from_pool = false);
void ReleaseSession(Session* session);
/**
* Get a session from the pool, or create one if the pool is empty.
* @returns an error if session creation fails; always returns a valid
* `SessionHolder` (never `nullptr`) on success.
*
* The `SessionHolder` usually returns the session to the pool when it is
* destroyed. However, if `dissociate_from_pool` is true the session will not
* be returned to the session pool. This is used in partitioned operations,
* since we don't know when all parties are done using the session.
*/
StatusOr<SessionHolder> AllocateSession(bool dissociate_from_pool = false);

// Forwards calls for the `SessionPool`; used in the `SessionHolder` deleter
// so it can hold a `weak_ptr` to `ConnectionImpl` (it's already reference
// counted, and manages the lifetime of `SessionPool`).
void ReleaseSession(Session* session) {
session_pool_.Release(std::unique_ptr<Session>(session));
}

// `SessionManager` methods; used by the `SessionPool`
StatusOr<std::vector<std::unique_ptr<Session>>> CreateSessions(
size_t num_sessions) override;

Database db_;
std::shared_ptr<internal::SpannerStub> stub_;

// The current session pool.
// TODO(#307) - improve session refresh and expiration.
std::mutex mu_;
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
std::shared_ptr<SpannerStub> stub_;

std::unique_ptr<RetryPolicy> retry_policy_;
std::unique_ptr<BackoffPolicy> backoff_policy_;

SessionPool session_pool_;
};

} // namespace internal
Expand Down
54 changes: 54 additions & 0 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/spanner/internal/session_pool.h"
#include "google/cloud/spanner/internal/connection_impl.h"
#include "google/cloud/spanner/internal/session.h"
#include <memory>

namespace google {
namespace cloud {
namespace spanner {
inline namespace SPANNER_CLIENT_NS {
namespace internal {

StatusOr<std::unique_ptr<Session>> SessionPool::Allocate() {
{
std::lock_guard<std::mutex> lk(mu_);
if (!sessions_.empty()) {
auto session = std::move(sessions_.back());
sessions_.pop_back();
return {std::move(session)};
}
}

auto sessions = manager_->CreateSessions(/*num_sessions=*/1);
if (!sessions.ok()) {
return std::move(sessions).status();
}
// TODO(#307) for now, assume CreateSessions() returns exactly one Session on
// success (as we requested). Rewrite this to accommodate multiple sessions.
return {std::move((*sessions)[0])};
}

void SessionPool::Release(std::unique_ptr<Session> session) {
std::lock_guard<std::mutex> lk(mu_);
sessions_.push_back(std::move(session));
}

} // namespace internal
} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
} // namespace google
Loading

0 comments on commit e6ff9ac

Please sign in to comment.