Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: support multiple channels in SessionPool #1063

Merged
merged 5 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/spanner/connection_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ConnectionOptions::ConnectionOptions(
std::shared_ptr<grpc::ChannelCredentials> credentials)
: credentials_(std::move(credentials)),
endpoint_("spanner.googleapis.com"),
num_channels_(1),
num_channels_(4),
user_agent_prefix_(internal::BaseUserAgentPrefix()),
background_threads_factory_([] {
return google::cloud::internal::make_unique<
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner/connection_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ConnectionOptions {
* thus increases the number of operations that can be in progress in
* parallel.
*
* The default value is 1. TODO(#307) increase this.
* The default value is 4.
*/
int num_channels() const { return num_channels_; }

Expand Down
170 changes: 101 additions & 69 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "google/cloud/internal/make_unique.h"
#include "google/cloud/status.h"
#include <algorithm>
#include <random>

namespace google {
namespace cloud {
Expand All @@ -28,39 +29,65 @@ namespace internal {

namespace spanner_proto = ::google::spanner::v1;

namespace {

// Ensure the options have sensible values.
SessionPoolOptions SanitizeOptions(SessionPoolOptions options) {
options.min_sessions = (std::max)(options.min_sessions, 0);
options.max_sessions = (std::max)(options.max_sessions, options.min_sessions);
options.max_sessions = (std::max)(options.max_sessions, 1);
options.max_idle_sessions = (std::max)(options.max_idle_sessions, 0);
options.write_sessions_fraction =
(std::max)(options.write_sessions_fraction, 0.0);
options.write_sessions_fraction =
(std::min)(options.write_sessions_fraction, 1.0);
return options;
}

} // namespace

SessionPool::SessionPool(Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy,
SessionPoolOptions options)
: db_(std::move(db)),
stub_(std::move(stubs[0])), // TODO(#307): use all the stubs
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
options_(options) {
// Ensure the options have sensible values.
options_.min_sessions = (std::max)(options_.min_sessions, 0);
options_.max_sessions = (std::max)(options_.max_sessions, 1);
if (options_.max_sessions < options_.min_sessions) {
options_.max_sessions = options_.min_sessions;
options_(SanitizeOptions(options)) {
if (stubs.empty()) {
google::cloud::internal::ThrowInvalidArgument(
"SessionPool requires a non-empty set of stubs");
}
options_.max_idle_sessions = (std::max)(options_.max_idle_sessions, 0);
options_.write_sessions_fraction =
(std::max)(options_.write_sessions_fraction, 0.0);
options_.write_sessions_fraction =
(std::min)(options_.write_sessions_fraction, 1.0);

// Eagerly initialize the pool with `min_sessions` sessions.
channels_.reserve(stubs.size());
devbww marked this conversation as resolved.
Show resolved Hide resolved
for (auto& stub : stubs) {
channels_.emplace_back(std::move(stub));
}
// `channels_` is never resized after this point.
next_dissociated_stub_channel_ = channels_.begin();
next_channel_for_create_sessions_ = channels_.begin();

if (options_.min_sessions == 0) {
return;
}
auto sessions = CreateSessions(options_.min_sessions);
if (sessions.ok()) {
std::unique_lock<std::mutex> lk(mu_);
total_sessions_ += static_cast<int>(sessions->size());
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));

// Eagerly initialize the pool with `min_sessions` sessions.
std::unique_lock<std::mutex> lk(mu_);
int num_channels = static_cast<int>(channels_.size());
int sessions_per_channel = options_.min_sessions / num_channels;
// If the number of sessions doesn't divide evenly by the number of channels,
// add one extra session to the first `extra_sessions` channels.
int extra_sessions = options_.min_sessions % num_channels;
for (auto& channel : channels_) {
int num_sessions = sessions_per_channel;
if (extra_sessions > 0) {
++num_sessions;
--extra_sessions;
}
// Just ignore failures; we'll try again when the caller requests a
// session, and we'll be in a position to return an error at that time.
(void)CreateSessions(lk, channel, num_sessions);
}
}

Expand Down Expand Up @@ -105,50 +132,28 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
// subject to the `max_sessions` cap.
int sessions_to_create = (std::min)(
options_.min_sessions + 1, options_.max_sessions - total_sessions_);
create_in_progress_ = true;
lk.unlock();
// TODO(#307) do we need to limit the call rate here?
auto sessions = CreateSessions(sessions_to_create);
lk.lock();
create_in_progress_ = false;
if (!sessions.ok() || sessions->empty()) {
// If the error was anything other than ResourceExhausted, return it.
if (sessions.status().code() != StatusCode::kResourceExhausted) {
return sessions.status();
}
// Fail if we're supposed to, otherwise try again.
if (options_.action_on_exhaustion == ActionOnExhaustion::FAIL) {
return Status(StatusCode::kResourceExhausted, "session pool exhausted");
}
continue;
}

total_sessions_ += static_cast<int>(sessions->size());
// Return one of the sessions and add the rest to the pool.
auto session = std::move(sessions->back());
sessions->pop_back();
if (dissociate_from_pool) {
--total_sessions_;
ChannelInfo& channel = *next_channel_for_create_sessions_;
auto create_status = CreateSessions(lk, channel, sessions_to_create);
if (!create_status.ok()) {
return create_status;
}
if (!sessions->empty()) {
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));
lk.unlock();
// Wake up everyone that was waiting for a session.
cond_.notify_all();
}
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
// Wake up everyone that was waiting for a session.
cond_.notify_all();
}
}

std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
std::shared_ptr<SpannerStub> stub = session.stub();
if (stub) return stub;

// Sessions that were created for partitioned Reads/Queries do not have
// their own stub, so return one to use.
return stub_;
if (!stub) {
// Sessions that were created for partitioned Reads/Queries do not have
// their own stub; return one to use by round-robining between the channels.
std::unique_lock<std::mutex> lk(mu_);
stub = next_dissociated_stub_channel_->stub;
if (++next_dissociated_stub_channel_ == channels_.end()) {
next_dissociated_stub_channel_ = channels_.begin();
}
}
return stub;
}

void SessionPool::Release(Session* session) {
Expand All @@ -162,29 +167,56 @@ void SessionPool::Release(Session* session) {
}
}

StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
int num_sessions) {
// Creates `num_sessions` on `channel` and adds them to the pool.
//
// Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped
// while the RPC is in progress and then reacquired.
devbww marked this conversation as resolved.
Show resolved Hide resolved
Status SessionPool::CreateSessions(std::unique_lock<std::mutex>& lk,
ChannelInfo& channel, int num_sessions) {
create_in_progress_ = true;
lk.unlock();
spanner_proto::BatchCreateSessionsRequest request;
request.set_database(db_.FullName());
request.set_session_count(std::int32_t{num_sessions});
const auto& stub = channel.stub;
devbww marked this conversation as resolved.
Show resolved Hide resolved
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub_->BatchCreateSessions(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub->BatchCreateSessions(context, request);
},
request, __func__);
if (!response) {
lk.lock();
create_in_progress_ = false;
if (!response.ok()) {
return response.status();
}
std::vector<std::unique_ptr<Session>> sessions;
sessions.reserve(response->session_size());
// Add sessions to the pool and update counters for `channel` and the pool.
int sessions_created = response->session_size();
channel.session_count += sessions_created;
total_sessions_ += sessions_created;
sessions_.reserve(sessions_.size() + sessions_created);
for (auto& session : *response->mutable_session()) {
sessions.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub_));
sessions_.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub));
}
// Shuffle the pool so we distribute returned sessions across channels.
std::shuffle(sessions_.begin(), sessions_.end(),
std::mt19937(std::random_device()()));
if (&*next_channel_for_create_sessions_ == &channel) {
UpdateNextChannelForCreateSessions();
}
return Status();
}

