This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: support multiple channels in SessionPool #1063

Merged · 5 commits · Nov 19, 2019
Changes from 3 commits
2 changes: 1 addition & 1 deletion google/cloud/spanner/connection_options.cc
@@ -35,7 +35,7 @@ ConnectionOptions::ConnectionOptions(
std::shared_ptr<grpc::ChannelCredentials> credentials)
: credentials_(std::move(credentials)),
endpoint_("spanner.googleapis.com"),
num_channels_(1),
num_channels_(4),
user_agent_prefix_(internal::BaseUserAgentPrefix()),
background_threads_factory_([] {
return google::cloud::internal::make_unique<
2 changes: 1 addition & 1 deletion google/cloud/spanner/connection_options.h
@@ -83,7 +83,7 @@ class ConnectionOptions {
* thus increases the number of operations that can be in progress in
* parallel.
*
* The default value is 1. TODO(#307) increase this.
* The default value is 4.
*/
int num_channels() const { return num_channels_; }

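For context, a hypothetical caller-side sketch (not part of this diff) of overriding the new default of 4 channels; `set_num_channels()`, `MakeConnection()`, and the chained-setter style are assumptions about this library's public API rather than code from this PR:

// Hypothetical usage sketch: request 8 gRPC channels instead of the new
// default of 4. Everything other than num_channels() is an assumption.
namespace spanner = google::cloud::spanner;
auto options = spanner::ConnectionOptions(grpc::GoogleDefaultCredentials())
                   .set_num_channels(8);
auto connection = spanner::MakeConnection(
    spanner::Database("my-project", "my-instance", "my-database"), options);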
164 changes: 95 additions & 69 deletions google/cloud/spanner/internal/session_pool.cc
@@ -19,6 +19,7 @@
#include "google/cloud/internal/make_unique.h"
#include "google/cloud/status.h"
#include <algorithm>
#include <random>

namespace google {
namespace cloud {
@@ -28,39 +29,59 @@ namespace internal {

namespace spanner_proto = ::google::spanner::v1;

namespace {

// Ensure the options have sensible values.
SessionPoolOptions SanitizeOptions(SessionPoolOptions options) {
options.min_sessions = (std::max)(options.min_sessions, 0);
options.max_sessions = (std::max)(options.max_sessions, options.min_sessions);
options.max_sessions = (std::max)(options.max_sessions, 1);
options.max_idle_sessions = (std::max)(options.max_idle_sessions, 0);
options.write_sessions_fraction =
(std::max)(options.write_sessions_fraction, 0.0);
options.write_sessions_fraction =
(std::min)(options.write_sessions_fraction, 1.0);
return options;
}

} // namespace

SessionPool::SessionPool(Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy,
SessionPoolOptions options)
: db_(std::move(db)),
stub_(std::move(stubs[0])), // TODO(#307): use all the stubs
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
options_(options) {
// Ensure the options have sensible values.
options_.min_sessions = (std::max)(options_.min_sessions, 0);
options_.max_sessions = (std::max)(options_.max_sessions, 1);
if (options_.max_sessions < options_.min_sessions) {
options_.max_sessions = options_.min_sessions;
options_(SanitizeOptions(options)) {
channels_.reserve(stubs.size());
for (auto& stub : stubs) {
channels_.emplace_back(std::move(stub));
}
options_.max_idle_sessions = (std::max)(options_.max_idle_sessions, 0);
options_.write_sessions_fraction =
(std::max)(options_.write_sessions_fraction, 0.0);
options_.write_sessions_fraction =
(std::min)(options_.write_sessions_fraction, 1.0);
next_dissociated_stub_channel_ = channels_.begin();
next_channel_for_create_sessions_ = channels_.begin();

// Eagerly initialize the pool with `min_sessions` sessions.
if (options_.min_sessions == 0) {
return;
}
auto sessions = CreateSessions(options_.min_sessions);
if (sessions.ok()) {
std::unique_lock<std::mutex> lk(mu_);
total_sessions_ += static_cast<int>(sessions->size());
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));

// Eagerly initialize the pool with `min_sessions` sessions.
std::unique_lock<std::mutex> lk(mu_);
int num_channels = static_cast<int>(channels_.size());
int sessions_per_channel = options_.min_sessions / num_channels;
// If the number of sessions doesn't divide evenly by the number of channels,
// add one extra session to the first `extra_sessions` channels.
int extra_sessions = options_.min_sessions % num_channels;
for (auto& channel : channels_) {
int num_sessions = sessions_per_channel;
if (extra_sessions > 0) {
++num_sessions;
--extra_sessions;
}
// Just ignore failures; we'll try again when the caller requests a
// session, and we'll be in a position to return an error at that time.
(void)CreateSessions(lk, channel, num_sessions);
}
}
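
For reference, a minimal standalone sketch (not code from this PR) of the same remainder-spreading arithmetic used above; with min_sessions = 10 and 4 channels it allocates 3, 3, 2, and 2 sessions:

#include <iostream>
#include <vector>

int main() {
  int const min_sessions = 10;
  int const num_channels = 4;
  int const per_channel = min_sessions / num_channels;  // 2
  int extra = min_sessions % num_channels;              // 2
  std::vector<int> allocation;
  for (int i = 0; i != num_channels; ++i) {
    // The first `extra` channels each get one additional session.
    allocation.push_back(per_channel + (extra > 0 ? 1 : 0));
    if (extra > 0) --extra;
  }
  for (int n : allocation) std::cout << n << ' ';  // prints: 3 3 2 2
}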

@@ -105,50 +126,28 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
// subject to the `max_sessions` cap.
int sessions_to_create = (std::min)(
options_.min_sessions + 1, options_.max_sessions - total_sessions_);
create_in_progress_ = true;
lk.unlock();
// TODO(#307) do we need to limit the call rate here?
auto sessions = CreateSessions(sessions_to_create);
lk.lock();
create_in_progress_ = false;
if (!sessions.ok() || sessions->empty()) {
// If the error was anything other than ResourceExhausted, return it.
if (sessions.status().code() != StatusCode::kResourceExhausted) {
return sessions.status();
}
// Fail if we're supposed to, otherwise try again.
if (options_.action_on_exhaustion == ActionOnExhaustion::FAIL) {
return Status(StatusCode::kResourceExhausted, "session pool exhausted");
}
continue;
}

total_sessions_ += static_cast<int>(sessions->size());
// Return one of the sessions and add the rest to the pool.
auto session = std::move(sessions->back());
sessions->pop_back();
if (dissociate_from_pool) {
--total_sessions_;
}
if (!sessions->empty()) {
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));
lk.unlock();
// Wake up everyone that was waiting for a session.
cond_.notify_all();
ChannelInfo& channel = *next_channel_for_create_sessions_;
auto create_status = CreateSessions(lk, channel, sessions_to_create);
if (!create_status.ok()) {
return create_status;
}
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
// Wake up everyone that was waiting for a session.
cond_.notify_all();
}
}

std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
std::shared_ptr<SpannerStub> stub = session.stub();
if (stub) return stub;

// Sessions that were created for partitioned Reads/Queries do not have
// their own stub, so return one to use.
return stub_;
if (!stub) {
// Sessions that were created for partitioned Reads/Queries do not have
// their own stub; return one to use by round-robining between the channels.
std::unique_lock<std::mutex> lk(mu_);
stub = next_dissociated_stub_channel_->stub;
if (++next_dissociated_stub_channel_ == channels_.end()) {
next_dissociated_stub_channel_ = channels_.begin();
}
}
return stub;
}

void SessionPool::Release(Session* session) {
@@ -162,29 +161,56 @@ void SessionPool::Release(Session* session) {
}
}

StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
int num_sessions) {
// Creates `num_sessions` on `channel` and adds them to the pool.
//
// Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped
// while the RPC is in progress and then reacquired.
Status SessionPool::CreateSessions(std::unique_lock<std::mutex>& lk,
ChannelInfo& channel, int num_sessions) {
create_in_progress_ = true;
lk.unlock();
spanner_proto::BatchCreateSessionsRequest request;
request.set_database(db_.FullName());
request.set_session_count(std::int32_t{num_sessions});
const auto& stub = channel.stub;
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub_->BatchCreateSessions(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub->BatchCreateSessions(context, request);
},
request, __func__);
if (!response) {
lk.lock();
create_in_progress_ = false;
if (!response.ok()) {
return response.status();
}
std::vector<std::unique_ptr<Session>> sessions;
sessions.reserve(response->session_size());
// Add sessions to the pool and update counters for `channel` and the pool.
int sessions_created = response->session_size();
channel.session_count += sessions_created;
total_sessions_ += sessions_created;
sessions_.reserve(sessions_.size() + sessions_created);
for (auto& session : *response->mutable_session()) {
sessions.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub_));
sessions_.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub));
}
// Shuffle the pool so we distribute returned sessions across channels.
std::shuffle(sessions_.begin(), sessions_.end(),
std::mt19937(std::random_device()()));
if (&*next_channel_for_create_sessions_ == &channel) {
UpdateNextChannelForCreateSessions();
}
return Status();
}

void SessionPool::UpdateNextChannelForCreateSessions() {
next_channel_for_create_sessions_ = channels_.begin();
for (auto it = channels_.begin(); it != channels_.end(); ++it) {
if (it->session_count < next_channel_for_create_sessions_->session_count) {
next_channel_for_create_sessions_ = it;
}
}
return {std::move(sessions)};
}
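
The loop above is a linear scan for the least-loaded channel. A hypothetical equivalent using `std::min_element` (a sketch only, not part of this diff; `<algorithm>` is already included by this file) would be:

// Hypothetical alternative: std::min_element returns the first minimum, so
// ties resolve to the earliest channel, matching the hand-written loop.
next_channel_for_create_sessions_ = std::min_element(
    channels_.begin(), channels_.end(),
    [](ChannelInfo const& a, ChannelInfo const& b) {
      return a.session_count < b.session_count;
    });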

SessionHolder SessionPool::MakeSessionHolder(std::unique_ptr<Session> session,
32 changes: 24 additions & 8 deletions google/cloud/spanner/internal/session_pool.h
@@ -92,7 +92,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*
* The parameters allow the `SessionPool` to make remote calls needed to
* manage the pool, and to associate `Session`s with the stubs used to
* create them.
* create them. `stubs` must not be empty.
*/
SessionPool(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
@@ -118,6 +118,13 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
struct ChannelInfo {
explicit ChannelInfo(std::shared_ptr<SpannerStub> stub_param)
: stub(std::move(stub_param)) {}
std::shared_ptr<SpannerStub> const stub;
int session_count = 0;
};

/**
* Release session back to the pool.
*
@@ -127,23 +134,32 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
void Release(Session* session);

StatusOr<std::vector<std::unique_ptr<Session>>> CreateSessions(
int num_sessions);
Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

SessionHolder MakeSessionHolder(std::unique_ptr<Session> session,
bool dissociate_from_pool);
std::vector<ChannelInfo> CreateChannelInfo(
std::vector<std::shared_ptr<SpannerStub>> stubs);

void UpdateNextChannelForCreateSessions(); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

Database const db_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions const options_;

std::mutex mu_;
std::condition_variable cond_;
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
int total_sessions_ = 0; // GUARDED_BY(mu_)
bool create_in_progress_ = false; // GUARDED_BY(mu_)

Database db_;
std::shared_ptr<SpannerStub> stub_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions options_;
std::vector<ChannelInfo> channels_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_channel_for_create_sessions_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_dissociated_stub_channel_; // GUARDED_BY(mu_)
};

} // namespace internal