
feat: support multiple channels in SessionPool #1063

Merged on Nov 19, 2019 (5 commits)
Changes from 2 commits
google/cloud/spanner/connection_options.cc (1 addition & 1 deletion)
@@ -35,7 +35,7 @@ ConnectionOptions::ConnectionOptions(
std::shared_ptr<grpc::ChannelCredentials> credentials)
: credentials_(std::move(credentials)),
endpoint_("spanner.googleapis.com"),
num_channels_(1),
num_channels_(4),
user_agent_prefix_(internal::BaseUserAgentPrefix()),
background_threads_factory_([] {
return google::cloud::internal::make_unique<
google/cloud/spanner/connection_options.h (1 addition & 1 deletion)
@@ -83,7 +83,7 @@ class ConnectionOptions {
* thus increases the number of operations that can be in progress in
* parallel.
*
* The default value is 1. TODO(#307) increase this.
* The default value is 4.
*/
int num_channels() const { return num_channels_; }
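
As context for the default change above, here is a minimal usage sketch (not part of this diff) of how a caller could pick a different channel count. It assumes the library's existing `ConnectionOptions::set_num_channels()` setter and `MakeConnection()` factory; the project, instance, and database names are placeholders.

```cpp
#include "google/cloud/spanner/client.h"

namespace spanner = google::cloud::spanner;

int main() {
  spanner::ConnectionOptions options;
  options.set_num_channels(8);  // override the new default of 4
  auto connection = spanner::MakeConnection(
      spanner::Database("my-project", "my-instance", "my-database"), options);
  spanner::Client client(connection);
  return 0;
}
```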

google/cloud/spanner/internal/session_pool.cc (92 additions & 66 deletions)
@@ -19,6 +19,7 @@
#include "google/cloud/internal/make_unique.h"
#include "google/cloud/status.h"
#include <algorithm>
#include <random>

namespace google {
namespace cloud {
@@ -28,39 +28,59 @@ namespace internal {

namespace spanner_proto = ::google::spanner::v1;

namespace {

// Ensure the options have sensible values.
SessionPoolOptions SanitizeOptions(SessionPoolOptions options) {
options.min_sessions = (std::max)(options.min_sessions, 0);
options.max_sessions = (std::max)(options.max_sessions, options.min_sessions);
options.max_sessions = (std::max)(options.max_sessions, 1);
options.max_idle_sessions = (std::max)(options.max_idle_sessions, 0);
options.write_sessions_fraction =
(std::max)(options.write_sessions_fraction, 0.0);
options.write_sessions_fraction =
(std::min)(options.write_sessions_fraction, 1.0);
return options;
}

} // namespace

SessionPool::SessionPool(Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy,
SessionPoolOptions options)
: db_(std::move(db)),
stub_(std::move(stubs[0])), // TODO(#307): use all the stubs
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
options_(options) {
// Ensure the options have sensible values.
options_.min_sessions = (std::max)(options_.min_sessions, 0);
options_.max_sessions = (std::max)(options_.max_sessions, 1);
if (options_.max_sessions < options_.min_sessions) {
options_.max_sessions = options_.min_sessions;
options_(SanitizeOptions(options)),
next_dissociated_stub_index_(0) {
channels_.reserve(stubs.size());
for (auto& stub : stubs) {
channels_.emplace_back(std::move(stub));
}
options_.max_idle_sessions = (std::max)(options_.max_idle_sessions, 0);
options_.write_sessions_fraction =
(std::max)(options_.write_sessions_fraction, 0.0);
options_.write_sessions_fraction =
(std::min)(options_.write_sessions_fraction, 1.0);
next_channel_for_create_sessions_ = &channels_[0];

// Eagerly initialize the pool with `min_sessions` sessions.
if (options_.min_sessions == 0) {
return;
}
auto sessions = CreateSessions(options_.min_sessions);
if (sessions.ok()) {
std::unique_lock<std::mutex> lk(mu_);
total_sessions_ += static_cast<int>(sessions->size());
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));

// Eagerly initialize the pool with `min_sessions` sessions.
std::unique_lock<std::mutex> lk(mu_);
int num_channels = static_cast<int>(channels_.size());
int sessions_per_channel = options_.min_sessions / num_channels;
// If the number of sessions doesn't divide evenly by the number of channels,
// add one extra session to the first `extra_sessions` channels.
int extra_sessions = options_.min_sessions % num_channels;
for (auto& channel : channels_) {
int num_sessions = sessions_per_channel;
if (extra_sessions > 0) {
++num_sessions;
--extra_sessions;
}
// Just ignore failures; we'll try again when the caller requests a
// session, and we'll be in a position to return an error at that time.
(void)CreateSessions(lk, channel, num_sessions);
}
}
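
A quick illustration of the distribution logic in the constructor above, as exercised by the MultipleChannelsPreAllocation test later in this diff: with min_sessions = 10 spread over 3 channels, the first channel gets 4 sessions and the other two get 3 each. The snippet below is a standalone sketch of that arithmetic, not code from the PR.

```cpp
// Split `min_sessions` across `num_channels` as evenly as possible; the
// first `min_sessions % num_channels` channels each get one extra session.
#include <iostream>
#include <vector>

std::vector<int> DistributeSessions(int min_sessions, int num_channels) {
  std::vector<int> per_channel(num_channels, min_sessions / num_channels);
  int extra_sessions = min_sessions % num_channels;
  for (int i = 0; i != extra_sessions; ++i) ++per_channel[i];
  return per_channel;
}

int main() {
  for (int n : DistributeSessions(10, 3)) std::cout << n << ' ';  // prints 4 3 3
}
```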

@@ -105,40 +126,13 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
// subject to the `max_sessions` cap.
int sessions_to_create = (std::min)(
options_.min_sessions + 1, options_.max_sessions - total_sessions_);
create_in_progress_ = true;
lk.unlock();
// TODO(#307) do we need to limit the call rate here?
auto sessions = CreateSessions(sessions_to_create);
lk.lock();
create_in_progress_ = false;
if (!sessions.ok() || sessions->empty()) {
// If the error was anything other than ResourceExhausted, return it.
if (sessions.status().code() != StatusCode::kResourceExhausted) {
return sessions.status();
}
// Fail if we're supposed to, otherwise try again.
if (options_.action_on_exhaustion == ActionOnExhaustion::FAIL) {
return Status(StatusCode::kResourceExhausted, "session pool exhausted");
}
continue;
}

total_sessions_ += static_cast<int>(sessions->size());
// Return one of the sessions and add the rest to the pool.
auto session = std::move(sessions->back());
sessions->pop_back();
if (dissociate_from_pool) {
--total_sessions_;
}
if (!sessions->empty()) {
sessions_.insert(sessions_.end(),
std::make_move_iterator(sessions->begin()),
std::make_move_iterator(sessions->end()));
lk.unlock();
// Wake up everyone that was waiting for a session.
cond_.notify_all();
ChannelInfo& channel = *next_channel_for_create_sessions_;
auto create_status = CreateSessions(lk, channel, sessions_to_create);
if (!create_status.ok()) {
return create_status;
}
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
// Wake up everyone that was waiting for a session.
cond_.notify_all();
}
}

@@ -147,8 +141,12 @@ std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
if (stub) return stub;

// Sessions that were created for partitioned Reads/Queries do not have
// their own stub, so return one to use.
return stub_;
// their own stub; return one to use by round-robining between the channels.
std::unique_lock<std::mutex> lk(mu_);
int index = next_dissociated_stub_index_;
next_dissociated_stub_index_ =
(next_dissociated_stub_index_ + 1) % static_cast<int>(channels_.size());
return channels_[index].stub;
}

void SessionPool::Release(Session* session) {
@@ -162,29 +160,57 @@ void SessionPool::Release(Session* session) {
}
}

StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
int num_sessions) {
// Creates `num_sessions` sessions on `channel` and adds them to the pool.
//
// Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped
// while the RPC is in progress and then reacquired.
Status SessionPool::CreateSessions(std::unique_lock<std::mutex>& lk,
ChannelInfo& channel, int num_sessions) {
create_in_progress_ = true;
lk.unlock();
spanner_proto::BatchCreateSessionsRequest request;
request.set_database(db_.FullName());
request.set_session_count(std::int32_t{num_sessions});
const auto& stub = channel.stub;
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub_->BatchCreateSessions(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::BatchCreateSessionsRequest const& request) {
return stub->BatchCreateSessions(context, request);
},
request, __func__);
if (!response) {
lk.lock();
create_in_progress_ = false;
if (!response.ok()) {
return response.status();
}
std::vector<std::unique_ptr<Session>> sessions;
sessions.reserve(response->session_size());
// Add sessions to the pool and update counters for `channel` and the pool.
int sessions_created = response->session_size();
channel.session_count += sessions_created;
total_sessions_ += sessions_created;
sessions_.reserve(sessions_.size() + sessions_created);
for (auto& session : *response->mutable_session()) {
sessions.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub_));
sessions_.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub));
}
// Shuffle the pool so we distribute returned sessions across channels.
std::shuffle(sessions_.begin(), sessions_.end(),
std::mt19937(std::random_device()()));
if (next_channel_for_create_sessions_ == &channel) {
UpdateNextChannelForCreateSessions();
}
return Status();
}
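
The comment above describes a lock hand-off contract: the caller enters with `mu_` held, the lock is dropped for the duration of the BatchCreateSessions RPC, and it is reacquired before the shared counters are updated. A stripped-down sketch of that pattern (placeholder names, not code from this PR) looks like this:

```cpp
#include <mutex>

class Pool {
 public:
  void Allocate() {
    std::unique_lock<std::mutex> lk(mu_);  // take the lock, as Allocate() does
    CreateSomething(lk);                   // contract: called with `lk` held
    // `lk` is held again here, so shared state can be read safely.
  }

 private:
  // Caller must hold `mu_` via `lk`; the lock is dropped around the blocking
  // call and reacquired before shared state is updated.
  void CreateSomething(std::unique_lock<std::mutex>& lk) {
    create_in_progress_ = true;    // still under the lock
    lk.unlock();                   // do not hold the mutex across the RPC
    int result = DoBlockingRpc();  // placeholder for the real network call
    lk.lock();                     // reacquire before touching shared state
    create_in_progress_ = false;
    total_ += result;
  }

  int DoBlockingRpc() { return 1; }  // stand-in for BatchCreateSessions

  std::mutex mu_;
  bool create_in_progress_ = false;  // GUARDED_BY(mu_)
  int total_ = 0;                    // GUARDED_BY(mu_)
};

int main() { Pool().Allocate(); }
```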

void SessionPool::UpdateNextChannelForCreateSessions() {
next_channel_for_create_sessions_ = &channels_[0];
for (auto& channel : channels_) {
if (channel.session_count <
next_channel_for_create_sessions_->session_count) {
next_channel_for_create_sessions_ = &channel;
}
}
return {std::move(sessions)};
}

SessionHolder SessionPool::MakeSessionHolder(std::unique_ptr<Session> session,
google/cloud/spanner/internal/session_pool.h (21 additions & 7 deletions)
@@ -118,6 +118,13 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
struct ChannelInfo {
explicit ChannelInfo(std::shared_ptr<SpannerStub> stub_param)
: stub(std::move(stub_param)) {}
std::shared_ptr<SpannerStub> const stub;
int session_count = 0;
};

/**
* Release session back to the pool.
*
@@ -127,23 +134,30 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
void Release(Session* session);

StatusOr<std::vector<std::unique_ptr<Session>>> CreateSessions(
int num_sessions);
Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

SessionHolder MakeSessionHolder(std::unique_ptr<Session> session,
bool dissociate_from_pool);
std::vector<ChannelInfo> CreateChannelInfo(
std::vector<std::shared_ptr<SpannerStub>> stubs);

void UpdateNextChannelForCreateSessions(); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

Database const db_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions const options_;

std::mutex mu_;
std::condition_variable cond_;
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
int total_sessions_ = 0; // GUARDED_BY(mu_)
bool create_in_progress_ = false; // GUARDED_BY(mu_)

Database db_;
std::shared_ptr<SpannerStub> stub_;
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
SessionPoolOptions options_;
std::vector<ChannelInfo> channels_; // GUARDED_BY(mu_)
ChannelInfo* next_channel_for_create_sessions_; // GUARDED_BY(mu_)
int next_dissociated_stub_index_; // GUARDED_BY(mu_)
};

} // namespace internal
google/cloud/spanner/internal/session_pool_test.cc (68 additions & 2 deletions)
@@ -37,6 +37,7 @@ using ::testing::ByMove;
using ::testing::HasSubstr;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::UnorderedElementsAre;

namespace spanner_proto = ::google::spanner::v1;

@@ -175,12 +176,15 @@ TEST(SessionPool, MinSessionsMultipleAllocations) {
EXPECT_CALL(*mock, BatchCreateSessions(_, SessionCountIs(min_sessions + 1)))
.WillOnce(Return(ByMove(MakeSessionsResponse({"s7", "s6", "s5", "s4"}))));
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 7; ++i) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "s" + std::to_string(i));
session_names.push_back((*session)->session_name());
sessions.push_back(*std::move(session));
}
EXPECT_THAT(session_names,
UnorderedElementsAre("s1", "s2", "s3", "s4", "s5", "s6", "s7"));
}

TEST(SessionPool, MaxSessionsFailOnExhaustion) {
@@ -197,12 +201,14 @@ TEST(SessionPool, MaxSessionsFailOnExhaustion) {
options.action_on_exhaustion = ActionOnExhaustion::FAIL;
auto pool = MakeSessionPool(db, {mock}, options);
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 3; ++i) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "s" + std::to_string(i));
session_names.push_back((*session)->session_name());
sessions.push_back(*std::move(session));
}
EXPECT_THAT(session_names, UnorderedElementsAre("s1", "s2", "s3"));
auto session = pool->Allocate();
EXPECT_EQ(session.status().code(), StatusCode::kResourceExhausted);
EXPECT_EQ(session.status().message(), "session pool exhausted");
@@ -234,6 +240,66 @@ TEST(SessionPool, MaxSessionsBlockUntilRelease) {
t.join();
}

TEST(SessionPool, MultipleChannels) {
auto mock1 = std::make_shared<spanner_testing::MockSpannerStub>();
auto mock2 = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");
EXPECT_CALL(*mock1, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c1s1"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c1s2"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c1s3"}))));
EXPECT_CALL(*mock2, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s1"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s2"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s3"}))));

auto pool = MakeSessionPool(db, {mock1, mock2});
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 6; ++i) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
session_names.push_back((*session)->session_name());
sessions.push_back(*std::move(session));
}
EXPECT_THAT(session_names, UnorderedElementsAre("c1s1", "c1s2", "c1s3",
"c2s1", "c2s2", "c2s3"));
}

TEST(SessionPool, MultipleChannelsPreAllocation) {
auto mock1 = std::make_shared<spanner_testing::MockSpannerStub>();
auto mock2 = std::make_shared<spanner_testing::MockSpannerStub>();
auto mock3 = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");
EXPECT_CALL(*mock1, BatchCreateSessions(_, _))
.WillOnce(Return(
ByMove(MakeSessionsResponse({"c1s1", "c1s2", "c1s3", "c1s4"}))));
EXPECT_CALL(*mock2, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c2s1", "c2s2", "c2s3"}))));
EXPECT_CALL(*mock3, BatchCreateSessions(_, _))
.WillOnce(Return(ByMove(MakeSessionsResponse({"c3s1", "c3s2", "c3s3"}))));

SessionPoolOptions options;
options.min_sessions = 10;
options.max_sessions = 10;
options.action_on_exhaustion = ActionOnExhaustion::FAIL;
auto pool = MakeSessionPool(db, {mock1, mock2, mock3}, options);
std::vector<SessionHolder> sessions;
std::vector<std::string> session_names;
for (int i = 1; i <= 10; ++i) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
session_names.push_back((*session)->session_name());
sessions.push_back(*std::move(session));
}
EXPECT_THAT(session_names,
UnorderedElementsAre("c1s1", "c1s2", "c1s3", "c1s4", "c2s1",
"c2s2", "c2s3", "c3s1", "c3s2", "c3s3"));
auto session = pool->Allocate();
EXPECT_EQ(session.status().code(), StatusCode::kResourceExhausted);
EXPECT_EQ(session.status().message(), "session pool exhausted");
}

TEST(SessionPool, GetStubForStublessSession) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");