Skip to content

Commit

Permalink
storage: integrate Concurrency Manager into Replica
Browse files Browse the repository at this point in the history
Related to #41720.
Related to #44976.

This commit integrates the new concurrency package into the storage
package. Each Replica is given a concurrency manager, which replaces
its existing latch manager and txn wait queue. The change also uses
the concurrency manager to simplify the role of the intent resolver.
The intent resolver no longer directly handles WriteIntentErrors. As
a result, we are able to delete the contention queue entirely.

With this change, all requests are now sequenced through the concurrency
manager. When sequencing, latches are acquired and conflicting locks are
detected. If any locks are found, the requests wait in lock wait-queues
for the locks to be resolved. This is a major deviation from how things
currently work because today, even with the contention queue, requests
end up waiting for conflicting transactions to commit/abort in the
txnWaitQueue after at least one RPC. Now, requests wait directly next
to the intents/locks that they are waiting on and react directly to the
resolution of these intents/locks.

Once requests are sequenced by the concurrency manager, they are
theoretically fully isolated from all conflicting requests. However,
this is not strictly true today because we have not yet pulled all
replicated locks into the concurrency manager's lock table. We will
do so in a future change. Until then, the concurrency manager maintains
a notion of "intent discovery", which is integrated into the Replica-level
concurrency retry loop.

Performance numbers will be published shortly. This will be followed
by performance numbers using the SELECT FOR UPDATE locking (#40205)
improvements that this change enables.
  • Loading branch information
nvanbenschoten committed Feb 28, 2020
1 parent a986d61 commit fff4003
Show file tree
Hide file tree
Showing 30 changed files with 389 additions and 1,492 deletions.
10 changes: 0 additions & 10 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,6 @@ func (ba *BatchRequest) IsSinglePushTxnRequest() bool {
return false
}

// IsSingleQueryTxnRequest returns true iff the batch contains a single
// request, and that request is for a QueryTxn.
func (ba *BatchRequest) IsSingleQueryTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*QueryTxnRequest)
return ok
}
return false
}

// IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
// request, and that request is a HeartbeatTxn.
func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/tests/monotonic_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -107,6 +108,10 @@ func testMonotonicInserts(t *testing.T, distSQLMode sessiondata.DistSQLExecMode)
st := server.ClusterSettings()
st.Manual.Store(true)
sql.DistSQLClusterExecMode.Override(&st.SV, int64(distSQLMode))
// Let transactions push immediately to detect deadlocks. The test creates a
// large amount of contention and dependency cycles, and could take a long
// time to complete without this.
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0)
}

var clients []mtClient
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_query_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ func QueryTxn(
}

