From 2e24f7e2fa45b6fead0d5d3d2e7a281e25cbcca5 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Fri, 19 Jul 2024 10:55:37 +0530 Subject: [PATCH] incorporate changes --- spanner/client_test.go | 2 +- spanner/integration_test.go | 9 ------- spanner/kokoro/presubmit.sh | 1 + spanner/session.go | 26 +++++++++---------- .../opentelemetry/test/ot_metrics_test.go | 9 ++++--- spanner/test/opentelemetry/test/test_util.go | 6 ++--- 6 files changed, 23 insertions(+), 30 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 6d775b917966..9400cf4e81b4 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -5208,7 +5208,7 @@ func TestClient_CloseWithUnresponsiveBackend(t *testing.T) { server.TestSpanner.Freeze() defer server.TestSpanner.Unfreeze() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) defer cancel() sp.close(ctx) diff --git a/spanner/integration_test.go b/spanner/integration_test.go index ef7e13622622..2e1d215a2c4d 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -91,8 +91,6 @@ var ( // GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG. instanceConfig = getInstanceConfig() - isMultiplexEnabled = getMultiplexEnableFlag() - dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) @@ -388,13 +386,6 @@ func getInstanceConfig() string { return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG") } -func getMultiplexEnableFlag() string { - if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" { - return "true" - } - return "false" -} - const ( str1 = "alice" str2 = "a@example.com" diff --git a/spanner/kokoro/presubmit.sh b/spanner/kokoro/presubmit.sh index 2ae5fc1b66d7..701f85a778cb 100755 --- a/spanner/kokoro/presubmit.sh +++ b/spanner/kokoro/presubmit.sh @@ -28,6 +28,7 @@ export GOCLOUD_HOME=$KOKORO_ARTIFACTS_DIR/google-cloud-go/ export PATH="$GOPATH/bin:$PATH" export GO111MODULE=on export GOPROXY=https://proxy.golang.org +export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/${GOOGLE_APPLICATION_CREDENTIALS} # Move code into artifacts dir mkdir -p $GOCLOUD_HOME git clone . $GOCLOUD_HOME diff --git a/spanner/session.go b/spanner/session.go index 02bbfdd26ca2..ee90a1150b6f 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -123,9 +123,7 @@ func (sh *sessionHandle) recycle() { tracked := sh.trackedSessionHandle s := sh.session sh.session = nil - if sh.client != nil { - sh.client = nil - } + sh.client = nil sh.trackedSessionHandle = nil sh.checkoutTime = time.Time{} sh.lastUseTime = time.Time{} @@ -160,7 +158,7 @@ func (sh *sessionHandle) getClient() *vkit.Client { if sh.session == nil { return nil } - if sh.session.isMultiplexed { + if sh.client != nil { // Use the gRPC connection from the session handle return sh.client } @@ -200,9 +198,7 @@ func (sh *sessionHandle) destroy() { } tracked := sh.trackedSessionHandle sh.session = nil - if sh.client != nil { - sh.client = nil - } + sh.client = nil sh.trackedSessionHandle = nil sh.checkoutTime = time.Time{} sh.lastUseTime = time.Time{} @@ -595,7 +591,7 @@ type sessionPool struct { // idleList caches idle session IDs. Session IDs in this list can be // allocated for use. idleList list.List - // multiplexedSessions contains the multiplexed sessions + // multiplexedSession contains the multiplexed session multiplexedSession *session // mayGetSession is for broadcasting that session retrival/creation may // proceed. @@ -606,8 +602,8 @@ type sessionPool struct { // sessionCreationError is the last error that occurred during session // creation and is propagated to any waiters waiting for a session. sessionCreationError error - // multiplexedSessionCreationError is the last error that occurred during multiplexed session - // creation and is propagated to any waiters waiting for a session. + // multiplexedSessionCreationError is the error that occurred during multiplexed session + // creation for the first time and is propagated to any waiters waiting for a session. multiplexedSessionCreationError error // numOpened is the total number of open sessions from the session pool. numOpened uint64 @@ -924,6 +920,13 @@ func (p *sessionPool) sessionCreationFailed(err error, numSessions int32, isMult p.mu.Lock() defer p.mu.Unlock() if isMultiplexed { + // Ignore the error if multiplexed session already present + if p.multiplexedSession != nil { + p.multiplexedSessionCreationError = nil + close(p.mayGetMultiplexedSession) + p.mayGetMultiplexedSession = make(chan struct{}) + return + } p.multiplexedSessionCreationError = err close(p.mayGetMultiplexedSession) p.mayGetMultiplexedSession = make(chan struct{}) @@ -1002,9 +1005,6 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()} if s.isMultiplexed { - // TODO: handle 1-qps style traffic, we can return the same client which was used for session creation in that case. - - // allocate a new client for multiplexed session requests using round robin channel selection. p.mu.Lock() p.sc.mu.Lock() clientOpt := option.WithGRPCConn(p.sc.connPool.Conn()) diff --git a/spanner/test/opentelemetry/test/ot_metrics_test.go b/spanner/test/opentelemetry/test/ot_metrics_test.go index e96aa2b4bd9c..105b5a8e367a 100644 --- a/spanner/test/opentelemetry/test/ot_metrics_test.go +++ b/spanner/test/opentelemetry/test/ot_metrics_test.go @@ -21,6 +21,7 @@ package test import ( "context" "errors" + "strconv" "testing" "time" @@ -76,7 +77,7 @@ func TestOTMetrics_SessionPool(t *testing.T) { expectedOpenSessionCount := int64(25) expectedMaxInUseWithMultiplexed := int64(0) expectedMaxInUse := int64(1) - if isMultiplexEnabled == "true" { + if isMultiplexEnabled { expectedOpenSessionCount = 0 expectedMaxInUse = 0 expectedMaxInUseWithMultiplexed = 1 @@ -146,7 +147,7 @@ func TestOTMetrics_SessionPool(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...), + Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...), Value: 1, }, }, @@ -164,7 +165,7 @@ func TestOTMetrics_SessionPool(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...), + Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...), Value: 1, }, }, @@ -258,7 +259,7 @@ func TestOTMetrics_SessionPool_GetSessionTimeoutsCount(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...), + Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...), Value: 1, }, }, diff --git a/spanner/test/opentelemetry/test/test_util.go b/spanner/test/opentelemetry/test/test_util.go index 094c2227949a..65c44e7d3f3e 100644 --- a/spanner/test/opentelemetry/test/test_util.go +++ b/spanner/test/opentelemetry/test/test_util.go @@ -34,11 +34,11 @@ var ( isMultiplexEnabled = getMultiplexEnableFlag() ) -func getMultiplexEnableFlag() string { +func getMultiplexEnableFlag() bool { if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" { - return "true" + return true } - return "false" + return false } func setupMockedTestServerWithConfig(t *testing.T, config spanner.ClientConfig) (server *stestutil.MockedSpannerInMemTestServer, client *spanner.Client, teardown func()) {