Skip to content

Commit

Permalink
incorporate changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 committed Jul 19, 2024
1 parent d7bb22a commit 2e24f7e
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 30 deletions.
2 changes: 1 addition & 1 deletion spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5208,7 +5208,7 @@ func TestClient_CloseWithUnresponsiveBackend(t *testing.T) {
server.TestSpanner.Freeze()
defer server.TestSpanner.Unfreeze()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
sp.close(ctx)

Expand Down
9 changes: 0 additions & 9 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ var (
// GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG.
instanceConfig = getInstanceConfig()

isMultiplexEnabled = getMultiplexEnableFlag()

dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
Expand Down Expand Up @@ -388,13 +386,6 @@ func getInstanceConfig() string {
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG")
}

func getMultiplexEnableFlag() string {
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
return "true"
}
return "false"
}

const (
str1 = "alice"
str2 = "a@example.com"
Expand Down
1 change: 1 addition & 0 deletions spanner/kokoro/presubmit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export GOCLOUD_HOME=$KOKORO_ARTIFACTS_DIR/google-cloud-go/
export PATH="$GOPATH/bin:$PATH"
export GO111MODULE=on
export GOPROXY=https://proxy.golang.org
export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/${GOOGLE_APPLICATION_CREDENTIALS}
# Move code into artifacts dir
mkdir -p $GOCLOUD_HOME
git clone . $GOCLOUD_HOME
Expand Down
26 changes: 13 additions & 13 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ func (sh *sessionHandle) recycle() {
tracked := sh.trackedSessionHandle
s := sh.session
sh.session = nil
if sh.client != nil {
sh.client = nil
}
sh.client = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
Expand Down Expand Up @@ -160,7 +158,7 @@ func (sh *sessionHandle) getClient() *vkit.Client {
if sh.session == nil {
return nil
}
if sh.session.isMultiplexed {
if sh.client != nil {
// Use the gRPC connection from the session handle
return sh.client
}
Expand Down Expand Up @@ -200,9 +198,7 @@ func (sh *sessionHandle) destroy() {
}
tracked := sh.trackedSessionHandle
sh.session = nil
if sh.client != nil {
sh.client = nil
}
sh.client = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
Expand Down Expand Up @@ -595,7 +591,7 @@ type sessionPool struct {
// idleList caches idle session IDs. Session IDs in this list can be
// allocated for use.
idleList list.List
// multiplexedSessions contains the multiplexed sessions
// multiplexedSession contains the multiplexed session
multiplexedSession *session
// mayGetSession is for broadcasting that session retrival/creation may
// proceed.
Expand All @@ -606,8 +602,8 @@ type sessionPool struct {
// sessionCreationError is the last error that occurred during session
// creation and is propagated to any waiters waiting for a session.
sessionCreationError error
// multiplexedSessionCreationError is the last error that occurred during multiplexed session
// creation and is propagated to any waiters waiting for a session.
// multiplexedSessionCreationError is the error that occurred during multiplexed session
// creation for the first time and is propagated to any waiters waiting for a session.
multiplexedSessionCreationError error
// numOpened is the total number of open sessions from the session pool.
numOpened uint64
Expand Down Expand Up @@ -924,6 +920,13 @@ func (p *sessionPool) sessionCreationFailed(err error, numSessions int32, isMult
p.mu.Lock()
defer p.mu.Unlock()
if isMultiplexed {
// Ignore the error if multiplexed session already present
if p.multiplexedSession != nil {
p.multiplexedSessionCreationError = nil
close(p.mayGetMultiplexedSession)
p.mayGetMultiplexedSession = make(chan struct{})
return
}
p.multiplexedSessionCreationError = err
close(p.mayGetMultiplexedSession)
p.mayGetMultiplexedSession = make(chan struct{})
Expand Down Expand Up @@ -1002,9 +1005,6 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()}
if s.isMultiplexed {
// TODO: handle 1-qps style traffic, we can return the same client which was used for session creation in that case.

// allocate a new client for multiplexed session requests using round robin channel selection.
p.mu.Lock()
p.sc.mu.Lock()
clientOpt := option.WithGRPCConn(p.sc.connPool.Conn())
Expand Down
9 changes: 5 additions & 4 deletions spanner/test/opentelemetry/test/ot_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package test
import (
"context"
"errors"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -76,7 +77,7 @@ func TestOTMetrics_SessionPool(t *testing.T) {
expectedOpenSessionCount := int64(25)
expectedMaxInUseWithMultiplexed := int64(0)
expectedMaxInUse := int64(1)
if isMultiplexEnabled == "true" {
if isMultiplexEnabled {
expectedOpenSessionCount = 0
expectedMaxInUse = 0
expectedMaxInUseWithMultiplexed = 1
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestOTMetrics_SessionPool(t *testing.T) {
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...),
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...),
Value: 1,
},
},
Expand All @@ -164,7 +165,7 @@ func TestOTMetrics_SessionPool(t *testing.T) {
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...),
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...),
Value: 1,
},
},
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestOTMetrics_SessionPool_GetSessionTimeoutsCount(t *testing.T) {
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(isMultiplexEnabled))...),
Attributes: attribute.NewSet(append(getAttributes(client.ClientID()), attribute.Key("is_multiplexed").String(strconv.FormatBool(isMultiplexEnabled)))...),
Value: 1,
},
},
Expand Down
6 changes: 3 additions & 3 deletions spanner/test/opentelemetry/test/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ var (
isMultiplexEnabled = getMultiplexEnableFlag()
)

func getMultiplexEnableFlag() string {
func getMultiplexEnableFlag() bool {
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
return "true"
return true
}
return "false"
return false
}

func setupMockedTestServerWithConfig(t *testing.T, config spanner.ClientConfig) (server *stestutil.MockedSpannerInMemTestServer, client *spanner.Client, teardown func()) {
Expand Down

0 comments on commit 2e24f7e

Please sign in to comment.