void SessionPool::UpdateNextChannelForCreateSessions() {
next_channel_for_create_sessions_ = channels_.begin();
for (auto it = channels_.begin(); it != channels_.end(); ++it) {
if (it->session_count < next_channel_for_create_sessions_->session_count) {
next_channel_for_create_sessions_ = it;
}
}
return {std::move(sessions)};
}

SessionHolder SessionPool::MakeSessionHolder(std::unique_ptr<Session> session,
Expand Down
33 changes: 25 additions & 8 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*
* The parameters allow the `SessionPool` to make remote calls needed to
* manage the pool, and to associate `Session`s with the stubs used to
* create them.
* create them. `stubs` must not be empty.
*/
SessionPool(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
Expand All @@ -118,6 +118,13 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
struct ChannelInfo {
explicit ChannelInfo(std::shared_ptr<SpannerStub> stub_param)
: stub(std::move(stub_param)) {}
std::shared_ptr<SpannerStub> const stub;
int session_count = 0;
};

/**
* Release session back to the pool.
*
Expand All @@ -127,23 +134,33 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
void Release(Session* session);

StatusOr<std::vector<std::unique_ptr<Session>>> CreateSessions(
int num_sessions);
Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

SessionHolder MakeSessionHolder(std::unique_ptr<Session> session,
bool dissociate_from_pool);

void UpdateNextChannelForCreateSessions(); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

Database const db_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions const options_;

std::mutex mu_;
std::condition_variable cond_;
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
int total_sessions_ = 0; // GUARDED_BY(mu_)
bool create_in_progress_ = false; // GUARDED_BY(mu_)

Database db_;
std::shared_ptr<SpannerStub> stub_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions options_;
// `channels_` is guaranteed to be non-empty and will not be resized after
// the constructor runs (so the iterators are guaranteed to always be valid).
// TODO(#566) replace `vector` with `absl::FixedArray` when available.
std::vector<ChannelInfo> channels_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_channel_for_create_sessions_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_dissociated_stub_channel_; // GUARDED_BY(mu_)
};

} // namespace internal
Expand Down
Loading