Skip to content

Commit

Permalink
sqlliveness: session expiry callbacks must be async
Browse files Browse the repository at this point in the history
Previously, the sqlliveness session expiry callbacks were called in the
heartbeatLoop thread which executed the renew/expiry logic. This could
cause deadlock since session expiration is used to trigger a shutdown of
the SQL instance via `stopper.Stop()`. The stopper would wait for all
async tasks to quiesce, but the `heartbeatLoop` would continue, waiting
for the callbacks to finish running. In addition, this task would hold a
lock on `l.mu` while waiting for the callbacks to run causing other
threads to wait if they needed to retrieve the session.

This change invokes each callback in its own goroutine to prevent this
deadlock.

Resolves cockroachdb#71292

Release note: None
  • Loading branch information
dhartunian authored and ericharmeling committed Oct 20, 2021
1 parent 9b2c9db commit 383baf6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (s *session) Expiration() hlc.Timestamp {
return s.mu.exp
}

// RegisterCallbackForSessionExpiry adds the given function to the list
// of functions called after a session expires. The functions are
// executed in a goroutine.
func (s *session) RegisterCallbackForSessionExpiry(sExp func(context.Context)) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -86,7 +89,7 @@ func (s *session) invokeSessionExpiryCallbacks(ctx context.Context) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, callback := range s.mu.sessionExpiryCallbacks {
callback(ctx)
go callback(ctx)
}
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package slinstance_test

import (
"context"
"errors"
"testing"
"time"

Expand All @@ -20,13 +21,55 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

func TestSQLInstance_invokesSessionExpiryCallbacksInGoroutine(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()

t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mClock := hlc.NewManualClock(t0.UnixNano())
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
settings := cluster.MakeTestingClusterSettings()

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

session, err := sqlInstance.Session(ctx)
require.NoError(t, err)

// Simulating what happens in `instanceprovider.shutdownSQLInstance`
session.RegisterCallbackForSessionExpiry(func(ctx context.Context) {
stopper.Stop(ctx)
})

// Removing the session will run the callback above, which will have to
// wait for async tasks to stop. The async tasks include the
// sqlInstance `heartbeatLoop` function.
require.NoError(t, fakeStorage.Delete(ctx, session.ID()))

// Clock needs to advance for expiry we trigger below to be valid
mClock.Increment(int64(slinstance.DefaultTTL.Get(&settings.SV)))

testutils.SucceedsSoon(t, func() error {
select {
case <-stopper.IsStopped():
return nil
default:
return errors.New("not stopped")
}
})
}

func TestSQLInstance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 383baf6

Please sign in to comment.