From 4108a52f8c8528d9a69c96a26083a1bc919b768e Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 12 Feb 2020 22:37:27 -0500
Subject: [PATCH] storage/concurrency: implement concurrency Manager

Informs #41720.
Informs #44976.

This PR implements the concurrency.Manager interface, which is the core
structure that ties together the new concurrency package.

The concurrency manager is a structure that sequences incoming requests
and provides isolation between requests that intend to perform
conflicting operations. During sequencing, conflicts are discovered and
any found are resolved through a combination of passive queuing and
active pushing. Once a request has been sequenced, it is free to
evaluate without concern of conflicting with other in-flight requests
due to the isolation provided by the manager. This isolation is
guaranteed for the lifetime of the request but terminates once the
request completes.

The manager accomplishes this by piecing together the following
components in its request sequencing path:
- `latchManager`
- `lockTable`
- `lockTableWaiter`
- `txnWaitQueue`

The largest part of this change is introducing the datadriven testing
framework to deterministically test the concurrency manager. This
proved difficult for two reasons:
1. the concurrency manager composes several components to perform its
   work (latchManager, lockTable, lockTableWaiter, txnWaitQueue). It was
   difficult to get consistent observability into each of these
   components in such a way that tests could be run against a set of
   concurrent requests interacting with them all.
2. the concurrency manager exposes a primarily blocking interface.
   Requests call `Sequence()` and wait for sequencing to complete. This
   may block in a number of different places - while waiting on latches,
   while waiting on locks, and while waiting on other transactions. The
   most important part of these tests is to assert _where_ a given
   request blocks based on the current state of the concurrency manager
   and then assert _how_ the request reacts to a state transition by
   another request.

To address the first problem, the testing harness uses the
context-carried tracing infrastructure to track the path of a request.
We already had log events scattered throughout these various
components, so this did not require digging testing hooks into each of
them. Instead, the harness attaches a trace recording span to each
request and watches as events are added to the span. It then uses these
events as the output of the test.

To address the second problem, the testing harness introduces a monitor
object which manages a collection of "monitored" goroutines. The
monitor watches as these goroutines run and keeps track of their
goroutine state as reported by a goroutine dump. During each step of
the datadriven test, the monitor allows all goroutines to proceed until
they have either terminated or stalled due to cross-goroutine
synchronization dependencies. For instance, it waits for all goroutines
to stall while receiving from channels. We can be sure that the
goroutine dump provides a consistent snapshot of all goroutine states
and statuses because `runtime.Stack(all=true)` stops the world when
called. This means that when all monitored goroutines are
simultaneously stalled, we have a deadlock that can only be resolved by
proceeding forward with the test and releasing latches, resolving
locks, or committing transactions.

This structure worked surprisingly well and has held up to long periods
of stressrace.
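For illustration, the intended caller-side lifecycle looks roughly like
the sketch below. This is not code included in this change: the
`evaluate` stand-in, the function name, and the exact error plumbing
are hypothetical, and imports of `pkg/roachpb` and the new concurrency
package are assumed.

    // evaluate is a hypothetical stand-in for command evaluation; it is
    // not part of this PR.
    var evaluate func(context.Context, concurrency.Request) (concurrency.Response, *roachpb.Error)

    func executeWithConcurrencyControl(
        ctx context.Context, m concurrency.Manager, req concurrency.Request,
    ) (concurrency.Response, *roachpb.Error) {
        var g *concurrency.Guard
        defer func() {
            if g != nil {
                // Release latches and exit any lock wait-queues.
                m.FinishReq(g)
            }
        }()
        for {
            var resp concurrency.Response
            var pErr *roachpb.Error
            g, resp, pErr = m.SequenceReq(ctx, g, req)
            if resp != nil || pErr != nil {
                // The manager served the request directly or sequencing failed.
                return resp, pErr
            }
            // The request is now isolated from conflicting requests; evaluate it.
            resp, pErr = evaluate(ctx, req)
            if pErr != nil {
                if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
                    // Inform the lock table of the discovered intents and
                    // re-sequence the request with its (now un-latched) guard.
                    g = m.HandleWriterIntentError(ctx, g, wiErr)
                    continue
                }
            }
            return resp, pErr
        }
    }

The important property is that a request only evaluates while holding
its Guard, and that a discovered intent sends it back through
SequenceReq with the same guard rather than through a fresh sequencing
attempt.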
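The tracing-based observability from the first point boils down to the
pattern below, condensed from the launchMonitored and collectRecordings
helpers added in concurrency_manager_test.go (a sketch, not the exact
harness code):

    func traceOneRequest() []string {
        ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "sequence req1")
        defer cancel()

        // The concurrency manager emits events like this one at each
        // interesting point (acquiring latches, scanning the lock table,
        // waiting in lock wait-queues).
        log.Event(ctx, "sequencing request")

        // Read the recorded events back out; each field value becomes one
        // line of datadriven test output.
        var out []string
        for _, span := range collect() {
            for _, record := range span.Logs {
                for _, field := range record.Fields {
                    out = append(out, field.Value)
                }
            }
        }
        return out
    }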
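The monitor's stall detection reduces to parsing an all-goroutine stack
dump. A condensed sketch follows; the real harness additionally filters
frames down to monitored goroutines, classifies each status against a
stalled-state table, and requires two identical consecutive snapshots
before declaring the system stalled:

    var goroutineHeaderRE = regexp.MustCompile(`goroutine (\d+) \[(.+)\]:`)

    // goroutineStatuses returns the status of every goroutine in the
    // process, e.g. "running", "select", or "chan receive".
    // runtime.Stack(all=true) stops the world, so the dump is a
    // consistent snapshot.
    func goroutineStatuses() []string {
        buf := make([]byte, 1<<20)
        for {
            n := runtime.Stack(buf, true /* all */)
            if n < len(buf) {
                buf = buf[:n]
                break
            }
            buf = make([]byte, 2*len(buf)) // grow until the full dump fits
        }
        var statuses []string
        for _, frame := range bytes.Split(buf, []byte("\n\n")) {
            if m := goroutineHeaderRE.FindSubmatch(frame); m != nil {
                statuses = append(statuses, string(m[2]))
            }
        }
        return statuses
    }

Statuses such as "chan receive", "chan send", "select", and
"sync.Cond.Wait" are treated as stalled; once every monitored goroutine
reports one of them, the harness knows it is safe to collect the trace
output and move on to the next test step.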
--- .../concurrency/concurrency_control.go | 18 +- .../concurrency/concurrency_manager.go | 367 +++++++++ .../concurrency/concurrency_manager_test.go | 751 ++++++++++++++++++ .../concurrency/datadriven_util_test.go | 151 ++++ pkg/storage/concurrency/latch_manager.go | 5 + pkg/storage/concurrency/lock_table_test.go | 68 -- pkg/storage/concurrency/lock_table_waiter.go | 6 +- .../concurrency/lock_table_waiter_test.go | 25 +- .../testdata/concurrency_manager/basic | 143 ++++ .../concurrency_manager/discovered_lock | 57 ++ .../testdata/concurrency_manager/no_latches | 45 ++ pkg/storage/spanset/spanset.go | 10 + pkg/storage/spanset/spanset_test.go | 40 + pkg/storage/store.go | 8 + 14 files changed, 1601 insertions(+), 93 deletions(-) create mode 100644 pkg/storage/concurrency/concurrency_manager.go create mode 100644 pkg/storage/concurrency/concurrency_manager_test.go create mode 100644 pkg/storage/concurrency/datadriven_util_test.go create mode 100644 pkg/storage/concurrency/testdata/concurrency_manager/basic create mode 100644 pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock create mode 100644 pkg/storage/concurrency/testdata/concurrency_manager/no_latches diff --git a/pkg/storage/concurrency/concurrency_control.go b/pkg/storage/concurrency/concurrency_control.go index fdfb9abeb3c5..669d84bb01a9 100644 --- a/pkg/storage/concurrency/concurrency_control.go +++ b/pkg/storage/concurrency/concurrency_control.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -228,11 +229,11 @@ type ContentionHandler interface { type LockManager interface { // OnLockAcquired informs the concurrency manager that a transaction has // acquired a new lock or re-acquired an existing lock that it already held. - OnLockAcquired(context.Context, roachpb.Intent) + OnLockAcquired(context.Context, *roachpb.Intent) // OnLockUpdated informs the concurrency manager that a transaction has // updated or released a lock or range of locks that it previously held. - OnLockUpdated(context.Context, roachpb.Intent) + OnLockUpdated(context.Context, *roachpb.Intent) } // TransactionManager is concerned with tracking transactions that have their @@ -312,7 +313,7 @@ type Request struct { // Manager.FinishReq to release the request's resources when it has completed. type Guard struct { req Request - lag latchGuard + lg latchGuard ltg lockTableGuard } @@ -337,6 +338,9 @@ type latchManager interface { // Releases latches, relinquish its protection from conflicting requests. Release(latchGuard) + + // Info returns information about the state of the latchManager. + Info() (global, local storagepb.LatchManagerInfo) } // latchGuard is a handle to a set of acquired key latches. @@ -726,11 +730,3 @@ type txnWaitQueue interface { // true, future transactions may not be enqueued or waiting pushers added. Clear(disable bool) } - -// Silence unused warnings until this package is used. 
-var _ = Manager(nil) -var _ = latchManager(nil) -var _ = lockTableWaiter(nil) -var _ = txnWaitQueue(nil) -var _ = Guard{req: Request{}, lag: nil, ltg: nil} -var _ = latchManagerImpl{} diff --git a/pkg/storage/concurrency/concurrency_manager.go b/pkg/storage/concurrency/concurrency_manager.go new file mode 100644 index 000000000000..ceda026c4dc9 --- /dev/null +++ b/pkg/storage/concurrency/concurrency_manager.go @@ -0,0 +1,367 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// managerImpl implements the Manager interface. +type managerImpl struct { + // Synchronizes conflicting in-flight requests. + lm latchManager + // Synchronizes conflicting in-progress transactions. + lt lockTable + // Waits for locks that conflict with a request to be released. + ltw lockTableWaiter + // Waits for transaction completion and detects deadlocks. + twq txnWaitQueue + // The Store and Range that the manager is responsible for. + str Store + rng *roachpb.RangeDescriptor +} + +// Store provides some parts of a Store without incurring a dependency. It is a +// superset of txnwait.StoreInterface. +type Store interface { + // Identification. + NodeDescriptor() *roachpb.NodeDescriptor + // Components. + DB() *client.DB + Clock() *hlc.Clock + Stopper() *stop.Stopper + IntentResolver() IntentResolver + // Knobs. + GetTxnWaitKnobs() txnwait.TestingKnobs + // Metrics. + GetTxnWaitMetrics() *txnwait.Metrics + GetSlowLatchGauge() *metric.Gauge +} + +// NewManager creates a new concurrency Manager structure. +func NewManager(store Store, rng *roachpb.RangeDescriptor) Manager { + m := new(managerImpl) + *m = managerImpl{ + // TODO(nvanbenschoten): move pkg/storage/spanlatch to a new + // pkg/storage/concurrency/latch package. Make it implement the + // latchManager interface directly, if possible. + lm: &latchManagerImpl{ + m: spanlatch.Make(store.Stopper(), store.GetSlowLatchGauge()), + }, + lt: newLockTable(10000), + ltw: &lockTableWaiterImpl{ + nodeID: store.NodeDescriptor().NodeID, + stopper: store.Stopper(), + ir: store.IntentResolver(), + dependencyCyclePushDelay: defaultDependencyCyclePushDelay, + }, + // TODO(nvanbenschoten): move pkg/storage/txnwait to a new + // pkg/storage/concurrency/txnwait package. + twq: txnwait.NewQueue(store, m), + str: store, + rng: rng, + } + return m +} + +// SequenceReq implements the RequestSequencer interface. 
+func (m *managerImpl) SequenceReq( + ctx context.Context, prev *Guard, req Request, +) (g *Guard, resp Response, err *Error) { + if prev == nil { + g = newGuard(req) + log.Event(ctx, "sequencing request") + } else { + g = prev + g.assertNoLatches() + log.Event(ctx, "re-sequencing request") + } + + // Ensure that we release the guard if we return a response or an error. + defer func() { + if g != nil && (resp != nil || err != nil) { + m.FinishReq(g) + g = nil + } + }() + + // Some requests don't need to acquire latches at all. + if !shouldAcquireLatches(req) { + log.Event(ctx, "not acquiring latches") + return g, nil, nil + } + + // Provide the manager with an opportunity to intercept the request. It + // may be able to serve the request directly, and even if not, it may + // be able to update its internal state based on the request. + resp, err = m.maybeInterceptReq(ctx, req) + if resp != nil || err != nil { + return g, resp, err + } + + for { + // Acquire latches for the request. This synchronizes the request + // with all conflicting in-flight requests. + log.Event(ctx, "acquiring latches") + g.lg, err = m.lm.Acquire(ctx, req) + if err != nil { + return g, nil, err + } + + // Some requests don't want to wait on locks. + if !shouldWaitOnConflicts(req) { + return g, nil, nil + } + + // Scan for conflicting locks. + log.Event(ctx, "scanning lock table for conflicting locks") + g.ltg = m.lt.ScanAndEnqueue(g.req, g.ltg) + + // Wait on each newly conflicting lock, if necessary. + if g.ltg.ShouldWait() { + m.lm.Release(g.moveLatchGuard()) + + log.Event(ctx, "waiting in lock wait-queues") + if err := m.ltw.WaitOn(ctx, g.req, g.ltg); err != nil { + return g, nil, err + } + continue + } + return g, nil, nil + } +} + +// maybeInterceptReq allows the concurrency manager to intercept requests before +// sequencing and evaluation so that it can immediately act on them. This allows +// the concurrency manager to route certain concurrency control-related requests +// into queues and optionally update its internal state based on the requests. +func (m *managerImpl) maybeInterceptReq(ctx context.Context, req Request) (Response, *Error) { + switch { + case req.isSingle(roachpb.PushTxn): + // If necessary, wait in the txnWaitQueue for the pushee transaction to + // expire or to move to a finalized state. + t := req.Requests[0].GetPushTxn() + resp, err := m.twq.MaybeWaitForPush(ctx, t) + if err != nil { + return nil, err + } else if resp != nil { + return makeSingleResponse(resp), nil + } + case req.isSingle(roachpb.QueryTxn): + // If necessary, wait in the txnWaitQueue for a transaction state update + // or for a dependent transaction to change. + t := req.Requests[0].GetQueryTxn() + return nil, m.twq.MaybeWaitForQuery(ctx, t) + default: + // TODO(nvanbenschoten): in the future, use this hook to update the lock + // table to allow contending transactions to proceed. + // for _, arg := range req.Requests { + // switch t := arg.GetInner().(type) { + // case *roachpb.ResolveIntentRequest: + // _ = t + // case *roachpb.ResolveIntentRangeRequest: + // _ = t + // } + // } + } + return nil, nil +} + +// shouldAcquireLatches determines whether the request should acquire latches +// before proceeding to evaluate. Latches are used to synchronize with other +// conflicting requests, based on the Spans collected for the request. Most +// request types will want to acquire latches.
+func shouldAcquireLatches(req Request) bool { + switch { + case req.ReadConsistency != roachpb.CONSISTENT: + // Only acquire latches for consistent operations. + return false + case req.isSingle(roachpb.RequestLease): + // Do not acquire latches for lease requests. These requests are run on + // replicas that do not hold the lease, so acquiring latches wouldn't + // help synchronize with other requests. + return false + } + return true +} + +// shouldWaitOnConflicts determines whether the request should wait on locks and +// wait-queues owned by other transactions before proceeding to evaluate. Most +// requests will want to wait on conflicting transactions to ensure that they +// are sufficiently isolated during their evaluation, but some "isolation aware" +// requests want to proceed to evaluation even in the presence of conflicts +// because they know how to handle them. +func shouldWaitOnConflicts(req Request) bool { + for _, ru := range req.Requests { + arg := ru.GetInner() + if roachpb.IsTransactional(arg) { + switch arg.Method() { + case roachpb.HeartbeatTxn: + case roachpb.Refresh: + case roachpb.RefreshRange: + default: + return true + } + } + } + return false +} + +// FinishReq implements the RequestSequencer interface. +func (m *managerImpl) FinishReq(g *Guard) { + if ltg := g.moveLockTableGuard(); ltg != nil { + m.lt.Dequeue(ltg) + } + if lg := g.moveLatchGuard(); lg != nil { + m.lm.Release(lg) + } +} + +// HandleWriterIntentError implements the ContentionHandler interface. +func (m *managerImpl) HandleWriterIntentError( + ctx context.Context, g *Guard, t *roachpb.WriteIntentError, +) *Guard { + // Add a discovered lock to lock-table for each intent and enter each lock's + // wait-queue. + for i := range t.Intents { + intent := &t.Intents[i] + if err := m.lt.AddDiscoveredLock(intent, g.ltg); err != nil { + log.Fatalf(ctx, "assertion failure: %s", err) + } + } + + // Release the Guard's latches but continue to remain in lock wait-queues by + // not releasing lockWaitQueueGuards. We expect the caller of this method to + // then re-sequence the Request by calling SequenceReq with the un-latched + // Guard. This is analogous to iterating through the loop in SequenceReq. + m.lm.Release(g.moveLatchGuard()) + return g +} + +// HandleTransactionPushError implements the ContentionHandler interface. +func (m *managerImpl) HandleTransactionPushError( + ctx context.Context, g *Guard, t *roachpb.TransactionPushError, +) *Guard { + m.twq.EnqueueTxn(&t.PusheeTxn) + + // Release the Guard's latches but continue to remain in lock wait-queues by + // not releasing lockWaitQueueGuards. We expect the caller of this method to + // then re-sequence the Request by calling SequenceReq with the un-latched + // Guard. This is analogous to iterating through the loop in SequenceReq. + m.lm.Release(g.moveLatchGuard()) + return g +} + +// OnLockAcquired implements the LockManager interface. +func (m *managerImpl) OnLockAcquired(ctx context.Context, in *roachpb.Intent) { + // TODO(nvanbenschoten): rename roachpb.Intent and add a lock.Durability + // field to it. + if err := m.lt.AcquireLock(&in.Txn, in.Key, lock.Exclusive, lock.Unreplicated); err != nil { + log.Fatalf(ctx, "assertion failure: %s", err) + } +} + +// OnLockUpdated implements the LockManager interface. 
+func (m *managerImpl) OnLockUpdated(ctx context.Context, in *roachpb.Intent) { + if err := m.lt.UpdateLocks(in); err != nil { + log.Fatalf(ctx, "assertion failure: %s", err) + } +} + +// OnTransactionUpdated implements the TransactionManager interface. +func (m *managerImpl) OnTransactionUpdated(ctx context.Context, txn *roachpb.Transaction) { + m.twq.UpdateTxn(ctx, txn) +} + +// GetDependents implements the TransactionManager interface. +func (m *managerImpl) GetDependents(txnID uuid.UUID) []uuid.UUID { + return m.twq.GetDependents(txnID) +} + +// OnDescriptorUpdated implements the RangeStateListener interface. +func (m *managerImpl) OnDescriptorUpdated(desc *roachpb.RangeDescriptor) { + m.rng = desc +} + +// OnLeaseUpdated implements the RangeStateListener interface. +func (m *managerImpl) OnLeaseUpdated(iAmTheLeaseHolder bool) { + if iAmTheLeaseHolder { + m.twq.Enable() + } else { + m.lt.Clear() + m.twq.Clear(true /* disable */) + } +} + +// OnSplit implements the RangeStateListener interface. +func (m *managerImpl) OnSplit() { + m.lt.Clear() + m.twq.Clear(false /* disable */) +} + +// OnMerge implements the RangeStateListener interface. +func (m *managerImpl) OnMerge() { + m.lt.Clear() + m.twq.Clear(true /* disable */) +} + +// ContainsKey implements the txnwait.ReplicaInterface interface. +func (m *managerImpl) ContainsKey(key roachpb.Key) bool { + return storagebase.ContainsKey(m.rng, key) +} + +func (r *Request) isSingle(m roachpb.Method) bool { + if len(r.Requests) != 1 { + return false + } + return r.Requests[0].GetInner().Method() == m +} + +func newGuard(req Request) *Guard { + // TODO(nvanbenschoten): Pool these guard objects. + return &Guard{req: req} +} + +func (g *Guard) assertNoLatches() { + if g.lg != nil { + panic("unexpected latches held") + } +} + +func (g *Guard) moveLatchGuard() latchGuard { + lg := g.lg + g.lg = nil + return lg +} + +func (g *Guard) moveLockTableGuard() lockTableGuard { + ltg := g.ltg + g.ltg = nil + return ltg +} + +func makeSingleResponse(r roachpb.Response) Response { + ru := make(Response, 1) + ru[0].MustSetInner(r) + return ru +} diff --git a/pkg/storage/concurrency/concurrency_manager_test.go b/pkg/storage/concurrency/concurrency_manager_test.go new file mode 100644 index 000000000000..26098b350554 --- /dev/null +++ b/pkg/storage/concurrency/concurrency_manager_test.go @@ -0,0 +1,751 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package concurrency + +import ( + "bytes" + "context" + "fmt" + "reflect" + "regexp" + "runtime" + "sort" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/batcheval" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" +) + +// TestLockTableBasic verifies that sequences of requests interacting with a +// concurrency manager perform properly. +// +// The input files use the following DSL: +// +// txn name= ts=[,] epoch= +// single-request name= type= [fields=[,]...] +// batch-request name= txn=|none ts=[,] reqs=... [priority] [consistency] +// sequence req= +// finish req= +// +// handle-write-intent-error req= txn= key= +// handle-txn-push-error req= txn= key= TODO(nvanbenschoten): implement this +// +// on-lock-acquired txn= key= +// on-lock-updated txn= key=[,] +// on-txn-updated txn= +// +// on-desc-updated TODO(nvanbenschoten): implement this +// on-lease-updated TODO(nvanbenschoten): implement this +// on-split TODO(nvanbenschoten): implement this +// on-merge TODO(nvanbenschoten): implement this +// +// debug-latch-manager +// debug-lock-table +// reset +// +func TestConcurrencyManagerBasic(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, "testdata/concurrency_manager", func(t *testing.T, path string) { + c := newCluster() + m := NewManager(c, c.rangeDesc) + c.m = m + mon := newMonitor() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "txn": + var txnName string + d.ScanArgs(t, "name", &txnName) + ts := scanTimestamp(t, d) + + var epoch int + d.ScanArgs(t, "epoch", &epoch) + + txn, ok := c.txnsByName[txnName] + var id uuid.UUID + if ok { + id = txn.ID + } else { + id = c.createTxnRecord() + } + txn = &roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + ID: id, + Epoch: enginepb.TxnEpoch(epoch), + WriteTimestamp: ts, + MinTimestamp: ts, + }, + ReadTimestamp: ts, + } + txn.UpdateObservedTimestamp(c.NodeDescriptor().NodeID, ts) + c.txnsByName[txnName] = txn + return "" + + case "single-request": + var reqName string + d.ScanArgs(t, "name", &reqName) + if _, ok := c.singleRequestsByName[reqName]; ok { + d.Fatalf(t, "duplicate single request: %s", reqName) + } + + req := scanSingleRequest(t, d) + c.singleRequestsByName[reqName] = req + return "" + + case "batch-request": + var batchName string + d.ScanArgs(t, "name", &batchName) + if _, ok := c.batchRequestsByName[batchName]; ok { + d.Fatalf(t, "duplicate batch request: %s", batchName) + } + + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok && txnName != "none" { + 
d.Fatalf(t, "unknown txn %s", txnName) + } + + ts := scanTimestamp(t, d) + if txn != nil && txn.ReadTimestamp != ts { + d.Fatalf(t, "txn read timestamp != timestamp") + } + + reqNames := scanStringSlice(t, d, "reqs") + reqs := make([]roachpb.Request, len(reqNames)) + for i, reqName := range reqNames { + req, ok := c.singleRequestsByName[reqName] + if !ok { + d.Fatalf(t, "unknown request %s", reqName) + } + reqs[i] = req + } + reqUnions := make([]roachpb.RequestUnion, len(reqs)) + for i, req := range reqs { + reqUnions[i].MustSetInner(req) + } + spans := c.collectSpans(t, txn, ts, reqs) + + readConsistency := roachpb.CONSISTENT + if d.HasArg("inconsistent") { + readConsistency = roachpb.INCONSISTENT + } + + c.batchRequestsByName[batchName] = Request{ + Txn: txn, + Timestamp: ts, + // TODO(nvanbenschoten): test Priority + ReadConsistency: readConsistency, + Requests: reqUnions, + Spans: spans, + } + return "" + + case "sequence": + var batchName string + d.ScanArgs(t, "req", &batchName) + batch, ok := c.batchRequestsByName[batchName] + if !ok { + d.Fatalf(t, "unknown request: %s", batchName) + } + + c.mu.Lock() + prev := c.guardsByReqName[batchName] + delete(c.guardsByReqName, batchName) + c.mu.Unlock() + + opName := fmt.Sprintf("sequence %s", batchName) + mon.launchMonitored(opName, func(ctx context.Context) { + guard, resp, err := m.SequenceReq(ctx, prev, batch) + if err != nil { + log.Eventf(ctx, "sequencing complete, returned error: %v", err) + } else if resp != nil { + log.Eventf(ctx, "sequencing complete, returned response: %v", resp) + } else if guard != nil { + log.Event(ctx, "sequencing complete, returned guard") + c.mu.Lock() + c.guardsByReqName[batchName] = guard + c.mu.Unlock() + } else { + log.Event(ctx, "sequencing complete, returned no guard") + } + }) + return mon.waitAndCollect(t) + + case "finish": + var batchName string + d.ScanArgs(t, "req", &batchName) + guard, ok := c.guardsByReqName[batchName] + if !ok { + d.Fatalf(t, "unknown request: %s", batchName) + } + + opName := fmt.Sprintf("finish %s", batchName) + mon.launchMonitored(opName, func(ctx context.Context) { + log.Event(ctx, "finishing request") + m.FinishReq(guard) + c.mu.Lock() + delete(c.guardsByReqName, batchName) + c.mu.Unlock() + }) + return mon.waitAndCollect(t) + + case "handle-write-intent-error": + var batchName string + d.ScanArgs(t, "req", &batchName) + guard, ok := c.guardsByReqName[batchName] + if !ok { + d.Fatalf(t, "unknown request: %s", batchName) + } + + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok { + d.Fatalf(t, "unknown txn %s", txnName) + } + + var key string + d.ScanArgs(t, "key", &key) + + opName := fmt.Sprintf("handle write intent error %s", batchName) + mon.launchMonitored(opName, func(ctx context.Context) { + err := &roachpb.WriteIntentError{Intents: []roachpb.Intent{{ + Span: roachpb.Span{Key: roachpb.Key(key)}, + Txn: txn.TxnMeta, + }}} + log.Eventf(ctx, "handling %v", err) + guard = m.HandleWriterIntentError(ctx, guard, err) + }) + return mon.waitAndCollect(t) + + case "on-lock-acquired": + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok { + d.Fatalf(t, "unknown txn %s", txnName) + } + + var key string + d.ScanArgs(t, "key", &key) + + mon.launchMonitored("acquire lock", func(ctx context.Context) { + log.Eventf(ctx, "%s @ %s", txnName, key) + m.OnLockAcquired(ctx, &roachpb.Intent{ + Span: roachpb.Span{Key: roachpb.Key(key)}, + Txn: txn.TxnMeta, + }) + }) + return mon.waitAndCollect(t) + + 
case "on-txn-updated": + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok { + d.Fatalf(t, "unknown txn %s", txnName) + } + + mon.launchMonitored("update txn", func(ctx context.Context) { + log.Eventf(ctx, "committing %s", txnName) + if err := c.commitTxnRecord(txn.ID); err != nil { + d.Fatalf(t, err.Error()) + } + }) + return mon.waitAndCollect(t) + + case "debug-latch-manager": + global, local := m.(*managerImpl).lm.Info() + output := []string{ + fmt.Sprintf("write count: %d", global.WriteCount+local.WriteCount), + fmt.Sprintf(" read count: %d", global.ReadCount+local.ReadCount), + } + return strings.Join(output, "\n") + + case "debug-lock-table": + return c.lockTable().String() + + case "reset": + if n := mon.numMonitored(); n > 0 { + d.Fatalf(t, "%d requests still in flight", n) + } + mon.resetSeqNums() + if err := c.reset(); err != nil { + d.Fatalf(t, "could not reset cluster: %v", err) + } + return "" + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +// cluster encapsulates the state of a running cluster and a set of requests. +// It serves as the test harness in TestConcurrencyManagerBasic - maintaining +// transaction and request declarations, recording the state of in-flight +// requests as they flow through the concurrency manager, and mocking out the +// interfaces that the concurrency manager interacts with. +type cluster struct { + nodeDesc *roachpb.NodeDescriptor + rangeDesc *roachpb.RangeDescriptor + m Manager + + // Definitions. + txnCounter uint128.Uint128 + txnsByName map[string]*roachpb.Transaction + singleRequestsByName map[string]roachpb.Request + batchRequestsByName map[string]Request + + // Request state. Cleared on reset. + mu syncutil.Mutex + guardsByReqName map[string]*Guard + txnRecord map[uuid.UUID]chan struct{} // closed on commit/abort +} + +func newCluster() *cluster { + return &cluster{ + nodeDesc: &roachpb.NodeDescriptor{NodeID: 1}, + rangeDesc: &roachpb.RangeDescriptor{RangeID: 1}, + + txnsByName: make(map[string]*roachpb.Transaction), + singleRequestsByName: make(map[string]roachpb.Request), + batchRequestsByName: make(map[string]Request), + guardsByReqName: make(map[string]*Guard), + txnRecord: make(map[uuid.UUID]chan struct{}), + } +} + +// cluster implements the Store interface. +func (c *cluster) NodeDescriptor() *roachpb.NodeDescriptor { return c.nodeDesc } +func (c *cluster) DB() *client.DB { return nil } +func (c *cluster) Clock() *hlc.Clock { return nil } +func (c *cluster) Stopper() *stop.Stopper { return nil } +func (c *cluster) IntentResolver() IntentResolver { return c } +func (c *cluster) GetTxnWaitKnobs() txnwait.TestingKnobs { return txnwait.TestingKnobs{} } +func (c *cluster) GetTxnWaitMetrics() *txnwait.Metrics { return nil } +func (c *cluster) GetSlowLatchGauge() *metric.Gauge { return nil } + +// PushTransaction implements the IntentResolver interface. +func (c *cluster) PushTransaction( + ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, +) (roachpb.Transaction, *Error) { + log.Eventf(ctx, "pushing txn %s", pushee.ID) + pusheeRecord, err := c.getTxnRecord(pushee.ID) + if err != nil { + return roachpb.Transaction{}, roachpb.NewError(err) + } + // Wait for the transaction to commit. + <-pusheeRecord + return roachpb.Transaction{TxnMeta: *pushee, Status: roachpb.COMMITTED}, nil +} + +// ResolveIntent implements the IntentResolver interface. 
+func (c *cluster) ResolveIntent( + ctx context.Context, intent roachpb.Intent, _ intentresolver.ResolveOptions, +) *Error { + log.Eventf(ctx, "resolving intent %s for txn %s", intent.Key, intent.Txn.ID) + c.m.OnLockUpdated(ctx, &intent) + return nil +} + +// TODO(nvanbenschoten): remove the following methods once all their uses are +// possible through the external interface of the concurrency Manager. +func (c *cluster) latchManager() *latchManagerImpl { return c.m.(*managerImpl).lm.(*latchManagerImpl) } +func (c *cluster) lockTable() *lockTableImpl { return c.m.(*managerImpl).lt.(*lockTableImpl) } + +func (c *cluster) createTxnRecord() uuid.UUID { + c.mu.Lock() + defer c.mu.Unlock() + id := nextUUID(&c.txnCounter) + c.txnRecord[id] = make(chan struct{}) + return id +} + +func (c *cluster) getTxnRecord(id uuid.UUID) (chan struct{}, error) { + c.mu.Lock() + defer c.mu.Unlock() + ch, ok := c.txnRecord[id] + if !ok { + return nil, errors.Errorf("unknown txn %v: %v", id, c.txnRecord) + } + return ch, nil +} + +func (c *cluster) commitTxnRecord(id uuid.UUID) error { + c.mu.Lock() + defer c.mu.Unlock() + ch, ok := c.txnRecord[id] + if !ok { + return errors.Errorf("unknown txn %v: %v", id, c.txnRecord) + } + close(ch) + // Keep the record in the map so others can observe it. + return nil +} + +// reset clears all request state in the cluster. This avoids portions of tests +// leaking into one another and also serves as an assertion that a sequence of +// commands has completed without leaking any requests. +func (c *cluster) reset() error { + c.mu.Lock() + defer c.mu.Unlock() + // Reset all transactions to pending. + for id := range c.txnRecord { + c.txnRecord[id] = make(chan struct{}) + } + // There should be no remaining concurrency guards. + for name := range c.guardsByReqName { + return errors.Errorf("unfinished guard for request: %s", name) + } + // There should be no outstanding latches. + lm := c.latchManager() + global, local := lm.Info() + if global.ReadCount > 0 || global.WriteCount > 0 || + local.ReadCount > 0 || local.WriteCount > 0 { + return errors.Errorf("outstanding latches") + } + // Clear the lock table. + c.lockTable().Clear() + return nil +} + +// collectSpans collects the declared spans for a set of requests. +// Its logic mirrors that in Replica.collectSpans. +func (c *cluster) collectSpans( + t *testing.T, txn *roachpb.Transaction, ts hlc.Timestamp, reqs []roachpb.Request, +) *spanset.SpanSet { + spans := &spanset.SpanSet{} + h := roachpb.Header{Txn: txn, Timestamp: ts} + for _, req := range reqs { + if cmd, ok := batcheval.LookupCommand(req.Method()); ok { + cmd.DeclareKeys(c.rangeDesc, h, req, spans) + } else { + t.Fatalf("unrecognized command %s", req.Method()) + } + } + + // Commands may create a large number of duplicate spans. De-duplicate + // them to reduce the number of spans we pass to the spanlatch manager. + spans.SortAndDedup() + if err := spans.Validate(); err != nil { + t.Fatal(err) + } + return spans + +} + +// monitor tracks a set of running goroutines as they execute and captures +// tracing recordings from them. It is capable of watching its set of goroutines +// until they all mutually stall. +// +// It is NOT safe to use multiple monitors concurrently. 
+type monitor struct { + seq int + gs map[*monitoredGoroutine]struct{} + buf []byte // avoids allocations +} + +type monitoredGoroutine struct { + opSeq int + opName string + collect func() tracing.Recording + cancel func() + prevLines int + finished int32 +} + +func newMonitor() *monitor { + return &monitor{ + gs: make(map[*monitoredGoroutine]struct{}), + } +} + +func (m *monitor) launchMonitored(opName string, fn func(context.Context)) { + m.seq++ + ctx := context.Background() + ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, opName) + g := &monitoredGoroutine{ + opSeq: m.seq, + opName: opName, + collect: collect, + cancel: cancel, + } + m.gs[g] = struct{}{} + go func() { + fn(ctx) + atomic.StoreInt32(&g.finished, 1) + }() +} + +func (m *monitor) numMonitored() int { + return len(m.gs) +} + +func (m *monitor) resetSeqNums() { + m.seq = 0 +} + +func (m *monitor) waitAndCollect(t *testing.T) string { + m.waitForMonitoredGoroutinesToStall(t) + return m.collectRecordings() +} + +func (m *monitor) collectRecordings() string { + // Collect trace recordings from each goroutine. + type logRecord struct { + g *monitoredGoroutine + time time.Time + fieldIdx int + value string + } + var logs []logRecord + for g := range m.gs { + prev := g.prevLines + rec := g.collect() + for _, span := range rec { + for _, log := range span.Logs { + for i, field := range log.Fields { + if prev > 0 { + prev-- + continue + } + logs = append(logs, logRecord{ + g: g, + time: log.Time, + fieldIdx: i, + value: field.Value, + }) + g.prevLines++ + } + } + } + if atomic.LoadInt32(&g.finished) == 1 { + g.cancel() + delete(m.gs, g) + } + } + + // Sort logs by (time, fieldIdx). + // + // TODO(nvanbenschoten): this won't be enough to make the trace output + // deterministic when there is no enforced ordering between some of the + // events in two different goroutines. We may need to simply sort by + // goroutine sequence and avoid any notion of real-time ordering. + sort.Slice(logs, func(i int, j int) bool { + if logs[i].time.Equal(logs[j].time) { + return logs[i].fieldIdx < logs[j].fieldIdx + } + return logs[i].time.Before(logs[j].time) + }) + + var buf strings.Builder + for i, log := range logs { + if i > 0 { + buf.WriteByte('\n') + } + fmt.Fprintf(&buf, "[%d] %s: %s", log.g.opSeq, log.g.opName, log.value) + } + return buf.String() +} + +// waitForMonitoredGoroutinesToStall waits for all goroutines that were launched +// by the monitor's launchMonitored method to stall due to cross-goroutine +// synchronization dependencies. For instance, it waits for all goroutines to +// stall while receiving from channels. When the method returns, the caller has +// exclusive access to any memory that it shares only with monitored goroutines +// until it performs an action that may unblock any of the goroutines. +func (m *monitor) waitForMonitoredGoroutinesToStall(t *testing.T) { + // Iterate until we see two iterations in a row that both observe all + // monitored goroutines to be stalled and also both observe the exact + // same goroutine state. Waiting for two iterations to be identical + // isn't required for correctness because the goroutine dump itself + // should provide a consistent snapshot of all goroutine states and + // statuses (runtime.Stack(all=true) stops the world), but it still + // seems like a generally good idea. 
+ var prevStatus []goroutine + filter := funcName((*monitor).launchMonitored) + testutils.SucceedsSoon(t, func() error { + status := goroutineStatus(t, filter, &m.buf) + if len(status) == 0 { + // No monitored goroutines. + return nil + } + + lastStatus := prevStatus + prevStatus = status + if lastStatus == nil { + // First iteration. + return errors.Errorf("previous status unset") + } + + // Check whether all monitored goroutines are stalled. If not, retry. + for _, g := range status { + stalled, ok := goroutineStalledStates[g.status] + if !ok { + // NB: this will help us avoid rotting on Go runtime changes. + t.Fatalf("unknown goroutine state: %s", g.status) + } + if !stalled { + return errors.Errorf("goroutine %d is not stalled; status %s\n\n%s", + g.id, g.status, g.frame) + } + } + + // Make sure the goroutines haven't changed since the last iteration. + // This ensures that the goroutines stay stable for some amount of time. + // NB: status and lastStatus are sorted by goroutine id. + if !reflect.DeepEqual(status, lastStatus) { + return errors.Errorf("goroutines rapidly changing") + } + return nil + }) +} + +func funcName(f interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() +} + +// goroutineStalledStates maps all goroutine states as reported by runtime.Stack +// to a boolean representing whether that state indicates a stalled goroutine. A +// stalled goroutine is one that is waiting on a change on another goroutine in +// order for it to make forward progress itself. If all goroutines enter stalled +// states simultaneously, a process would encounter a deadlock. +var goroutineStalledStates = map[string]bool{ + // See gStatusStrings in runtime/traceback.go and associated comments about + // the corresponding G statuses in runtime/runtime2.go. + "idle": false, + "runnable": false, + "running": false, + "syscall": false, + "waiting": true, + "dead": false, + "copystack": false, + "???": false, // catch-all in runtime.goroutineheader + + // runtime.goroutineheader may override these G statuses with a waitReason. + // See waitReasonStrings in runtime/runtime2.go. + "GC assist marking": false, + "IO wait": false, + "chan receive (nil chan)": true, + "chan send (nil chan)": true, + "dumping heap": false, + "garbage collection": false, + "garbage collection scan": false, + "panicwait": false, + "select": true, + "select (no cases)": true, + "GC assist wait": false, + "GC sweep wait": false, + "GC scavenge wait": false, + "chan receive": true, + "chan send": true, + "finalizer wait": false, + "force gc (idle)": false, + // Perhaps surprisingly, we mark "semacquire" as a non-stalled state. This + // is because it is possible to see goroutines briefly enter this state when + // performing fine-grained memory synchronization, occasionally in the Go + // runtime itself. No request-level synchronization points use mutexes to + // wait for state transitions by other requests, so it is safe to ignore + // this state and wait for it to exit. + "semacquire": false, + "sleep": false, + "sync.Cond.Wait": true, + "timer goroutine (idle)": false, + "trace reader (blocked)": false, + "wait for GC cycle": false, + "GC worker (idle)": false, +} + +type goroutine struct { + id int + status string + frame []byte +} + +var goroutineStackStatusRE = regexp.MustCompile(`goroutine (\d+) \[(.+)\]:`) + +// goroutineStatus returns a stack trace for each goroutine whose stack frame +// matches the provided filter. It uses the provided buffer to avoid repeat +// allocations. 
+func goroutineStatus(t *testing.T, filter string, buf *[]byte) []goroutine { + // We don't know how big the buffer needs to be to collect all the + // goroutines. Start with 1 MB and try a few times, doubling each time. + // NB: This is inspired by runtime/pprof/pprof.go:writeGoroutineStacks. + if len(*buf) == 0 { + *buf = make([]byte, 1<<20) + } + var truncBuf []byte + for i := 0; ; i++ { + n := runtime.Stack(*buf, true /* all */) + if n < len(*buf) { + truncBuf = (*buf)[:n] + break + } + *buf = make([]byte, 2*len(*buf)) + } + + // Split up each stack frame. + frames := bytes.Split(truncBuf, []byte("\n\n")) + + // Filter down to only those being monitored by the test harness. + var filtered [][]byte + for _, f := range frames { + if bytes.Contains(f, []byte(filter)) { + filtered = append(filtered, f) + } + } + + // Parse matching goroutine statuses. + status := make([]goroutine, len(filtered)) + for i, f := range filtered { + match := goroutineStackStatusRE.FindSubmatch(f) + if len(match) != 3 { + t.Fatalf("could not find goroutine header in:\n%s", f) + } + gid, err := strconv.Atoi(string(match[1])) + if err != nil { + t.Fatal(err) + } + status[i] = goroutine{ + id: gid, + status: string(match[2]), + frame: f, + } + } + sort.Slice(status, func(i int, j int) bool { + return status[i].id < status[j].id + }) + return status +} diff --git a/pkg/storage/concurrency/datadriven_util_test.go b/pkg/storage/concurrency/datadriven_util_test.go new file mode 100644 index 000000000000..b74b45b65a22 --- /dev/null +++ b/pkg/storage/concurrency/datadriven_util_test.go @@ -0,0 +1,151 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" +) + +func nextUUID(counter *uint128.Uint128) uuid.UUID { + *counter = counter.Add(1) + return uuid.FromUint128(*counter) +} + +func scanStringSlice(t *testing.T, d *datadriven.TestData, key string) []string { + var str string + d.ScanArgs(t, key, &str) + return strings.Split(str, ",") +} + +func scanTimestamp(t *testing.T, d *datadriven.TestData) hlc.Timestamp { + var ts hlc.Timestamp + var tsS string + d.ScanArgs(t, "ts", &tsS) + parts := strings.Split(tsS, ",") + + // Find the wall time part. + tsW, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + d.Fatalf(t, "%v", err) + } + ts.WallTime = tsW + + // Find the logical part, if there is one. 
+ var tsL int64 + if len(parts) > 1 { + tsL, err = strconv.ParseInt(parts[1], 10, 32) + if err != nil { + d.Fatalf(t, "%v", err) + } + } + ts.Logical = int32(tsL) + return ts +} + +func getSpan(t *testing.T, d *datadriven.TestData, str string) roachpb.Span { + parts := strings.Split(str, ",") + span := roachpb.Span{Key: roachpb.Key(parts[0])} + if len(parts) > 2 { + d.Fatalf(t, "incorrect span format: %s", str) + } else if len(parts) == 2 { + span.EndKey = roachpb.Key(parts[1]) + } + return span +} + +func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset.SpanSet { + spans := &spanset.SpanSet{} + var spansStr string + d.ScanArgs(t, "spans", &spansStr) + parts := strings.Split(spansStr, "+") + for _, p := range parts { + if len(p) < 2 || p[1] != '@' { + d.Fatalf(t, "incorrect span with access format: %s", p) + } + c := p[0] + p = p[2:] + var sa spanset.SpanAccess + switch c { + case 'r': + sa = spanset.SpanReadOnly + case 'w': + sa = spanset.SpanReadWrite + default: + d.Fatalf(t, "incorrect span access: %c", c) + } + spans.AddMVCC(sa, getSpan(t, d, p), ts) + } + return spans +} + +func scanSingleRequest(t *testing.T, d *datadriven.TestData) roachpb.Request { + var reqTypeStr string + d.ScanArgs(t, "type", &reqTypeStr) + + fields := make(map[string]string) + if d.HasArg("fields") { + var reqFieldsStr string + d.ScanArgs(t, "fields", &reqFieldsStr) + reqFieldStrs := strings.Split(reqFieldsStr, ",") + for _, reqFieldStr := range reqFieldStrs { + split := strings.Split(reqFieldStr, "=") + if len(split) != 2 { + d.Fatalf(t, "unexpected field: %s", reqFieldStr) + } + fields[split[0]] = split[1] + } + } + mustGetField := func(f string) string { + v, ok := fields[f] + if !ok { + d.Fatalf(t, "missing required field: %s", f) + } + return v + } + + switch reqTypeStr { + case "get": + var r roachpb.GetRequest + r.Key = roachpb.Key(mustGetField("key")) + return &r + + case "scan": + var r roachpb.ScanRequest + r.Key = roachpb.Key(mustGetField("key")) + if v, ok := fields["endkey"]; ok { + r.EndKey = roachpb.Key(v) + } + return &r + + case "put": + var r roachpb.PutRequest + r.Key = roachpb.Key(mustGetField("key")) + r.Value.SetString(mustGetField("value")) + return &r + + case "request-lease": + var r roachpb.RequestLeaseRequest + return &r + + default: + d.Fatalf(t, "unknown request type: %s", reqTypeStr) + return nil + } +} diff --git a/pkg/storage/concurrency/latch_manager.go b/pkg/storage/concurrency/latch_manager.go index 9033e4d04cce..18e74fec5cb7 100644 --- a/pkg/storage/concurrency/latch_manager.go +++ b/pkg/storage/concurrency/latch_manager.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" ) // latchManagerImpl implements the latchManager interface. @@ -33,3 +34,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } + +func (m *latchManagerImpl) Info() (global, local storagepb.LatchManagerInfo) { + return m.m.Info() +} diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index d48dcebd32dc..267b756cdefe 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -14,8 +14,6 @@ import ( "context" "fmt" "runtime" - "strconv" - "strings" "sync/atomic" "testing" "time" @@ -116,72 +114,6 @@ print Calls lockTable.String. 
*/ -func scanTimestamp(t *testing.T, d *datadriven.TestData) hlc.Timestamp { - var ts hlc.Timestamp - var tsS string - d.ScanArgs(t, "ts", &tsS) - parts := strings.Split(tsS, ",") - - // Find the wall time part. - tsW, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - d.Fatalf(t, "%v", err) - } - ts.WallTime = tsW - - // Find the logical part, if there is one. - var tsL int64 - if len(parts) > 1 { - tsL, err = strconv.ParseInt(parts[1], 10, 32) - if err != nil { - d.Fatalf(t, "%v", err) - } - } - ts.Logical = int32(tsL) - return ts -} - -func nextUUID(counter *uint128.Uint128) uuid.UUID { - *counter = counter.Add(1) - return uuid.FromUint128(*counter) -} - -func getSpan(t *testing.T, d *datadriven.TestData, str string) roachpb.Span { - parts := strings.Split(str, ",") - span := roachpb.Span{Key: roachpb.Key(parts[0])} - if len(parts) > 2 { - d.Fatalf(t, "incorrect span format: %s", str) - } else if len(parts) == 2 { - span.EndKey = roachpb.Key(parts[1]) - } - return span -} - -func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset.SpanSet { - spans := &spanset.SpanSet{} - var spansStr string - d.ScanArgs(t, "spans", &spansStr) - parts := strings.Split(spansStr, "+") - for _, p := range parts { - if len(p) < 2 || p[1] != '@' { - d.Fatalf(t, "incorrect span with access format: %s", p) - } - c := p[0] - p = p[2:] - var sa spanset.SpanAccess - switch c { - case 'r': - sa = spanset.SpanReadOnly - case 'w': - sa = spanset.SpanReadWrite - default: - d.Fatalf(t, "incorrect span access: %c", c) - } - spans.AddMVCC(sa, getSpan(t, d, p), ts) - } - return spans -} - func TestLockTableBasic(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go index bb40e068cb1b..b0dc92eba291 100644 --- a/pkg/storage/concurrency/lock_table_waiter.go +++ b/pkg/storage/concurrency/lock_table_waiter.go @@ -34,17 +34,17 @@ type lockTableWaiterImpl struct { stopper *stop.Stopper // Used to push conflicting transactions and resolve conflicting intents. - ir intentResolver + ir IntentResolver // How long to wait until pushing conflicting transactions to detect // dependency cycles. dependencyCyclePushDelay time.Duration } -// intentResolver is an interface used by lockTableWaiterImpl to push +// IntentResolver is an interface used by lockTableWaiterImpl to push // transactions and to resolve intents. It contains only the subset of the // intentresolver.IntentResolver interface that lockTableWaiterImpl needs. -type intentResolver interface { +type IntentResolver interface { // PushTransaction pushes the provided transaction. The method will push the // provided pushee transaction immediately, if possible. 
Otherwise, it will // block until the pushee transaction is finalized or eventually can be diff --git a/pkg/storage/concurrency/lock_table_waiter_test.go b/pkg/storage/concurrency/lock_table_waiter_test.go index f3988837c51a..cc1724cea292 100644 --- a/pkg/storage/concurrency/lock_table_waiter_test.go +++ b/pkg/storage/concurrency/lock_table_waiter_test.go @@ -27,20 +27,20 @@ import ( ) type mockIntentResolver struct { - pushTxn func(*enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) - resolveIntent func(roachpb.Intent) *Error + pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) + resolveIntent func(context.Context, roachpb.Intent) *Error } func (m *mockIntentResolver) PushTransaction( - _ context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, + ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { - return m.pushTxn(txn, h, pushType) + return m.pushTxn(ctx, txn, h, pushType) } func (m *mockIntentResolver) ResolveIntent( - _ context.Context, intent roachpb.Intent, _ intentresolver.ResolveOptions, + ctx context.Context, intent roachpb.Intent, _ intentresolver.ResolveOptions, ) *Error { - return m.resolveIntent(intent) + return m.resolveIntent(ctx, intent) } type mockLockTableGuard struct { @@ -284,7 +284,10 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h req := makeReq() ir.pushTxn = func( - pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, + _ context.Context, + pusheeArg *enginepb.TxnMeta, + h roachpb.Header, + pushType roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) @@ -301,7 +304,7 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h // we know the holder is ABORTED. Otherwide, immediately // tell the request to stop waiting. if lockHeld { - ir.resolveIntent = func(intent roachpb.Intent) *Error { + ir.resolveIntent = func(_ context.Context, intent roachpb.Intent) *Error { require.Equal(t, keyA, intent.Key) require.Equal(t, pusheeTxn.ID, intent.Txn.ID) require.Equal(t, roachpb.ABORTED, intent.Status) @@ -354,7 +357,7 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { // Errors are propagated when observed while pushing transactions. g.notify() ir.pushTxn = func( - _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + _ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { return roachpb.Transaction{}, err1 } @@ -364,11 +367,11 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { // Errors are propagated when observed while resolving intents. 
g.notify() ir.pushTxn = func( - _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + _ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { return roachpb.Transaction{}, nil } - ir.resolveIntent = func(intent roachpb.Intent) *Error { + ir.resolveIntent = func(_ context.Context, intent roachpb.Intent) *Error { return err2 } err = w.WaitOn(ctx, req, g) diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/basic b/pkg/storage/concurrency/testdata/concurrency_manager/basic new file mode 100644 index 000000000000..136c7defb729 --- /dev/null +++ b/pkg/storage/concurrency/testdata/concurrency_manager/basic @@ -0,0 +1,143 @@ +txn name=txn1 ts=10,1 epoch=0 +---- + +txn name=txn2 ts=12,1 epoch=0 +---- + +txn name=txn3 ts=14,1 epoch=0 +---- + +single-request name=get1 type=get fields=key=k +---- + +single-request name=scan1 type=scan fields=key=k,endkey=k2 +---- + +single-request name=put1 type=put fields=key=k,value=v +---- + +# ------------------------------------------------------------- +# Simple read-only request +# ------------------------------------------------------------- + +batch-request name=req1 txn=txn1 ts=10,1 reqs=get1,scan1 +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +debug-latch-manager +---- +write count: 0 + read count: 1 + +finish req=req1 +---- +[2] finish req1: finishing request + +reset +---- + +# ------------------------------------------------------------- +# Simple read-write request that acquires a lock +# ------------------------------------------------------------- + +batch-request name=req2 txn=txn2 ts=12,1 reqs=put1 +---- + +sequence req=req2 +---- +[1] sequence req2: sequencing request +[1] sequence req2: acquiring latches +[1] sequence req2: scanning lock table for conflicting locks +[1] sequence req2: sequencing complete, returned guard + +on-lock-acquired txn=txn2 key=k +---- +[2] acquire lock: txn2 @ k + +debug-lock-table +---- +global: num=1 + lock: "k" + holder: txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000012,1 +local: num=0 + +finish req=req2 +---- +[3] finish req2: finishing request + +reset +---- + +# Demonstrate that 'reset' clears the lock table. +debug-lock-table +---- +global: num=0 +local: num=0 + +# ------------------------------------------------------------- +# 1. Acquire a lock +# 2. Read-only requests blocks on lock +# 3. Lock is released, read-only request proceeds +# 4. Read-write request blocks on latches +# 5. 
Requests proceed in order +# ------------------------------------------------------------- + +on-lock-acquired txn=txn2 key=k +---- +[1] acquire lock: txn2 @ k + +batch-request name=req3 txn=txn3 ts=14,1 reqs=get1,scan1 +---- + +sequence req=req3 +---- +[2] sequence req3: sequencing request +[2] sequence req3: acquiring latches +[2] sequence req3: scanning lock table for conflicting locks +[2] sequence req3: waiting in lock wait-queues +[2] sequence req3: pushing txn 00000000-0000-0000-0000-000000000002 + +on-txn-updated txn=txn2 +---- +[3] update txn: committing txn2 +[2] sequence req3: resolving intent "k" for txn 00000000-0000-0000-0000-000000000002 +[2] sequence req3: acquiring latches +[2] sequence req3: scanning lock table for conflicting locks +[2] sequence req3: sequencing complete, returned guard + +debug-lock-table +---- +global: num=0 +local: num=0 + +batch-request name=req4 txn=txn1 ts=10,1 reqs=put1 +---- + +sequence req=req4 +---- +[4] sequence req4: sequencing request +[4] sequence req4: acquiring latches + +debug-latch-manager +---- +write count: 1 + read count: 1 + +finish req=req3 +---- +[5] finish req3: finishing request +[4] sequence req4: scanning lock table for conflicting locks +[4] sequence req4: sequencing complete, returned guard + +finish req=req4 +---- +[6] finish req4: finishing request + +reset +---- diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock new file mode 100644 index 000000000000..70fada31738c --- /dev/null +++ b/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock @@ -0,0 +1,57 @@ +# ------------------------------------------------------------- +# Read-only request runs into replicated intent. It informs the +# lock table and waits for the intent to be resolved. 
+# -------------------------------------------------------------
+
+txn name=txn1 ts=10,1 epoch=0
+----
+
+txn name=txn2 ts=12,1 epoch=0
+----
+
+single-request name=get1 type=get fields=key=k
+----
+
+batch-request name=req1 txn=txn2 ts=12,1 reqs=get1
+----
+
+sequence req=req1
+----
+[1] sequence req1: sequencing request
+[1] sequence req1: acquiring latches
+[1] sequence req1: scanning lock table for conflicting locks
+[1] sequence req1: sequencing complete, returned guard
+
+handle-write-intent-error req=req1 txn=txn1 key=k
+----
+[2] handle write intent error req1: handling conflicting intents on "k"
+
+debug-lock-table
+----
+global: num=1
+ lock: "k"
+  holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1
+local: num=0
+
+sequence req=req1
+----
+[3] sequence req1: re-sequencing request
+[3] sequence req1: acquiring latches
+[3] sequence req1: scanning lock table for conflicting locks
+[3] sequence req1: waiting in lock wait-queues
+[3] sequence req1: pushing txn 00000000-0000-0000-0000-000000000001
+
+on-txn-updated txn=txn1
+----
+[4] update txn: committing txn1
+[3] sequence req1: resolving intent "k" for txn 00000000-0000-0000-0000-000000000001
+[3] sequence req1: acquiring latches
+[3] sequence req1: scanning lock table for conflicting locks
+[3] sequence req1: sequencing complete, returned guard
+
+finish req=req1
+----
+[5] finish req1: finishing request
+
+reset
+----
diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/no_latches b/pkg/storage/concurrency/testdata/concurrency_manager/no_latches
new file mode 100644
index 000000000000..c7c27e33e419
--- /dev/null
+++ b/pkg/storage/concurrency/testdata/concurrency_manager/no_latches
@@ -0,0 +1,45 @@
+# -------------------------------------------------------------
+# Inconsistent reads do not acquire latches
+# -------------------------------------------------------------
+
+single-request name=get1 type=get fields=key=k
+----
+
+batch-request name=inconsistentReq txn=none ts=10,1 reqs=get1 inconsistent
+----
+
+sequence req=inconsistentReq
+----
+[1] sequence inconsistentReq: sequencing request
+[1] sequence inconsistentReq: not acquiring latches
+[1] sequence inconsistentReq: sequencing complete, returned guard
+
+finish req=inconsistentReq
+----
+[2] finish inconsistentReq: finishing request
+
+reset
+----
+
+# -------------------------------------------------------------
+# Lease requests do not acquire latches
+# -------------------------------------------------------------
+
+single-request name=reqlease1 type=request-lease
+----
+
+batch-request name=leaseReq txn=none ts=10,1 reqs=reqlease1
+----
+
+sequence req=leaseReq
+----
+[1] sequence leaseReq: sequencing request
+[1] sequence leaseReq: not acquiring latches
+[1] sequence leaseReq: sequencing complete, returned guard
+
+finish req=leaseReq
+----
+[2] finish leaseReq: finishing request
+
+reset
+----
diff --git a/pkg/storage/spanset/spanset.go b/pkg/storage/spanset/spanset.go
index da68a2479ee5..ac01b7ca0065 100644
--- a/pkg/storage/spanset/spanset.go
+++ b/pkg/storage/spanset/spanset.go
@@ -132,6 +132,16 @@ func (s *SpanSet) AddMVCC(access SpanAccess, span roachpb.Span, timestamp hlc.Ti
 	s.spans[access][scope] = append(s.spans[access][scope], Span{Span: span, Timestamp: timestamp})
 }

+// Merge merges all spans in s2 into s. s2 is not modified.
+func (s *SpanSet) Merge(s2 *SpanSet) {
+	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
+		for ss := SpanScope(0); ss < NumSpanScope; ss++ {
+			s.spans[sa][ss] = append(s.spans[sa][ss], s2.spans[sa][ss]...)
+		}
+	}
+	s.SortAndDedup()
+}
+
 // SortAndDedup sorts the spans in the SpanSet and removes any duplicates.
 func (s *SpanSet) SortAndDedup() {
 	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
diff --git a/pkg/storage/spanset/spanset_test.go b/pkg/storage/spanset/spanset_test.go
index 45912d5f4cac..23bb527cd8e1 100644
--- a/pkg/storage/spanset/spanset_test.go
+++ b/pkg/storage/spanset/spanset_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/require"
 )

 // Test that spans are properly classified as global or local and that
@@ -48,6 +49,45 @@ func TestSpanSetGetSpansScope(t *testing.T) {
 	}
 }

+func TestSpanSetMerge(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	spA := roachpb.Span{Key: roachpb.Key("a")}
+	spBC := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}
+	spCE := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}
+	spBE := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("e")}
+	spLocal := roachpb.Span{Key: keys.RangeLastGCKey(1)}
+
+	var ss SpanSet
+	ss.AddNonMVCC(SpanReadOnly, spLocal)
+	ss.AddNonMVCC(SpanReadOnly, spA)
+	ss.AddNonMVCC(SpanReadWrite, spBC)
+	require.Equal(t, []Span{{Span: spLocal}}, ss.GetSpans(SpanReadOnly, SpanLocal))
+	require.Equal(t, []Span{{Span: spA}}, ss.GetSpans(SpanReadOnly, SpanGlobal))
+	require.Equal(t, []Span{{Span: spBC}}, ss.GetSpans(SpanReadWrite, SpanGlobal))
+
+	var ss2 SpanSet
+	ss2.AddNonMVCC(SpanReadWrite, spCE)
+	require.Nil(t, ss2.GetSpans(SpanReadOnly, SpanLocal))
+	require.Nil(t, ss2.GetSpans(SpanReadOnly, SpanGlobal))
+	require.Equal(t, []Span{{Span: spCE}}, ss2.GetSpans(SpanReadWrite, SpanGlobal))
+
+	// Merge merges all spans. Notice the new spBE span.
+	ss2.Merge(&ss)
+	require.Equal(t, []Span{{Span: spLocal}}, ss2.GetSpans(SpanReadOnly, SpanLocal))
+	require.Equal(t, []Span{{Span: spA}}, ss2.GetSpans(SpanReadOnly, SpanGlobal))
+	require.Equal(t, []Span{{Span: spBE}}, ss2.GetSpans(SpanReadWrite, SpanGlobal))
+
+	// The source set is not mutated on future changes to the merged set.
+	ss2.AddNonMVCC(SpanReadOnly, spCE)
+	require.Equal(t, []Span{{Span: spLocal}}, ss.GetSpans(SpanReadOnly, SpanLocal))
+	require.Equal(t, []Span{{Span: spA}}, ss.GetSpans(SpanReadOnly, SpanGlobal))
+	require.Equal(t, []Span{{Span: spBC}}, ss.GetSpans(SpanReadWrite, SpanGlobal))
+	require.Equal(t, []Span{{Span: spLocal}}, ss2.GetSpans(SpanReadOnly, SpanLocal))
+	require.Equal(t, []Span{{Span: spA}, {Span: spCE}}, ss2.GetSpans(SpanReadOnly, SpanGlobal))
+	require.Equal(t, []Span{{Span: spBE}}, ss2.GetSpans(SpanReadWrite, SpanGlobal))
+}
+
 // Test that CheckAllowed properly enforces span boundaries.
 func TestSpanSetCheckAllowedBoundaries(t *testing.T) {
 	defer leaktest.AfterTest(t)()
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 3a142014cd50..18296b763c12 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2173,6 +2173,14 @@ func (s *Store) Metrics() *StoreMetrics {
 	return s.metrics
 }

+// NodeDescriptor returns the NodeDescriptor of the node that holds the Store.
+func (s *Store) NodeDescriptor() *roachpb.NodeDescriptor {
+	return s.nodeDesc
+}
+
+// Silence unused warning.
+var _ = (*Store).NodeDescriptor
+
 // Descriptor returns a StoreDescriptor including current store
 // capacity information.
 func (s *Store) Descriptor(useCached bool) (*roachpb.StoreDescriptor, error) {
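Usage sketch for the new SpanSet.Merge helper added above. This is illustrative only and not part of the patch: Merge, AddNonMVCC, GetSpans, and the access/scope constants are taken from the pkg/storage/spanset changes in this diff, while the standalone main wrapper and the example keys are hypothetical.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
)

func main() {
	// Spans declared by one request: a write over ["b", "c").
	var req spanset.SpanSet
	req.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")})

	// An existing set that already covers the adjacent write span ["c", "e").
	var all spanset.SpanSet
	all.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")})

	// Merge appends req's spans into all and then sorts and dedups, so the two
	// adjacent write spans coalesce into a single ["b", "e") span (mirroring the
	// spBC + spCE -> spBE behavior asserted in TestSpanSetMerge). The source set
	// req is left unmodified.
	all.Merge(&req)
	fmt.Println(all.GetSpans(spanset.SpanReadWrite, spanset.SpanGlobal))
}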