Skip to content

Commit

Permalink
feat(spanner): support unified credentials (#7824)
Browse files Browse the repository at this point in the history
Fixes #6464.
  • Loading branch information
devbww authored Jan 6, 2022
1 parent 6d7bceb commit df5eb37
Show file tree
Hide file tree
Showing 16 changed files with 771 additions and 26 deletions.
3 changes: 3 additions & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ add_library(
internal/session.h
internal/session_pool.cc
internal/session_pool.h
internal/spanner_auth.cc
internal/spanner_auth.h
internal/spanner_stub.cc
internal/spanner_stub.h
internal/status_utils.cc
Expand Down Expand Up @@ -325,6 +327,7 @@ function (spanner_client_define_tests)
internal/partial_result_set_resume_test.cc
internal/partial_result_set_source_test.cc
internal/session_pool_test.cc
internal/spanner_auth_test.cc
internal/spanner_stub_test.cc
internal/status_utils_test.cc
internal/transaction_impl_test.cc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "google/cloud/internal/absl_str_cat_quiet.h"
#include "google/cloud/internal/getenv.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/internal/unified_grpc_credentials.h"
#include "google/cloud/testing_util/timer.h"
#include "absl/memory/memory.h"
#include "absl/time/civil_time.h"
Expand Down Expand Up @@ -305,11 +306,13 @@ class ExperimentImpl {
auto opts = google::cloud::internal::MakeOptions(
spanner::ConnectionOptions().set_num_channels(num_channels));
opts = spanner_internal::DefaultOptions(std::move(opts));
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
opts.get<google::cloud::GrpcCredentialOption>());
std::vector<std::shared_ptr<spanner_internal::SpannerStub>> stubs;
stubs.reserve(num_channels);
for (int channel_id = 0; channel_id < num_channels; ++channel_id) {
stubs.push_back(
spanner_internal::CreateDefaultSpannerStub(db, opts, channel_id));
stubs.push_back(spanner_internal::CreateDefaultSpannerStub(db, auth, opts,
channel_id));
}
return stubs;
}
Expand Down
8 changes: 5 additions & 3 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,16 @@ std::shared_ptr<spanner::Connection> MakeConnection(spanner::Database const& db,
opts, __func__);
opts = spanner_internal::DefaultOptions(std::move(opts));

auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto auth = internal::CreateAuthenticationStrategy(background->cq(), opts);
std::vector<std::shared_ptr<spanner_internal::SpannerStub>> stubs(
opts.get<GrpcNumChannelsOption>());
int id = 0;
std::generate(stubs.begin(), stubs.end(), [&id, db, opts] {
return spanner_internal::CreateDefaultSpannerStub(db, opts, id++);
std::generate(stubs.begin(), stubs.end(), [&db, &auth, &opts, &id] {
return spanner_internal::CreateDefaultSpannerStub(db, auth, opts, id++);
});
return std::make_shared<spanner_internal::ConnectionImpl>(
std::move(db), std::move(stubs), opts);
std::move(db), std::move(background), std::move(stubs), opts);
}

std::shared_ptr<Connection> MakeConnection(
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner/google_cloud_cpp_spanner.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ google_cloud_cpp_spanner_hdrs = [
"internal/partial_result_set_source.h",
"internal/session.h",
"internal/session_pool.h",
"internal/spanner_auth.h",
"internal/spanner_stub.h",
"internal/status_utils.h",
"internal/transaction_impl.h",
Expand Down Expand Up @@ -157,6 +158,7 @@ google_cloud_cpp_spanner_srcs = [
"internal/partial_result_set_source.cc",
"internal/session.cc",
"internal/session_pool.cc",
"internal/spanner_auth.cc",
"internal/spanner_stub.cc",
"internal/status_utils.cc",
"internal/transaction_impl.cc",
Expand Down
16 changes: 16 additions & 0 deletions google/cloud/spanner/integration_tests/client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,22 @@ TEST_F(ClientIntegrationTest, SpannerStatistics) {
}
}

/// @test Verify the use of unified credentials.
TEST_F(ClientIntegrationTest, UnifiedCredentials) {
auto options =
Options{}.set<UnifiedCredentialsOption>(MakeGoogleDefaultCredentials());
if (emulator_) {
options = Options{}
.set<UnifiedCredentialsOption>(MakeAccessTokenCredentials(
"test-only-invalid", std::chrono::system_clock::now() +
std::chrono::minutes(15)))
.set<internal::UseInsecureChannelOption>(true);
}
// Reconnect to the database using the new credentials.
client_ = absl::make_unique<Client>(MakeConnection(GetDatabase(), options));
ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
}

/// @test Verify the backwards compatibility `v1` namespace still exists.
TEST_F(ClientIntegrationTest, BackwardsCompatibility) {
auto connection = ::google::cloud::spanner::v1::MakeConnection(GetDatabase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ TEST_F(SessionPoolIntegrationTest, SessionAsyncCRUD) {
std::make_shared<spanner::ExponentialBackoffPolicy>(
std::chrono::seconds(10), std::chrono::minutes(1), 2.0));

auto stub = CreateDefaultSpannerStub(db, opts, /*channel_id=*/0);
auto stub = CreateDefaultSpannerStub(
db, internal::CreateAuthenticationStrategy(cq, opts), opts,
/*channel_id=*/0);
auto session_pool = MakeSessionPool(db, {stub}, cq, opts);

// Make an asynchronous request, but immediately block until the response
Expand Down
8 changes: 4 additions & 4 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ Status MissingTransactionStatus(std::string const& operation) {
operation + ")");
}

ConnectionImpl::ConnectionImpl(spanner::Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
Options const& opts)
ConnectionImpl::ConnectionImpl(
spanner::Database db, std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs, Options const& opts)
: db_(std::move(db)),
retry_policy_prototype_(
opts.get<spanner::SpannerRetryPolicyOption>()->clone()),
backoff_policy_prototype_(
opts.get<spanner::SpannerBackoffPolicyOption>()->clone()),
background_threads_(internal::MakeBackgroundThreadsFactory(opts)()),
background_threads_(std::move(background_threads)),
session_pool_(MakeSessionPool(db_, std::move(stubs),
background_threads_->cq(), opts)),
rpc_stream_tracing_enabled_(internal::Contains(
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class ConnectionImpl : public spanner::Connection {
public:
ConnectionImpl(spanner::Database db,
std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs,
Options const& opts);

Expand Down
5 changes: 3 additions & 2 deletions google/cloud/spanner/internal/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ std::shared_ptr<ConnectionImpl> MakeConnectionImpl(
// No actual credential needed for unit tests
opts.set<GrpcCredentialOption>(grpc::InsecureChannelCredentials());
opts = spanner_internal::DefaultOptions(std::move(opts));
return std::make_shared<ConnectionImpl>(std::move(db), std::move(stubs),
std::move(opts));
auto background = internal::MakeBackgroundThreadsFactory(opts)();
return std::make_shared<ConnectionImpl>(std::move(db), std::move(background),
std::move(stubs), std::move(opts));
}

// Create a `Connection` suitable for use in tests that continue retrying
Expand Down
201 changes: 201 additions & 0 deletions google/cloud/spanner/internal/spanner_auth.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/spanner/internal/spanner_auth.h"

namespace google {
namespace cloud {
namespace spanner_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

SpannerAuth::SpannerAuth(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
std::shared_ptr<SpannerStub> child)
: auth_(std::move(auth)), child_(std::move(child)) {}

StatusOr<google::spanner::v1::Session> SpannerAuth::CreateSession(
grpc::ClientContext& context,
google::spanner::v1::CreateSessionRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->CreateSession(context, request);
}

StatusOr<google::spanner::v1::BatchCreateSessionsResponse>
SpannerAuth::BatchCreateSessions(
grpc::ClientContext& context,
google::spanner::v1::BatchCreateSessionsRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->BatchCreateSessions(context, request);
}

StatusOr<google::spanner::v1::Session> SpannerAuth::GetSession(
grpc::ClientContext& context,
google::spanner::v1::GetSessionRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->GetSession(context, request);
}

StatusOr<google::spanner::v1::ListSessionsResponse> SpannerAuth::ListSessions(
grpc::ClientContext& context,
google::spanner::v1::ListSessionsRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ListSessions(context, request);
}

Status SpannerAuth::DeleteSession(
grpc::ClientContext& context,
google::spanner::v1::DeleteSessionRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->DeleteSession(context, request);
}

StatusOr<google::spanner::v1::ResultSet> SpannerAuth::ExecuteSql(
grpc::ClientContext& context,
google::spanner::v1::ExecuteSqlRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ExecuteSql(context, request);
}

std::unique_ptr<
grpc::ClientReaderInterface<google::spanner::v1::PartialResultSet>>
SpannerAuth::ExecuteStreamingSql(
grpc::ClientContext& context,
google::spanner::v1::ExecuteSqlRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) {
return absl::make_unique<ClientReaderInterfaceError>(std::move(status));
}
return child_->ExecuteStreamingSql(context, request);
}

StatusOr<google::spanner::v1::ExecuteBatchDmlResponse>
SpannerAuth::ExecuteBatchDml(
grpc::ClientContext& context,
google::spanner::v1::ExecuteBatchDmlRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ExecuteBatchDml(context, request);
}

std::unique_ptr<
grpc::ClientReaderInterface<google::spanner::v1::PartialResultSet>>
SpannerAuth::StreamingRead(grpc::ClientContext& context,
google::spanner::v1::ReadRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) {
return absl::make_unique<ClientReaderInterfaceError>(std::move(status));
}
return child_->StreamingRead(context, request);
}

StatusOr<google::spanner::v1::Transaction> SpannerAuth::BeginTransaction(
grpc::ClientContext& context,
google::spanner::v1::BeginTransactionRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->BeginTransaction(context, request);
}

StatusOr<google::spanner::v1::CommitResponse> SpannerAuth::Commit(
grpc::ClientContext& context,
google::spanner::v1::CommitRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->Commit(context, request);
}

Status SpannerAuth::Rollback(
grpc::ClientContext& context,
google::spanner::v1::RollbackRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->Rollback(context, request);
}

StatusOr<google::spanner::v1::PartitionResponse> SpannerAuth::PartitionQuery(
grpc::ClientContext& context,
google::spanner::v1::PartitionQueryRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->PartitionQuery(context, request);
}

StatusOr<google::spanner::v1::PartitionResponse> SpannerAuth::PartitionRead(
grpc::ClientContext& context,
google::spanner::v1::PartitionReadRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->PartitionRead(context, request);
}

future<StatusOr<google::spanner::v1::BatchCreateSessionsResponse>>
SpannerAuth::AsyncBatchCreateSessions(
CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
google::spanner::v1::BatchCreateSessionsRequest const& request) {
using ReturnType = StatusOr<google::spanner::v1::BatchCreateSessionsResponse>;
auto child = child_;
return auth_->AsyncConfigureContext(std::move(context))
.then([cq, child,
request](future<StatusOr<std::unique_ptr<grpc::ClientContext>>>
f) mutable {
auto context = f.get();
if (!context) {
return make_ready_future(ReturnType(std::move(context).status()));
}
return child->AsyncBatchCreateSessions(cq, *std::move(context),
request);
});
}

future<Status> SpannerAuth::AsyncDeleteSession(
CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
google::spanner::v1::DeleteSessionRequest const& request) {
auto child = child_;
return auth_->AsyncConfigureContext(std::move(context))
.then([cq, child,
request](future<StatusOr<std::unique_ptr<grpc::ClientContext>>>
f) mutable {
auto context = f.get();
if (!context) return make_ready_future(std::move(context).status());
return child->AsyncDeleteSession(cq, *std::move(context), request);
});
}

future<StatusOr<google::spanner::v1::ResultSet>> SpannerAuth::AsyncExecuteSql(
CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
google::spanner::v1::ExecuteSqlRequest const& request) {
using ReturnType = StatusOr<google::spanner::v1::ResultSet>;
auto child = child_;
return auth_->AsyncConfigureContext(std::move(context))
.then([cq, child,
request](future<StatusOr<std::unique_ptr<grpc::ClientContext>>>
f) mutable {
auto context = f.get();
if (!context) {
return make_ready_future(ReturnType(std::move(context).status()));
}
return child->AsyncExecuteSql(cq, *std::move(context), request);
});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace spanner_internal
} // namespace cloud
} // namespace google
Loading

0 comments on commit df5eb37

Please sign in to comment.