From 04dafe9df231679ebc37a20372c9c4012cbc3cfd Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 13 Jul 2016 17:11:29 -0400 Subject: [PATCH] storage: deflake TestStoreMetrics This was a tough one. Several problems were addressed, all variations on the same theme: - DistSenders in multiTestContext use a shared global stopper, but they may be called on goroutines which belong to a Store-level task. If that Store wants to quiesce and the DistSender can't finish its task because that same Store is already in quiescing mode, deadlocks occurred. The unfortunate solution is plugging in a channel which draws from two Stoppers, one of which may be quiesced and replaced multiple times. - Additional deadlocks were caused due to multiTestContext's transport, which acquired a read lock that was formerly held in write mode throughout mtc.stopStore() (circumvented by dropping the lock there while quiescing). - verifyStats was stopping individual Stores to perform computations without moving parts. Stopping individual Stores is tough when their tasks may be stuck on other Stores but can't complete while their own Store is already quiescing. Instead, verifyStats stops *all stores* simultaneously, regardless of which Store is actively being investigated. Prior to these changes, failed in a few hundred to a few thousand iters (depending on how many of the above were partially addressed): ``` $ make stressrace PKG=./storage TESTS=TestStoreMetrics TESTTIMEOUT=10s STRESSFLAGS='-maxfails 1 -stderr -p 128 -timeout 15m' 15784 runs so far, 0 failures, over 8m0s ``` Fixes #7678. --- docs/RFCS/drain_modes.md | 2 +- server/server.go | 2 +- storage/client_metrics_test.go | 114 ++++++++++++++++++++------------- storage/client_test.go | 70 ++++++++++++++++++-- storage/replica_range_lease.go | 2 +- 5 files changed, 139 insertions(+), 51 deletions(-) diff --git a/docs/RFCS/drain_modes.md b/docs/RFCS/drain_modes.md index 9713537d4930..989e1327e783 100644 --- a/docs/RFCS/drain_modes.md +++ b/docs/RFCS/drain_modes.md @@ -25,7 +25,7 @@ running to `drain-clients` to both. In our Stopper usage, we've taken to fairly ruthlessly shutting down service of many components once `(*Stopper).Quiesce()` is called. In code, this manifests -itself through copious use of the `(*Stopper).ShouldDrain()` channel even when +itself through copious use of the `(*Stopper).ShouldQuiesce()` channel even when not running inside of a task (or running long-running operations inside tasks). This was motivated mainly by endless amounts of test failures around leaked diff --git a/server/server.go b/server/server.go index 66110bc67b0a..7bd19fbee37a 100644 --- a/server/server.go +++ b/server/server.go @@ -144,7 +144,7 @@ func NewServer(ctx Context, stopper *stop.Stopper) (*Server, error) { s.stopper, ) - // A custom RetryOptions is created which uses stopper.ShouldDrain() as + // A custom RetryOptions is created which uses stopper.ShouldQuiesce() as // the Closer. This prevents infinite retry loops from occurring during // graceful server shutdown // diff --git a/storage/client_metrics_test.go b/storage/client_metrics_test.go index 39f1f57d852c..b9a15aab8bc6 100644 --- a/storage/client_metrics_test.go +++ b/storage/client_metrics_test.go @@ -17,6 +17,7 @@ package storage_test import ( + "sync" "testing" "github.com/cockroachdb/cockroach/internal/client" @@ -54,56 +55,83 @@ func checkCounter(t *testing.T, s *storage.Store, key string, e int64) { } } -func verifyStats(t *testing.T, mtc *multiTestContext, storeIdx int) { - // Get the current store at storeIdx. - s := mtc.stores[storeIdx] - // Stop the store at the given index, while keeping the reference to the - // store object. ComputeMVCCStats() still works on a stopped store (it needs +func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) { + var stores []*storage.Store + var wg sync.WaitGroup + + mtc.mu.RLock() + numStores := len(mtc.stores) + // We need to stop the stores at the given indexes, while keeping the reference to the + // store objects. ComputeMVCCStats() still works on a stopped store (it needs // only the engine, which is still open), and the most recent stats are still // available on the stopped store object; however, no further information can // be committed to the store while it is stopped, preventing any races during // verification. - mtc.stopStore(storeIdx) + for _, storeIdx := range storeIdxSlice { + stores = append(stores, mtc.stores[storeIdx]) + } + mtc.mu.RUnlock() + + wg.Add(numStores) + // We actually stop *all* of the Stores. Stopping only a few is riddled + // with deadlocks since operations can span nodes, but stoppers don't + // know about this - taking all of them down at the same time is the + // only sane way of guaranteeing that nothing interesting happens, at + // least when bringing down the nodes jeopardizes majorities. + for i := 0; i < numStores; i++ { + go func(i int) { + defer wg.Done() + mtc.stopStore(i) + }(i) + } + wg.Wait() + + for _, s := range stores { + fatalf := func(msg string, args ...interface{}) { + prefix := s.Ident.String() + ": " + t.Fatalf(prefix+msg, args...) + } + // Compute real total MVCC statistics from store. + realStats, err := s.ComputeMVCCStats() + if err != nil { + t.Fatal(err) + } - // Compute real total MVCC statistics from store. - realStats, err := s.ComputeMVCCStats() - if err != nil { - t.Fatal(err) - } + // Sanity regression check for bug #4624: ensure intent count is zero. + if a := realStats.IntentCount; a != 0 { + fatalf("expected intent count to be zero, was %d", a) + } - // Sanity regression check for bug #4624: ensure intent count is zero. - if a := realStats.IntentCount; a != 0 { - t.Fatalf("Expected intent count to be zero, was %d", a) - } + // Sanity check: LiveBytes is not zero (ensures we don't have + // zeroed out structures.) + if liveBytes := getGauge(t, s, "livebytes"); liveBytes == 0 { + fatalf("expected livebytes to be non-zero, was zero") + } - // Sanity check: LiveBytes is not zero (ensures we don't have - // zeroed out structures.) - if liveBytes := getGauge(t, s, "livebytes"); liveBytes == 0 { - t.Fatal("Expected livebytes to be non-zero, was zero") + // Ensure that real MVCC stats match computed stats. + checkGauge(t, s, "livebytes", realStats.LiveBytes) + checkGauge(t, s, "keybytes", realStats.KeyBytes) + checkGauge(t, s, "valbytes", realStats.ValBytes) + checkGauge(t, s, "intentbytes", realStats.IntentBytes) + checkGauge(t, s, "livecount", realStats.LiveCount) + checkGauge(t, s, "keycount", realStats.KeyCount) + checkGauge(t, s, "valcount", realStats.ValCount) + checkGauge(t, s, "intentcount", realStats.IntentCount) + checkGauge(t, s, "sysbytes", realStats.SysBytes) + checkGauge(t, s, "syscount", realStats.SysCount) + // "Ages" will be different depending on how much time has passed. Even with + // a manual clock, this can be an issue in tests. Therefore, we do not + // verify them in this test. + + if t.Failed() { + fatalf("verifyStats failed, aborting test.") + } } - // Ensure that real MVCC stats match computed stats. - checkGauge(t, s, "livebytes", realStats.LiveBytes) - checkGauge(t, s, "keybytes", realStats.KeyBytes) - checkGauge(t, s, "valbytes", realStats.ValBytes) - checkGauge(t, s, "intentbytes", realStats.IntentBytes) - checkGauge(t, s, "livecount", realStats.LiveCount) - checkGauge(t, s, "keycount", realStats.KeyCount) - checkGauge(t, s, "valcount", realStats.ValCount) - checkGauge(t, s, "intentcount", realStats.IntentCount) - checkGauge(t, s, "sysbytes", realStats.SysBytes) - checkGauge(t, s, "syscount", realStats.SysCount) - // "Ages" will be different depending on how much time has passed. Even with - // a manual clock, this can be an issue in tests. Therefore, we do not - // verify them in this test. - - if t.Failed() { - t.Log(errors.Errorf("verifyStats failed, aborting test.")) - t.FailNow() + // Restart all Stores. + for i := 0; i < numStores; i++ { + mtc.restartStore(i) } - - // Restart the store at the provided index. - mtc.restartStore(storeIdx) } func verifyRocksDBStats(t *testing.T, s *storage.Store) { @@ -182,8 +210,7 @@ func TestStoreMetrics(t *testing.T) { mtc.waitForValues(roachpb.Key("z"), []int64{5, 5, 5}) // Verify all stats on store 0 and 1 after addition. - verifyStats(t, mtc, 0) - verifyStats(t, mtc, 1) + verifyStats(t, mtc, 0, 1) // Create a transaction statement that fails, but will add an entry to the // sequence cache. Regression test for #4969. @@ -211,8 +238,7 @@ func TestStoreMetrics(t *testing.T) { checkCounter(t, mtc.stores[1], "replicas", 1) // Verify all stats on store0 and store1 after range is removed. - verifyStats(t, mtc, 0) - verifyStats(t, mtc, 1) + verifyStats(t, mtc, 0, 1) verifyRocksDBStats(t, mtc.stores[0]) verifyRocksDBStats(t, mtc.stores[1]) diff --git a/storage/client_test.go b/storage/client_test.go index dd426d795d6a..3b44d1e4a465 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -537,9 +537,57 @@ func (m *multiTestContext) addStore() { ), ) } + + stopper := stop.NewStopper() if len(m.dbs) <= idx { retryOpts := base.DefaultRetryOptions() - retryOpts.Closer = m.clientStopper.ShouldQuiesce() + retryOpts.Closer = func() chan struct{} { + ch := make(chan struct{}) + // Feed the channel periodically as long as our "real" stopper + // is quiescing. Close it when clientStopper closes (which means + // we're done). This is awkward, but necessary. We allow stopping + // and restarting stores, but their DistSender needs to respect + // that as well (since they may be involved in Store tasks). + if m.clientStopper.RunAsyncTask(func() { + feed := func() bool { // true when closed (and we're done) + select { + case ch <- struct{}{}: + case <-m.clientStopper.ShouldQuiesce(): + if ch != nil { + close(ch) + ch = nil + } + return true + } + return false + } + + for { + var grabbedStopper *stop.Stopper + // While the stopper is nil, this store is down. + for grabbedStopper != nil { + m.mu.RLock() + if len(m.stoppers) <= idx { + grabbedStopper = m.stoppers[idx] + } + m.mu.RUnlock() + if feed() { + return + } + } + select { + case <-m.clientStopper.ShouldQuiesce(): + feed() // to make sure chan is closed + return + case <-grabbedStopper.ShouldQuiesce(): + feed() + } + } + }) != nil { + close(ch) + } + return ch + }() m.distSenders = append(m.distSenders, kv.NewDistSender(&kv.DistSenderContext{ Clock: m.clock, @@ -552,7 +600,6 @@ func (m *multiTestContext) addStore() { m.dbs = append(m.dbs, client.NewDB(sender)) } - stopper := stop.NewStopper() ctx := m.makeContext(idx) nodeID := roachpb.NodeID(idx + 1) store := storage.NewStore(ctx, eng, &roachpb.NodeDescriptor{NodeID: nodeID}) @@ -631,11 +678,26 @@ func (m *multiTestContext) gossipNodeDesc(g *gossip.Gossip, nodeID roachpb.NodeI // StopStore stops a store but leaves the engine intact. // All stopped stores must be restarted before multiTestContext.Stop is called. func (m *multiTestContext) stopStore(i int) { + m.mu.Lock() + // Stopping when multiple stoppers (which are not aware of each other) is + // messy. + // multiTestContextKVTransport needs a read lock to access its stopper and + // it's already in a task, so if we simply grabbed a write lock here while + // stopping we could deadlock (see #7678). + // So we initiate stopping under a write lock, and then release the lock + // until that has finished. + stopper := m.stoppers[i] + m.stoppers[i] = nil + go func() { + stopper.Quiesce() + }() + <-stopper.ShouldQuiesce() + m.mu.Unlock() + stopper.Stop() + m.mu.Lock() defer m.mu.Unlock() m.senders[i].RemoveStore(m.stores[i]) - m.stoppers[i].Stop() - m.stoppers[i] = nil m.stores[i] = nil } diff --git a/storage/replica_range_lease.go b/storage/replica_range_lease.go index 34c9a2777473..f3da1acd3ece 100644 --- a/storage/replica_range_lease.go +++ b/storage/replica_range_lease.go @@ -69,7 +69,7 @@ func (p *pendingLeaseRequest) RequestPending() *roachpb.Lease { // opposed to an extension, or acquiring the lease when none is held). // // Note: Once this function gets a context to be used for cancellation, instead -// of replica.store.Stopper().ShouldDrain(), care will be needed for cancelling +// of replica.store.Stopper().ShouldQuiesce(), care will be needed for cancelling // the Raft command, similar to replica.addWriteCmd. func (p *pendingLeaseRequest) InitOrJoinRequest( replica *Replica,