Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support of using multiplexed session with ReadOnlyTransactions #10269

Merged
merged 20 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,20 @@ type ClientConfig struct {
}

type openTelemetryConfig struct {
meterProvider metric.MeterProvider
attributeMap []attribute.KeyValue
otMetricRegistration metric.Registration
openSessionCount metric.Int64ObservableGauge
maxAllowedSessionsCount metric.Int64ObservableGauge
sessionsCount metric.Int64ObservableGauge
maxInUseSessionsCount metric.Int64ObservableGauge
getSessionTimeoutsCount metric.Int64Counter
acquiredSessionsCount metric.Int64Counter
releasedSessionsCount metric.Int64Counter
gfeLatency metric.Int64Histogram
gfeHeaderMissingCount metric.Int64Counter
meterProvider metric.MeterProvider
attributeMap []attribute.KeyValue
attributeMapWithMultiplexed []attribute.KeyValue
attributeMapWithoutMultiplexed []attribute.KeyValue
otMetricRegistration metric.Registration
openSessionCount metric.Int64ObservableGauge
maxAllowedSessionsCount metric.Int64ObservableGauge
sessionsCount metric.Int64ObservableGauge
maxInUseSessionsCount metric.Int64ObservableGauge
getSessionTimeoutsCount metric.Int64Counter
acquiredSessionsCount metric.Int64Counter
releasedSessionsCount metric.Int64Counter
gfeLatency metric.Int64Histogram
gfeHeaderMissingCount metric.Int64Counter
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down
367 changes: 319 additions & 48 deletions spanner/client_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion spanner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
google.golang.org/api v0.189.0
google.golang.org/genproto v0.0.0-20240722135656-d784300faade
Expand Down Expand Up @@ -45,7 +46,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var (
// GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG.
instanceConfig = getInstanceConfig()

isMultiplexEnabled = getMultiplexEnableFlag()

dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
Expand Down Expand Up @@ -386,6 +388,10 @@ func getInstanceConfig() string {
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG")
}

func getMultiplexEnableFlag() bool {
return os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true"
}

const (
str1 = "alice"
str2 = "a@example.com"
Expand Down
21 changes: 16 additions & 5 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,11 @@ func (s *inMemSpannerServer) initDefaults() {
s.transactionCounters = make(map[string]*uint64)
}

func (s *inMemSpannerServer) generateSessionNameLocked(database string) string {
func (s *inMemSpannerServer) generateSessionNameLocked(database string, isMultiplexed bool) string {
s.sessionCounter++
if isMultiplexed {
return fmt.Sprintf("%s/sessions/multiplexed-%d", database, s.sessionCounter)
}
return fmt.Sprintf("%s/sessions/%d", database, s.sessionCounter)
}

Expand Down Expand Up @@ -705,13 +708,21 @@ func (s *inMemSpannerServer) CreateSession(ctx context.Context, req *spannerpb.C
if s.maxSessionsReturnedByServerInTotal > int32(0) && int32(len(s.sessions)) == s.maxSessionsReturnedByServerInTotal {
return nil, gstatus.Error(codes.ResourceExhausted, "No more sessions available")
}
sessionName := s.generateSessionNameLocked(req.Database)
ts := getCurrentTimestamp()
var creatorRole string
var (
creatorRole string
isMultiplexed bool
)
if req.Session != nil {
creatorRole = req.Session.CreatorRole
isMultiplexed = req.Session.Multiplexed
}
sessionName := s.generateSessionNameLocked(req.Database, isMultiplexed)
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
session := &spannerpb.Session{Name: sessionName, CreateTime: ts, ApproximateLastUseTime: ts, CreatorRole: creatorRole}
session := &spannerpb.Session{Name: sessionName, CreateTime: ts, ApproximateLastUseTime: ts, CreatorRole: creatorRole, Multiplexed: isMultiplexed}
s.totalSessionsCreated++
s.sessions[sessionName] = session
return session, nil
Expand Down Expand Up @@ -742,7 +753,7 @@ func (s *inMemSpannerServer) BatchCreateSessions(ctx context.Context, req *spann
}
sessions := make([]*spannerpb.Session, sessionsToCreate)
for i := int32(0); i < sessionsToCreate; i++ {
sessionName := s.generateSessionNameLocked(req.Database)
sessionName := s.generateSessionNameLocked(req.Database, false)
ts := getCurrentTimestamp()
var creatorRole string
if req.SessionTemplate != nil {
Expand Down
111 changes: 111 additions & 0 deletions spanner/kokoro/presubmit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/bin/bash
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License..

# Fail on any error
set -eo pipefail

# Display commands being run
set -x

# cd to project dir on Kokoro instance
cd github/google-cloud-go

go version

export GOCLOUD_HOME=$KOKORO_ARTIFACTS_DIR/google-cloud-go/
export PATH="$GOPATH/bin:$PATH"
export GO111MODULE=on
export GOPROXY=https://proxy.golang.org
export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/${GOOGLE_APPLICATION_CREDENTIALS}
# Move code into artifacts dir
mkdir -p $GOCLOUD_HOME
git clone . $GOCLOUD_HOME
cd $GOCLOUD_HOME

try3() { eval "$*" || eval "$*" || eval "$*"; }

# All packages, including +build tools, are fetched.
try3 go mod download

set +e # Run all tests, don't stop after the first failure.
exit_code=0

case $JOB_TYPE in
integration-with-multiplexed-session )
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true
echo "running presubmit with multiplexed sessions enabled: $GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
;;
esac

# Run tests in the current directory and tee output to log file,
# to be pushed to GCS as artifact.
runPresubmitTests() {
if [[ $PWD == *"/internal/"* ]] ||
[[ $PWD == *"/third_party/"* ]]; then
# internal tools only expected to work with latest go version
return
fi

if [ -z ${RUN_INTEGRATION_TESTS} ]; then
GOWORK=off go test -race -v -timeout 15m -short ./... 2>&1 |
tee sponge_log.log
else
GOWORK=off go test -race -v -timeout 45m ./... 2>&1 |
tee sponge_log.log
fi

# Skip running integration tests since Emulator does not support Multiplexed sessions
# Run integration tests against an emulator.
# if [ -f "emulator_test.sh" ]; then
# ./emulator_test.sh
# fi
# Takes the kokoro output log (raw stdout) and creates a machine-parseable
# xUnit XML file.
cat sponge_log.log |
go-junit-report -set-exit-code >sponge_log.xml
# Add the exit codes together so we exit non-zero if any module fails.
exit_code=$(($exit_code + $?))
if [[ $PWD != *"/internal/"* ]]; then
GOWORK=off go build ./...
fi
exit_code=$(($exit_code + $?))
}

SIGNIFICANT_CHANGES=$(git --no-pager diff --name-only origin/main...$KOKORO_GIT_COMMIT_google_cloud_go |
grep -Ev '(\.md$|^\.github|\.json$|\.yaml$)' | xargs dirname | sort -u || true)

if [ -z $SIGNIFICANT_CHANGES ]; then
echo "No changes detected, skipping tests"
exit 0
fi

# CHANGED_DIRS is the list of significant top-level directories that changed,
# but weren't deleted by the current PR. CHANGED_DIRS will be empty when run on main.
CHANGED_DIRS=$(echo "$SIGNIFICANT_CHANGES" | tr ' ' '\n' | cut -d/ -f1 | sort -u |
tr '\n' ' ' | xargs ls -d 2>/dev/null || true)

echo "Running tests only in changed submodules: $CHANGED_DIRS"
for d in $CHANGED_DIRS; do
# run tests only if spanner module is part of $CHANGED_DIRS
if [[ $CHANGED_DIRS =~ spanner ]];then
for i in $(find "$d" -name go.mod); do
pushd $(dirname $i)
runPresubmitTests
popd
done
fi
done

exit $exit_code
33 changes: 28 additions & 5 deletions spanner/oc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ func TestOCStats(t *testing.T) {
func TestOCStats_SessionPool(t *testing.T) {
skipForPGTest(t)
DisableGfeLatencyAndHeaderMissingCountViews()
// expectedValues is a map of expected values for different configurations of
// multiplexed session env="GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS".
expectedValues := map[string]map[bool]string{
"open_session_count": {
false: "25",
// since we are doing only R/O operations and MinOpened=0, we should have only one session.
true: "1",
},
"max_in_use_sessions": {
false: "1",
true: "0",
},
}
for _, test := range []struct {
name string
view *view.View
Expand All @@ -62,7 +75,7 @@ func TestOCStats_SessionPool(t *testing.T) {
"OpenSessionCount",
OpenSessionCountView,
"open_session_count",
"25",
expectedValues["open_session_count"][isMultiplexEnabled],
},
{
"MaxAllowedSessionsCount",
Expand All @@ -74,7 +87,7 @@ func TestOCStats_SessionPool(t *testing.T) {
"MaxInUseSessionsCount",
MaxInUseSessionsCountView,
"max_in_use_sessions",
"1",
expectedValues["max_in_use_sessions"][isMultiplexEnabled],
},
{
"AcquiredSessionsCount",
Expand Down Expand Up @@ -167,11 +180,15 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
})
client.Single().ReadRow(context.Background(), "Users", Key{"alice"}, []string{"email"})

expectedSpans := 2
if isMultiplexEnabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would you mind adding a comment here that explains which span will be skipped when multiplexed sessions are enabled?

expectedSpans = 1
}
// Wait for a while to see all exported metrics.
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) >= 2 {
if len(stat.Rows) >= expectedSpans {
return nil
}
}
Expand All @@ -183,7 +200,7 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
case stat := <-te.Stats:
// There are 4 types for this metric, so we should see at least four
// rows.
if len(stat.Rows) < 2 {
if len(stat.Rows) < expectedSpans {
t.Fatal("No enough metrics are exported")
}
if got, want := stat.View.Measure.Name(), statsPrefix+"num_sessions_in_pool"; got != want {
Expand Down Expand Up @@ -216,6 +233,9 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
}

func TestOCStats_SessionPool_GetSessionTimeoutsCount(t *testing.T) {
if isMultiplexEnabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should skip this test when multiplexed sessions are enabled (or otherwise, we should add a separate test for multiplexed sessions). This test verifies the behavior of the client if for whatever reason the creation of session(s) is taking longer than expected. That could also happen with multiplexed sessions.

t.Skip("Skipping test as multiplexed sessions will be available from background thread if enabled as soon as client is created")
}
DisableGfeLatencyAndHeaderMissingCountViews()
te := testutil.NewTestExporter(GetSessionTimeoutsCountView)
defer te.Unregister()
Expand All @@ -227,7 +247,10 @@ func TestOCStats_SessionPool_GetSessionTimeoutsCount(t *testing.T) {
stestutil.SimulatedExecutionTime{
MinimumExecutionTime: 2 * time.Millisecond,
})

server.TestSpanner.PutExecutionTime(stestutil.MethodCreateSession,
stestutil.SimulatedExecutionTime{
MinimumExecutionTime: 2 * time.Millisecond,
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
client.Single().ReadRow(ctx, "Users", Key{"alice"}, []string{"email"})
Expand Down
28 changes: 18 additions & 10 deletions spanner/ot_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ const OtInstrumentationScope = "cloud.google.com/go"
const metricsPrefix = "spanner/"

var (
attributeKeyClientID = attribute.Key("client_id")
attributeKeyDatabase = attribute.Key("database")
attributeKeyInstance = attribute.Key("instance_id")
attributeKeyLibVersion = attribute.Key("library_version")
attributeKeyType = attribute.Key("type")
attributeKeyMethod = attribute.Key("grpc_client_method")
attributeKeyClientID = attribute.Key("client_id")
attributeKeyDatabase = attribute.Key("database")
attributeKeyInstance = attribute.Key("instance_id")
attributeKeyLibVersion = attribute.Key("library_version")
attributeKeyType = attribute.Key("type")
attributeKeyMethod = attribute.Key("grpc_client_method")
attributeKeyIsMultiplexed = attribute.Key("is_multiplexed")

attributeNumInUseSessions = attributeKeyType.String("num_in_use_sessions")
attributeNumSessions = attributeKeyType.String("num_sessions")
Expand Down Expand Up @@ -69,6 +70,12 @@ func createOpenTelemetryConfig(mp metric.MeterProvider, logger *log.Logger, sess
}
config.attributeMap = append(config.attributeMap, attributeMap...)

config.attributeMapWithMultiplexed = append(config.attributeMapWithMultiplexed, attributeMap...)
config.attributeMapWithMultiplexed = append(config.attributeMapWithMultiplexed, attributeKeyIsMultiplexed.String("true"))

config.attributeMapWithoutMultiplexed = append(config.attributeMapWithoutMultiplexed, attributeMap...)
config.attributeMapWithoutMultiplexed = append(config.attributeMapWithoutMultiplexed, attributeKeyIsMultiplexed.String("false"))

setOpenTelemetryMetricProvider(config, mp, logger)
return config, nil
}
Expand Down Expand Up @@ -197,13 +204,14 @@ func registerSessionPoolOTMetrics(pool *sessionPool) error {
func(ctx context.Context, o metric.Observer) error {
pool.mu.Lock()
defer pool.mu.Unlock()

if pool.multiplexedSession != nil {
o.ObserveInt64(otConfig.openSessionCount, int64(1), metric.WithAttributes(otConfig.attributeMapWithMultiplexed...))
}
o.ObserveInt64(otConfig.openSessionCount, int64(pool.numOpened), metric.WithAttributes(attributes...))
o.ObserveInt64(otConfig.maxAllowedSessionsCount, int64(pool.SessionPoolConfig.MaxOpened), metric.WithAttributes(attributes...))
o.ObserveInt64(otConfig.sessionsCount, int64(pool.numInUse), metric.WithAttributes(attributesInUseSessions...))
o.ObserveInt64(otConfig.sessionsCount, int64(pool.numInUse), metric.WithAttributes(append(attributesInUseSessions, attribute.Key("is_multiplexed").String("false"))...))
o.ObserveInt64(otConfig.sessionsCount, int64(pool.numSessions), metric.WithAttributes(attributesAvailableSessions...))
o.ObserveInt64(otConfig.maxInUseSessionsCount, int64(pool.maxNumInUse), metric.WithAttributes(attributes...))

o.ObserveInt64(otConfig.maxInUseSessionsCount, int64(pool.maxNumInUse), metric.WithAttributes(append(attributes, attribute.Key("is_multiplexed").String("false"))...))
return nil
},
otConfig.openSessionCount,
Expand Down
14 changes: 11 additions & 3 deletions spanner/pdml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ func TestPartitionedUpdate_Aborted(t *testing.T) {
if err != nil {
t.Fatal(err)
}
id1 := gotReqs[2].(*sppb.ExecuteSqlRequest).Transaction.GetId()
id2 := gotReqs[4].(*sppb.ExecuteSqlRequest).Transaction.GetId()
muxCreateBuffer := 0
if isMultiplexEnabled {
muxCreateBuffer = 1
}
id1 := gotReqs[2+muxCreateBuffer].(*sppb.ExecuteSqlRequest).Transaction.GetId()
id2 := gotReqs[4+muxCreateBuffer].(*sppb.ExecuteSqlRequest).Transaction.GetId()
if bytes.Equal(id1, id2) {
t.Errorf("same transaction used twice, expected two different transactions\ngot tx1: %q\ngot tx2: %q", id1, id2)
}
Expand Down Expand Up @@ -196,8 +200,12 @@ func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
t.Fatal(err)
}
muxCreateBuffer := 0
if isMultiplexEnabled {
muxCreateBuffer = 1
}

if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
if !requests[1+muxCreateBuffer].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
Loading
Loading