// Get the list of txns waiting on this txn.
reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID)
reply.WaitingTxns = cArgs.EvalCtx.GetConcurrencyManager().GetDependents(args.Txn.ID)
return result.Result{}, nil
}
6 changes: 3 additions & 3 deletions pkg/storage/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -53,7 +53,7 @@ type EvalContext interface {
Clock() *hlc.Clock
DB() *client.DB
AbortSpan() *abortspan.AbortSpan
GetTxnWaitQueue() *txnwait.Queue
GetConcurrencyManager() concurrency.Manager
GetLimiters() *Limiters

NodeID() roachpb.NodeID
Expand Down Expand Up @@ -149,7 +149,7 @@ func (m *mockEvalCtxImpl) GetLimiters() *Limiters {
func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan {
return m.MockEvalCtx.AbortSpan
}
func (m *mockEvalCtxImpl) GetTxnWaitQueue() *txnwait.Queue {
func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
t.Fatal(err)
}
for {
if _, ok := repl.GetTxnWaitQueue().TrackedTxns()[txn1.ID()]; ok {
if _, ok := repl.GetConcurrencyManager().TxnWaitQueue().TrackedTxns()[txn1.ID()]; ok {
break
}
select {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,7 +2283,7 @@ func TestStoreTxnWaitQueueEnabledOnSplit(t *testing.T) {
}

rhsRepl := store.LookupReplica(roachpb.RKey(keys.UserTableDataMin))
if !rhsRepl.IsTxnWaitQueueEnabled() {
if !rhsRepl.GetConcurrencyManager().TxnWaitQueue().IsEnabled() {
t.Errorf("expected RHS replica's push txn queue to be enabled post-split")
}
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
Expand Down Expand Up @@ -281,6 +282,12 @@ type MetricExporter interface {
// lockTable.
LockTableDebug() string

// TxnWaitQueue returns the concurrency manager's txnWaitQueue.
// TODO(nvanbenschoten): this doesn't really fit into this interface. It
// would be nice if the txnWaitQueue was hidden behind the concurrency
// manager abstraction entirely, but tests want to access it directly.
TxnWaitQueue() *txnwait.Queue

// TODO(nvanbenschoten): fill out this interface to provide observability
// into the state of the concurrency manager.
// LatchMetrics()
Expand Down Expand Up @@ -327,7 +334,7 @@ type Request struct {
// Guard is returned from Manager.SequenceReq. The guard is passed back in to
// Manager.FinishReq to release the request's resources when it has completed.
type Guard struct {
req Request
Req Request
lg latchGuard
ltg lockTableGuard
}
Expand Down
55 changes: 43 additions & 12 deletions pkg/storage/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -55,8 +56,9 @@ type Config struct {
TxnWaitMetrics *txnwait.Metrics
SlowLatchGauge *metric.Gauge
// Configs + Knobs.
MaxLockTableSize int64
TxnWaitKnobs txnwait.TestingKnobs
MaxLockTableSize int64
DisableTxnPushing bool
TxnWaitKnobs txnwait.TestingKnobs
}

func (c *Config) initDefaults() {
Expand All @@ -82,10 +84,11 @@ func NewManager(cfg Config) Manager {
maxLocks: cfg.MaxLockTableSize,
},
ltw: &lockTableWaiterImpl{
nodeID: cfg.NodeDesc.NodeID,
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
nodeID: cfg.NodeDesc.NodeID,
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
disableTxnPushing: cfg.DisableTxnPushing,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
// pkg/storage/concurrency/txnwait package.
Expand All @@ -110,7 +113,7 @@ func (m *managerImpl) SequenceReq(
log.Event(ctx, "sequencing request")
} else {
g = prev
g.assertNoLatches()
g.AssertNoLatches()
log.Event(ctx, "re-sequencing request")
}

Expand Down Expand Up @@ -156,14 +159,14 @@ func (m *managerImpl) sequenceReqWithGuard(

// Scan for conflicting locks.
log.Event(ctx, "scanning lock table for conflicting locks")
g.ltg = m.lt.ScanAndEnqueue(g.req, g.ltg)
g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)

// Wait on conflicting locks, if necessary.
if g.ltg.ShouldWait() {
m.lm.Release(g.moveLatchGuard())

log.Event(ctx, "waiting in lock wait-queues")
if err := m.ltw.WaitOn(ctx, g.req, g.ltg); err != nil {
if err := m.ltw.WaitOn(ctx, g.Req, g.ltg); err != nil {
return nil, err
}
continue
Expand Down Expand Up @@ -240,6 +243,11 @@ func (m *managerImpl) FinishReq(g *Guard) {
func (m *managerImpl) HandleWriterIntentError(
ctx context.Context, g *Guard, t *roachpb.WriteIntentError,
) *Guard {
if g.ltg == nil {
log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+
"lockTableGuard; were lock spans declared for this request?", t)
}

// Add a discovered lock to lock-table for each intent and enter each lock's
// wait-queue.
for i := range t.Intents {
Expand Down Expand Up @@ -336,6 +344,11 @@ func (m *managerImpl) LockTableDebug() string {
return m.lt.String()
}

// TxnWaitQueue implements the MetricExporter interface.
func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
return m.twq.(*txnwait.Queue)
}

func (r *Request) isSingle(m roachpb.Method) bool {
if len(r.Requests) != 1 {
return false
Expand All @@ -345,11 +358,29 @@ func (r *Request) isSingle(m roachpb.Method) bool {

func newGuard(req Request) *Guard {
// TODO(nvanbenschoten): Pool these guard objects.
return &Guard{req: req}
return &Guard{Req: req}
}

// LatchSpans returns the maximal set of spans that the request will access.
func (g *Guard) LatchSpans() *spanset.SpanSet {
return g.Req.LatchSpans
}

// HoldingLatches returned whether the guard is holding latches or not.
func (g *Guard) HoldingLatches() bool {
return g != nil && g.lg != nil
}

// AssertLatches asserts that the guard is non-nil and holding latches.
func (g *Guard) AssertLatches() {
if !g.HoldingLatches() {
panic("expected latches held, found none")
}
}

func (g *Guard) assertNoLatches() {
if g.lg != nil {
// AssertNoLatches asserts that the guard is non-nil and not holding latches.
func (g *Guard) AssertNoLatches() {
if g.HoldingLatches() {
panic("unexpected latches held")
}
}
Expand Down
28 changes: 22 additions & 6 deletions pkg/storage/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type lockTableWaiterImpl struct {
st *cluster.Settings
stopper *stop.Stopper
ir IntentResolver

// When set, WriteIntentError are propagated instead of pushing.
disableTxnPushing bool
}

// IntentResolver is an interface used by lockTableWaiterImpl to push
Expand Down Expand Up @@ -159,7 +162,10 @@ func (w *lockTableWaiterImpl) WaitOn(
// had a cache of aborted transaction IDs that allowed us to notice
// and quickly resolve abandoned intents then we might be able to
// get rid of this state.
delay := LockTableLivenessPushDelay.Get(&w.st.SV)
delay := minDuration(
LockTableLivenessPushDelay.Get(&w.st.SV),
LockTableDeadlockDetectionPushDelay.Get(&w.st.SV),
)
if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) {
// However, if the pushee has the minimum priority or if the
// pusher has the maximum priority, push immediately.
Expand Down Expand Up @@ -224,6 +230,12 @@ func (w *lockTableWaiterImpl) WaitOn(
}

func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error {
if w.disableTxnPushing {
return roachpb.NewError(&roachpb.WriteIntentError{
Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},
})
}

h := roachpb.Header{
Timestamp: req.Timestamp,
UserPriority: req.Priority,
Expand All @@ -242,12 +254,9 @@ func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waiti
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID)
if !ok {
// This was set earlier, so it's completely unexpected to
// not be found now.
return roachpb.NewErrorf("missing observed timestamp: %+v", h.Txn)
if ok {
h.Timestamp.Forward(obsTS)
}
h.Timestamp.Forward(obsTS)
}

var pushType roachpb.PushTxnType
Expand Down Expand Up @@ -288,3 +297,10 @@ func hasMinPriority(txn *enginepb.TxnMeta) bool {
func hasMaxPriority(txn *roachpb.Transaction) bool {
return txn != nil && txn.Priority == enginepb.MaxTxnPriority
}

func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
4 changes: 0 additions & 4 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,6 @@ func (r *Replica) IsQuiescent() bool {
return r.mu.quiescent
}

func (r *Replica) IsTxnWaitQueueEnabled() bool {
return r.txnWaitQueue.IsEnabled()
}

// GetQueueLastProcessed returns the last processed timestamp for the
// specified queue, or the zero timestamp if not available.
func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
Expand Down
Loading

0 comments on commit fff4003

Please sign in to comment.