-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathlock_table.go
1817 lines (1698 loc) · 60.8 KB
/
lock_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package concurrency
import (
"container/list"
"fmt"
"sort"
"strings"
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
// stateKind is the kind of waiting that the request is subject to. The
// detailed meaning of each kind is given in the comment on
// lockTableGuardImpl.
type stateKind int
const (
// waitForDistinguished is the sub-case of waitFor in which the request is
// the distinguished waiter, which is responsible for taking extra actions,
// e.g. immediately pushing the transaction it is waiting for.
waitForDistinguished stateKind = iota
// waitFor means the request is waiting for another request or transaction;
// the waitingState says who that is.
waitFor
// waitElsewhere is a rare state used when the lockTable is under memory
// pressure and is clearing its internal queue state. The waitingState still
// says who the request is waiting for so that deadlock detection works.
waitElsewhere
// waitSelf is a rare state used when a different request from the same
// transaction has a reservation.
waitSelf
// doneWaiting means the request should make another call to
// ScanAndEnqueue().
doneWaiting
)
// waitingState is the current waiting state of the request. The individual
// states are described in the comment on lockTableGuardImpl.
type waitingState struct {
stateKind stateKind
// Populated for waitFor* and waitElsewhere type, and represents who the
// request is waiting for. These fields are left at their zero values in the
// waitSelf state constructed by lockState.informActiveWaiters.
txn *enginepb.TxnMeta // non-nil in the waitFor* and waitElsewhere states
ts hlc.Timestamp // the timestamp of the transaction that is causing the wait
key roachpb.Key // the key of the lock that is causing the wait
held bool // is the lock currently held?
access spanset.SpanAccess // currently only SpanReadWrite
guardAccess spanset.SpanAccess // the access method of the guard
}
// Implementation
// TODO(sbhola):
// - proper error strings and give better explanation to all panics.
// - metrics about lockTable state to export to observability debug pages:
// number of locks, number of waiting requests, wait time?, ...
// treeMu is the btree of lockState structs for a particular SpanScope,
// together with the mutex that protects it.
type treeMu struct {
mu syncutil.RWMutex // Protects everything in this struct.
// NOTE(review): presumably the sequence from which lockState.id values are
// assigned for locks added to this tree -- confirm against the code that
// inserts into btree.
lockIDSeqNum uint64
// Container for lockState structs. Locks that are not held or reserved and
// have no waiting requests are garbage collected. Additionally, locks that
// are only held with Replicated durability and have no waiting requests may
// also be garbage collected since their state can be recovered from
// persistent storage.
btree
// For constraining memory consumption. We need better memory accounting
// than this.
// NOTE(review): presumably the number of lockStates currently in btree,
// compared against lockTableImpl.maxLocks -- confirm.
numLocks int64
}
// lockTableImpl is an implementation of lockTable.
//
// Concurrency: in addition to holding latches, we require for a particular
// request ScanAndEnqueue() and CurState() must be called by the same
// thread.
//
// Mutex ordering: treeMu.mu > lockState.mu > lockTableGuardImpl.mu
type lockTableImpl struct {
// A sequence number is assigned to each request seen by the lockTable. This
// is to preserve fairness despite the design choice of allowing
// out-of-order evaluation of requests with overlapping spans where the
// latter request does not encounter contention. This out-of-order
// evaluation happens because requests do not reserve spans that are
// uncontended while they wait on contended locks after releasing their
// latches. Consider the following examples:
//
// Example 1:
// - req1 wants to write to A, B
// - req2 wants to write to B
// - lock at A is held by some other txn.
// - Even though req2 arrives later, req1 will wait only in the queue for A
// and allow req2 to proceed to evaluation.
//
// Example 2:
// - Same as example 1 but lock at A is held by txn3 and lock at B is held
// by txn4.
// - Lock at A is released so req1 acquires the reservation at A and starts
// waiting at B.
// - It is unfair for req1 to wait behind req2 at B. The sequence number
// assigned to req1 and req2 will restore the fairness by making req1
// wait before req2.
//
// Example 3: Deadlock in lock table if it did not use sequence numbers.
// - Lock at B is acquired by txn0.
// - req1 (from txn1) arrives at lockTable and wants to write to A and B.
// It queues at B.
// - req2 (from txn2) arrives at lockTable and only wants to write A.
// It proceeds to evaluation and acquires the lock at A for txn2 and then
// the request is done. The lock is still held.
// - req3 (from txn3) wants to write to A and B. It queues at A.
// - txn2 releases A. req3 is in the front of the queue at A and gets the
// reservation and starts waiting at B behind req1.
// - txn0 releases B. req1 gets the reservation at B and does another scan
// and adds itself to the queue at A, behind req3 which holds the
// reservation at A.
// Now in the queues for A and B req1 is behind req3 and vice versa and
// this deadlock has been created entirely due to the lock table's behavior.
seqNum uint64
// One lock tree (with its own mutex) per SpanScope.
locks [spanset.NumSpanScope]treeMu
// The limit passed to newLockTable.
// NOTE(review): presumably an upper bound on the number of locks tracked
// (see treeMu.numLocks) -- confirm where it is enforced.
maxLocks int64
}
// newLockTable returns a lockTable backed by a fresh lockTableImpl whose
// maxLocks field is set to the given value.
func newLockTable(maxLocks int64) lockTable {
	return &lockTableImpl{maxLocks: maxLocks}
}
// Compile-time check that *lockTableImpl implements lockTable.
var _ lockTable = (*lockTableImpl)(nil)
// lockTableGuardImpl is an implementation of lockTableGuard.
//
// The struct is a guard that is returned to the request the first time it calls
// lockTable.ScanAndEnqueue() and used in later calls to ScanAndEnqueue() and
// done(). After a call to ScanAndEnqueue() (which is made while holding
// latches), the caller must first call lockTableGuard.ShouldWait() and if it
// returns true release the latches and continue interacting with the
// lockTableGuard. If ShouldWait() returns false, the request can proceed to
// evaluation.
//
// Waiting logic: The interface hides the queues that the request is waiting on,
// and the request's position in the queue. One of the reasons for this hiding
// is that queues are not FIFO since a request that did not wait on a queue for
// key k in a preceding call to ScanAndEnqueue() (because k was not locked and
// there was no queue) may need to wait on the queue in a later call to
// ScanAndEnqueue(). So sequencing of requests arriving at the lockTable is
// partially decided by a sequence number assigned to a request when it first
// called ScanAndEnqueue() and queues are ordered by this sequence number.
// However the sequencing is not fully described by the sequence numbers -- a
// request R1 encountering contention over some keys in its span does not
// prevent a request R2 that has a higher sequence number and overlapping span
// to proceed if R2 does not encounter contention. This concurrency (that is not
// completely fair) is deemed desirable.
//
// The interface exposes an abstracted version of the waiting logic in a way
// that the request that starts waiting is considered waiting for at most one
// other request or transaction. This is exposed as a series of state
// transitions where the transitions are notified via the channel returned by
// NewStateChan() and the current state can be read using CurState().
//
// - The waitFor* states provide information on who the request is waiting for.
// The waitForDistinguished state is a sub-case -- a distinguished waiter is
// responsible for taking extra actions e.g. immediately pushing the transaction
// it is waiting for. The implementation ensures that if there are multiple
// requests in waitFor state waiting on the same transaction at least one will
// be a distinguished waiter.
//
// TODO(sbhola): investigate removing the waitForDistinguished state which
// will simplify the code here. All waitFor requests would wait (currently
// 50ms) before pushing the transaction (for deadlock detection) they are
// waiting on, say T. Typically T will be done before 50ms which is considered
// ok: the one exception we will need to make is if T has the min priority or
// the waiting transaction has max priority -- in both cases it will push
// immediately. The bad case is if T is ABORTED: the push will succeed, but
// only after the 50ms wait, and if T left N intents, each push would wait for
// 50ms, incurring a latency of 50*N ms. A cache of recently encountered
// ABORTED transactions on each
// Store should mitigate this latency increase. Whenever a transaction sees a
// waitFor state, it will consult this cache and if T is found, push
// immediately (if there isn't already a push in-flight) -- even if T is not
// initially in the cache, the first push will place it in the cache, so the
// maximum latency increase is 50ms.
//
// - The waitElsewhere state is a rare state that is used when the lockTable is
// under memory pressure and is clearing its internal queue state. Like the
// waitFor* states, it informs the request who it is waiting for so that
// deadlock detection works. However, sequencing information inside the
// lockTable is mostly discarded.
//
// - The waitSelf state is a rare state when a different request from the same
// transaction has a reservation. See the comment about "Reservations" in
// lockState.
//
// - The doneWaiting state is used to indicate that the request should make
// another call to ScanAndEnqueue() (that next call is more likely to return a
// lockTableGuard that returns false from ShouldWait()).
type lockTableGuardImpl struct {
// The sequence number of this request. Queues are ordered by this sequence
// number (see the comment on lockTableImpl.seqNum for why it is needed).
seqNum uint64
// Information about this request.
txn *enginepb.TxnMeta
spans *spanset.SpanSet
ts hlc.Timestamp
// Snapshots of the trees for which this request has some spans. Note that
// the lockStates in these snapshots may have been removed from
// lockTableImpl. Additionally, it is possible that there is a new lockState
// for the same key. This can result in various harmless anomalies:
// - the request may hold a reservation on a lockState that is no longer
// in the tree. When it next does a scan, it will either find a new
// lockState where it will compete or none. Both lockStates can be in
// the mu.locks map, which is harmless.
// - the request may wait behind a reservation holder that is not the
// lock holder. This could cause a delay in pushing the lock holder.
// This is not a correctness issue (the whole system is not deadlocked)
// and we expect will not be a real performance issue.
tableSnapshot [spanset.NumSpanScope]btree
// A request whose startWait is set to true in ScanAndEnqueue is actively
// waiting at a particular key. This is the first key encountered when
// iterating through spans that it needs to wait at. A future event (lock
// release etc.) may cause the request to no longer need to wait at this
// key. It then needs to continue iterating through spans to find the next
// key to wait at (we don't want to wastefully start at the beginning since
// this request probably has a reservation at the contended keys there): sa,
// ss, index, key collectively track the current position to allow it to
// continue iterating.
// The key for the lockState.
key roachpb.Key
// The key for the lockState is contained in the Span specified by
// spans[sa][ss][index].
ss spanset.SpanScope
sa spanset.SpanAccess // Iterates from stronger to weaker strength
index int
mu struct {
syncutil.Mutex
// Returned by ShouldWait().
startWait bool
// Returned by CurState(), after being recomputed if
// mustFindNextLockAfter is set.
state waitingState
// Returned by NewStateChan(). notify() does a non-blocking send on it.
signal chan struct{}
// locks for which this request has a reservation or is in the queue of
// writers (active or inactive) or actively waiting as a reader.
//
// TODO(sbhola): investigate whether the logic to maintain this locks map
// can be simplified so it doesn't need to be adjusted by various
// lockState methods. It adds additional bookkeeping burden that means it
// is more prone to inconsistencies. There are two main uses: (a) removing
// from various lockStates when done() is called, (b) tryActiveWait() uses
// it as an optimization to know that this request is not known to the
// lockState. (b) can be handled by other means -- the first scan the
// request won't be in the lockState and the second scan it likely will.
// (a) doesn't necessarily require this map to be consistent -- the
// request could track the places where it has enqueued as places where
// it could be present and then do the search.
locks map[*lockState]struct{}
// If this is true, the state has changed and the channel has been
// signaled, but what the state should be has not been computed. The call
// to CurState() needs to compute that current state. Deferring the
// computation makes the waiters do this work themselves instead of making
// the call to release/update locks or release reservations do this work
// (proportional to number of waiters).
mustFindNextLockAfter bool
}
}
// Compile-time check that *lockTableGuardImpl implements lockTableGuard.
var _ lockTableGuard = (*lockTableGuardImpl)(nil)
// ShouldWait reports the current value of startWait, read under g.mu.
func (g *lockTableGuardImpl) ShouldWait() bool {
	g.mu.Lock()
	wait := g.mu.startWait
	g.mu.Unlock()
	return wait
}
// NewStateChan returns the receive side of the guard's signal channel, read
// under g.mu. notify() sends on this channel.
func (g *lockTableGuardImpl) NewStateChan() <-chan struct{} {
	g.mu.Lock()
	ch := g.mu.signal
	g.mu.Unlock()
	return ch
}
// CurState returns the current waiting state of the request.
//
// If the state has been invalidated (mustFindNextLockAfter is set, which
// doneWaitingAtLock does), it is first recomputed by findNextLockAfter. That
// method acquires g.mu itself, so g.mu is released around the call.
//
// The mutex is released explicitly instead of with a deferred Unlock: with a
// defer, a panic raised while findNextLockAfter runs (when g.mu is not held
// by this method) would unwind into an Unlock of an unlocked mutex. Assuming
// syncutil.Mutex behaves like sync.Mutex, that is a fatal runtime error which
// would hide the original panic.
func (g *lockTableGuardImpl) CurState() waitingState {
	g.mu.Lock()
	if g.mu.mustFindNextLockAfter {
		// Not actively waiting anywhere so no one else can set
		// mustFindNextLockAfter to true while this method executes.
		g.mu.mustFindNextLockAfter = false
		g.mu.Unlock()
		g.findNextLockAfter(false /* notify */)
		g.mu.Lock()
	}
	state := g.mu.state
	g.mu.Unlock()
	return state
}
// notify signals a state change by attempting a send on g.mu.signal, the
// channel handed out by NewStateChan(). The send is non-blocking: if it
// cannot proceed immediately the default case is taken and nothing is sent.
// NOTE(review): every caller visible in this file holds g.mu when calling
// notify. Presumably signal is created with a buffer so that a notification
// sent while nobody is receiving is not lost -- confirm where signal is
// created.
func (g *lockTableGuardImpl) notify() {
select {
case g.mu.signal <- struct{}{}:
default:
}
}
// doneWaitingAtLock is invoked once the request has stopped actively waiting
// at lock l and must go looking for the next lock to wait at. hasReservation
// is true iff the request acquired the reservation at l; it is false for
// requests that were reading at the key and for non-transactional writes at
// the key. Without a reservation the request has no remaining tie to l, so l
// is dropped from g.mu.locks. The next lock is not computed here: the guard
// is only flagged and signaled, and CurState() does the work.
func (g *lockTableGuardImpl) doneWaitingAtLock(hasReservation bool, l *lockState) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !hasReservation {
		delete(g.mu.locks, l)
	}
	g.mu.mustFindNextLockAfter = true
	g.notify()
}
// isTxn reports whether this request belongs to the given transaction, i.e.
// the request has a transaction and its ID equals txn.ID. A request without a
// transaction never matches. txn must be non-nil.
func (g *lockTableGuardImpl) isTxn(txn *enginepb.TxnMeta) bool {
	if g.txn == nil {
		return false
	}
	return g.txn.ID == txn.ID
}
// findNextLockAfter finds the next lock, after the current one, to actively
// wait at. If it finds the next lock the request starts actively waiting
// there, else it is told that it is done waiting (its stateKind is set to
// doneWaiting). The position to resume from is read from g.sa, g.ss, g.index
// and g.key, and locks are looked up in the request's tableSnapshot rather
// than in the live tree.
//
// notify is passed through to tryActiveWait, and also controls whether
// g.notify() is called when the request turns out to be done waiting.
// Acquires g.mu.
func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
spans := g.spans.GetSpans(g.sa, g.ss)
var span *spanset.Span
resumingInSameSpan := false
// Decide between moving on to the next span and resuming inside the current
// one. A span with an empty EndKey covers a single key (see the loop
// condition below), so there is nothing left to scan in it.
// NOTE(review): index == -1 presumably means no span has been visited yet --
// confirm where index is initialized.
if g.index == -1 || len(spans[g.index].EndKey) == 0 {
span = stepToNextSpan(g)
} else {
span = &spans[g.index]
resumingInSameSpan = true
}
for span != nil {
startKey := span.Key
if resumingInSameSpan {
startKey = g.key
}
tree := g.tableSnapshot[g.ss]
iter := tree.MakeIter()
// From here on, the use of resumingInSameSpan is just a performance
// optimization to deal with the interface limitation of btree that
// prevents us from specifying an exclusive start key. We need to check
// that the lock is not the same as our exclusive start key and only need
// to do that check once -- for the first lock.
iter.SeekGE(&lockState{key: startKey})
// Visit the locks inside the span: the lock at exactly span.Key for a
// single-key span, or every lock with key < span.EndKey otherwise.
for ; iter.Valid() && ((len(span.EndKey) == 0 && iter.Cur().key.Equal(span.Key)) ||
(len(span.EndKey) > 0 && iter.Cur().key.Compare(span.EndKey) < 0)); iter.Next() {
l := iter.Cur()
if resumingInSameSpan {
resumingInSameSpan = false
if l.key.Equal(startKey) {
// This lock is where it stopped waiting.
continue
}
// Else, past the lock where it stopped waiting. We may not
// encounter that lock since it may have been garbage collected.
}
// A true return means the request is now waiting at l, so the scan stops
// here.
if l.tryActiveWait(g, g.sa, notify) {
return
}
}
resumingInSameSpan = false
span = stepToNextSpan(g)
}
// No span is left to scan: the request is done waiting.
g.mu.Lock()
defer g.mu.Unlock()
g.mu.state.stateKind = doneWaiting
if notify {
g.notify()
}
}
// Waiting writers in a lockState are wrapped in a queuedGuard. A waiting
// writer is typically waiting in an active state, i.e., the
// lockTableGuardImpl.key refers to this lockState. However, breaking of
// reservations (see the comment on reservations below, in lockState) can
// cause a writer to be an inactive waiter.
type queuedGuard struct {
// The waiting writer.
guard *lockTableGuardImpl
// Whether the writer is waiting actively. lockState.tryBreakReservation
// queues the former reservation holder with active set to false.
active bool // protected by lockState.mu
}
// lockHolderInfo is the information about a lock holder. lockState keeps one
// lockHolderInfo per durability level.
type lockHolderInfo struct {
// nil if there is no holder. Else this is the TxnMeta of the latest call to
// acquire/update the lock by this transaction. For a given transaction if
// the lock is continuously held by a succession of different TxnMetas, the
// epoch must be monotonic and the ts (derived from txn.WriteTimestamp for
// some calls, and request.ts for other calls) must be monotonic.
txn *enginepb.TxnMeta
// All the TxnSeqs in the current epoch at which this lock has been
// acquired. In increasing order. We track these so that if a lock is
// acquired at both seq 5 and seq 7, rollback of 7 does not cause the lock
// to be released. This is also consistent with PostgreSQL semantics
// https://www.postgresql.org/docs/12/sql-select.html#SQL-FOR-UPDATE-SHARE
seqs []enginepb.TxnSeq
// The timestamp at which the lock is held.
ts hlc.Timestamp
}
// lockState is the per lock state in lockTableImpl. It records who holds the
// lock at key and, through the embedded lockWaitQueue, who has reserved it
// and who is waiting for it.
type lockState struct {
id uint64 // needed for implementing util/interval/generic type contract
endKey []byte // unused except for btree tests
// The key being locked and the scope of that key. This state is never
// mutated.
key roachpb.Key
ss spanset.SpanScope
mu syncutil.Mutex // Protects everything below.
// Invariant summary (see detailed comments below):
// - both holder.locked and reservation != nil cannot be true.
// - if holder.locked and multiple entries of holder.holder have txn != nil:
// all the txns must have the same txn.ID.
// - !holder.locked => waitingReaders.Len() == 0. That is, readers wait
// only if the lock is held. They do not wait for a reservation.
// - If reservation != nil, that request is not in queuedWriters.
// Information about whether the lock is held and the holder. We track
// information for each durability level separately since a transaction can
// go through multiple epochs and TxnSeq and may acquire the same lock in
// replicated and unreplicated mode at different stages.
holder struct {
locked bool
// LockStrength is always Exclusive
holder [lock.MaxDurability + 1]lockHolderInfo
}
// Information about the requests waiting on the lock.
lockWaitQueue
}
// lockWaitQueue holds the reservation on a lock and the requests waiting for
// that lock. It is embedded in lockState and protected by lockState.mu.
type lockWaitQueue struct {
// Reservations:
//
// A not-held lock can be "reserved". A reservation is just a claim that
// prevents multiple requests from racing when the lock is released. A
// reservation by req2 can be broken by req1 if req1 has a smaller seqNum
// than req2. Only requests that specify SpanReadWrite for a key can make
// reservations. This means a reservation can only be made when the lock is
// not held, since the reservation (which can acquire an Exclusive lock) and
// the lock holder (which is an Exclusive lock) conflict.
//
// Read reservations are not permitted due to the complexities discussed in
// the review for #43740. Additionally, reads do not queue for their turn at
// all -- they are held in the waitingReaders list while the lock is held
// and removed when the lock is released, so they race with
// reservations. Let us consider scenarios where reads did wait in the same
// queue: the lock could be held or reserved by a write at ts=20, followed
// by a waiting writer at ts=18, writer at ts=10, reader at ts=12. That
// reader is waiting not because of a conflict with the holder, or reserver,
// or the first waiter, but because there is a waiter ahead of it which it
// conflicts with. This introduces more complexity in tracking who this
// reader should push. Also consider a scenario where a reader did not wait
// in the queue and waited on the side like in waitingReaders but acquired a
// read reservation (together with other readers) when the lock was
// released. Ignoring the unfairness of this, we can construct a deadlock
// scenario with request req1 with seqnum 1 and req2 with seqnum 2 where
// req1 and req2 both want to write at one key and so get ordered by their
// seqnums but at another key req2 wants to read and req1 wants to write and
// since req2 does not wait in the queue it acquires a read reservation
// before req1. See the discussion at the end of this comment section on how
// the behavior will extend when we start supporting Shared and Upgrade
// locks.
//
// Non-transactional requests can do both reads and writes but cannot be
// depended on since they don't have a transaction that can be pushed.
// Therefore they not only do not acquire locks, but cannot make reservations.
// The non-reservation for reads is already covered in the previous
// paragraph. For non-transactional writes, the request waits in the queue
// with other writers. The difference occurs:
// - when it gets to the front of the queue and there is no lock holder
// or reservation: instead of acquiring the reservation it removes
// itself from the lockState and proceeds to the next lock. If it
// does not need to wait for any more locks and manages to acquire
// latches before those locks are acquired by some other request, it
// will evaluate.
// - when deciding to wait at a lock: if the lock has a reservation with
// a sequence num higher than this non-transactional request it will
// ignore that reservation. Note that ignoring such reservations is
// safe since when this non-transactional request is holding latches
// those reservation holders cannot be holding latches, so they cannot
// conflict.
//
// Multiple requests from the same transaction wait independently, including
// the situation where one of the requests has a reservation and the other
// is waiting (currently this can only happen if both requests are doing
// SpanReadWrite). Making multiple requests from the same transaction
// jointly hold the reservation introduces code complexity since joint
// reservations can be partially broken (see deadlock example below), and is
// not necessarily fair to other requests. Additionally, if req1 from txn1
// is holding a reservation and req2 from txn1 is waiting, they must
// conflict wrt latches and cannot evaluate concurrently so there isn't a
// benefit to joint reservations. However, if one of the requests acquires
// the lock the other request no longer needs to wait on this lock. This
// situation motivates the waitSelf state.
//
// Deadlock example if joint reservations were supported and we did not
// allow partial breaking of such reservations:
//
// - Keys are A, B, C, D.
// - Key D is locked by some random txn.
// - req1 from txn1 writes A, B, D. It waits at D.
// - Some other request from some random txn that writes C arrives,
// evaluates, and locks C.
// - req2 from txn2 that writes A, C. It waits at C.
// - Some other request from some random txn that writes A arrives,
// evaluates, and locks A.
// - req3 from txn1 that writes A, C. It waits at A. Note that req1 and req3
// are from the same txn.
// - A is unlocked. req3 reserves A and waits at C behind req2.
// - B is locked by some random txn.
// - D is unlocked. req1 reserves D and proceeds to scan again and finds A
// is reserved by req3 which is the same txn so becomes a joint
// reservation holder at A.
// - Since B is locked, req1 waits at B.
// - C is unlocked. req2 reserves C. It scans and finds req1+req3 holding
// the joint reservation at A. If it queues behind this joint reservation
// we have the following situation:
// reservation waiter
// A req1+req3 req2
// C req2 req3
// This is a deadlock caused by the lock table unless req2 partially
// breaks the reservation at A.
//
// Extension for Shared and Upgrade locks:
// There are 3 aspects to consider: holders; reservers; the dependencies
// that need to be captured when waiting.
//
// - Holders: only shared locks are compatible with themselves, so there can
// be one of (a) no holder (b) multiple shared lock holders, (c) one
// exclusive holder, (d) one upgrade holder. Non-locking reads will
// wait in waitingReaders for only an incompatible exclusive holder.
//
// - Reservers: This follows the same pattern as holders. Non-locking reads
// do not wait on reservers.
//
// - Queueing and dependencies: All potential lockers and non-transactional
// writers will wait in the same queue. A sequence of consecutive requests
// that have the potential to acquire a shared lock will jointly reserve
// that shared lock. Such requests cannot jump ahead of requests with a
// lower seqnum just because there is currently a shared lock reservation
// (this can cause lockTable induced deadlocks). Such joint reservations
// can be partially broken by a waiter desiring an exclusive or upgrade
// lock. Like the current code, non-transactional writes will wait for
// reservations that have a lower sequence num, but not make their own
// reservation. Additionally, they can partially break joint reservations.
//
// Reservations that are (partially or fully) broken cause requests to
// reenter the queue as inactive waiters. This is no different than the
// current behavior. Each request can specify the same key in spans for
// ReadOnly, ReadShared, ReadUpgrade, ReadWrite. The spans will be
// iterated over in decreasing order of strength, to only wait at a lock
// at the highest strength (this is similar to the current behavior using
// accessDecreasingStrength).
//
// For dependencies, a waiter desiring an exclusive or upgrade lock always
// conflicts with the holder(s) or reserver(s) so that is the dependency
// that will be captured. A waiter desiring a shared lock may encounter a
// situation where it does not conflict with the holder(s) or reserver(s)
// since those are also shared lockers. In that case it will depend on the
// first waiter since that waiter must be desiring a lock that is
// incompatible with a shared lock.
reservation *lockTableGuardImpl
// TODO(sbhola): There are a number of places where we iterate over these
// lists looking for something, as described below. If some of these turn
// out to be inefficient, consider better data-structures. One idea is that
// for cases that find a particular guard the lockTableGuardImpl.locks can be
// a map instead of a set to point directly to the *list.Element.
//
// queuedWriters:
// - to find all active queuedWriters.
// - to find the first active writer to make it distinguished.
// - to find a particular guard.
// - to find the position, based on seqNum, for inserting a particular guard.
// - to find all waiting writers with a particular txn ID.
//
// waitingReaders:
// - readers with a higher timestamp than some timestamp.
// - to find a particular guard.
// Waiters: An active waiter needs to be notified about changes in who it is
// waiting for.
// List of *queuedGuard. A subset of these are actively waiting. If
// non-empty, either the lock is held or there is a reservation.
queuedWriters list.List
// List of *lockTableGuardImpl. All of these are actively waiting. If
// non-empty, the lock must be held. By definition these cannot be in
// waitSelf state since that state is only used when there is a reservation.
waitingReaders list.List
// If there is a non-empty set of active waiters that are not waitSelf, then
// at least one must be distinguished.
distinguishedWaiter *lockTableGuardImpl
}
//go:generate ../../util/interval/generic/gen.sh *lockState concurrency
// Methods required by util/interval/generic type contract.

// ID returns the lockState's id.
func (l *lockState) ID() uint64 {
	return l.id
}

// Key returns the key being locked.
func (l *lockState) Key() []byte {
	return l.key
}

// EndKey returns the lockState's endKey.
func (l *lockState) EndKey() []byte {
	return l.endKey
}

// String returns the locked key converted to a string.
func (l *lockState) String() string {
	return string(l.key)
}

// New returns a pointer to a zero-valued lockState. The receiver is not used.
func (l *lockState) New() *lockState {
	return &lockState{}
}

// SetID sets the lockState's id.
func (l *lockState) SetID(v uint64) {
	l.id = v
}

// SetKey sets the key being locked.
func (l *lockState) SetKey(v []byte) {
	l.key = v
}

// SetEndKey sets the lockState's endKey.
func (l *lockState) SetEndKey(v []byte) {
	l.endKey = v
}
// tryBreakReservation is called for a write request when there is a
// reservation. The reservation is broken only if its holder has a larger
// sequence number than seqNum; the displaced holder is then put at the front
// of queuedWriters as an inactive waiter and the reservation is cleared.
// Returns true iff the reservation was broken.
// REQUIRES: l.mu is locked.
func (l *lockState) tryBreakReservation(seqNum uint64) bool {
	if l.reservation.seqNum <= seqNum {
		// The reservation holder is not behind this request in sequence order,
		// so it keeps the reservation.
		return false
	}
	l.queuedWriters.PushFront(&queuedGuard{guard: l.reservation, active: false})
	l.reservation = nil
	return true
}
// informActiveWaiters tells every active waiter on this lock who it is
// waiting for: the lock holder, or else the reservation holder. Since the
// reservation may have changed, this also repairs any inconsistencies in the
// waitSelf and waitForDistinguished states.
// REQUIRES: l.mu is locked.
func (l *lockState) informActiveWaiters() {
	lockerTxn, lockerTs := l.getLockerInfo()
	// getLockerInfo reporting no transaction means the waiters are waiting on
	// the reservation rather than on a lock holder.
	reserved := lockerTxn == nil
	needDistinguished := l.distinguishedWaiter == nil
	if reserved {
		lockerTxn, lockerTs = l.reservation.txn, l.reservation.ts
		// A waiter from the reserving transaction will be in waitSelf, so it
		// cannot stay the distinguished waiter.
		if !needDistinguished && l.distinguishedWaiter.isTxn(lockerTxn) {
			l.distinguishedWaiter = nil
			needDistinguished = true
		}
	}

	// State handed to every waiter that is not in waitSelf; guardAccess is
	// filled in per waiter below.
	template := waitingState{
		stateKind: waitFor,
		txn:       lockerTxn,
		ts:        lockerTs,
		key:       l.key,
		held:      l.holder.locked,
		access:    spanset.SpanReadWrite,
	}

	// publish installs state on g, upgrading it to waitForDistinguished when g
	// is the distinguished waiter, and signals g.
	publish := func(g *lockTableGuardImpl, state waitingState) {
		g.mu.Lock()
		defer g.mu.Unlock()
		if l.distinguishedWaiter == g {
			state.stateKind = waitForDistinguished
		}
		g.mu.state = state
		g.notify()
	}

	// Since there are waiting readers we could not have transitioned out of
	// or into a state with a reservation, since readers do not wait for
	// reservations.
	for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
		reader := e.Value.(*lockTableGuardImpl)
		if needDistinguished {
			l.distinguishedWaiter = reader
			needDistinguished = false
		}
		readerState := template
		readerState.guardAccess = spanset.SpanReadOnly
		publish(reader, readerState)
	}

	for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
		qg := e.Value.(*queuedGuard)
		if !qg.active {
			// Inactive waiters are not informed.
			continue
		}
		writer := qg.guard
		if reserved && writer.isTxn(lockerTxn) {
			// The reservation belongs to the writer's own transaction.
			publish(writer, waitingState{stateKind: waitSelf})
			continue
		}
		if needDistinguished {
			l.distinguishedWaiter = writer
			needDistinguished = false
		}
		writerState := template
		writerState.guardAccess = spanset.SpanReadWrite
		publish(writer, writerState)
	}
}
// Picks a new distinguished waiter after the previous one has gone and the
// set of active waiters has shrunk. The first waiting reader is preferred;
// otherwise the first active queued writer that does not belong to the
// reserving transaction (if there is a reservation) is chosen. If no such
// waiter exists this is a no-op. The chosen waiter is moved to
// waitForDistinguished and notified.
// REQUIRES: l.mu is locked.
func (l *lockState) tryMakeNewDistinguished() {
	var chosen *lockTableGuardImpl
	if front := l.waitingReaders.Front(); front != nil {
		chosen = front.Value.(*lockTableGuardImpl)
	} else {
		for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
			qg := e.Value.(*queuedGuard)
			if !qg.active {
				continue
			}
			if l.reservation != nil && qg.guard.isTxn(l.reservation.txn) {
				// Skip writers from the transaction holding the reservation.
				continue
			}
			chosen = qg.guard
			break
		}
	}
	if chosen == nil {
		return
	}
	l.distinguishedWaiter = chosen
	chosen.mu.Lock()
	// Only the kind changes; the rest of the waiter's state is already
	// up-to-date.
	chosen.mu.state.stateKind = waitForDistinguished
	chosen.notify()
	chosen.mu.Unlock()
}
// Reports whether the lock is currently held by the transaction with the
// given id. Returns false when the lock is not held. When it is held, the
// unreplicated holder's transaction is consulted if present, else the
// replicated holder's.
// REQUIRES: l.mu is locked.
func (l *lockState) isLockedBy(id uuid.UUID) bool {
	if !l.holder.locked {
		return false
	}
	holderTxn := l.holder.holder[lock.Unreplicated].txn
	if holderTxn == nil {
		holderTxn = l.holder.holder[lock.Replicated].txn
	}
	return holderTxn.ID == id
}
// Returns the transaction and timestamp of the current lock holder, or
// (nil, zero timestamp) if the lock is not held.
//
// If the lock is held as both replicated and unreplicated, the holder with
// the lower timestamp is reported, since the lower timestamp contends with
// more transactions. Otherwise whichever holder exists is reported.
// REQUIRES: l.mu is locked.
func (l *lockState) getLockerInfo() (*enginepb.TxnMeta, hlc.Timestamp) {
	if !l.holder.locked {
		return nil, hlc.Timestamp{}
	}
	replTxn := l.holder.holder[lock.Replicated].txn
	unreplTxn := l.holder.holder[lock.Unreplicated].txn

	// The unreplicated holder is preferred when there is no replicated
	// holder, or when both exist and the unreplicated timestamp is lower.
	preferUnreplicated := replTxn == nil
	if !preferUnreplicated && unreplTxn != nil {
		preferUnreplicated = l.holder.holder[lock.Unreplicated].ts.Less(
			l.holder.holder[lock.Replicated].ts)
	}
	if preferUnreplicated {
		return unreplTxn, l.holder.holder[lock.Unreplicated].ts
	}
	return replTxn, l.holder.holder[lock.Replicated].ts
}
// Decides whether the request g with access sa should actively wait at this
// lock and if yes, adjusts the data-structures appropriately.
//
// Parameters:
// - g: the guard of the request that is evaluating this lock.
// - sa: the access with which the request touches this key.
// - notify: true iff the request's new state channel should be notified --
// it is set to false when the call to tryActiveWait is happening due to an
// event for a different request or transaction (like a lock release) since
// in that case the channel is notified first and the call to
// tryActiveWait() happens later in lockTableGuard.CurState().
//
// The return value is true iff it is actively waiting. When true is returned,
// g has been inserted into (or marked active in) queuedWriters for
// SpanReadWrite access, or pushed onto waitingReaders otherwise, and
// g.mu.state has been set to describe who it is waiting for. When a write
// request breaks an existing reservation it becomes the reservation holder
// and false is returned.
// Acquires l.mu, g.mu.
func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, notify bool) bool {
l.mu.Lock()
defer l.mu.Unlock()
// It is possible that this lock is empty and has not yet been deleted.
if l.isEmptyLock() {
return false
}
// Lock is not empty.
waitForTxn, waitForTs := l.getLockerInfo()
if waitForTxn != nil && g.isTxn(waitForTxn) {
// Already locked by this txn.
return false
}
// Set when the lock is not held and the reservation belongs to g's own
// transaction; such a waiter is put in waitSelf state further below.
var reservedBySelfTxn bool
if waitForTxn == nil {
// Not held. The code below dereferences l.reservation, so a non-empty lock
// that is not held is expected to have a reservation.
if l.reservation == g {
// Already reserved by this request.
return false
}
waitForTxn = l.reservation.txn
waitForTs = l.reservation.ts
reservedBySelfTxn = g.isTxn(waitForTxn)
// A non-transactional write request never makes or breaks reservations,
// and only waits for a reservation if the reservation has a lower seqNum.
// For reads, the non-transactional and transactional behavior is
// equivalent and handled later in this function.
if g.txn == nil && sa == spanset.SpanReadWrite && l.reservation.seqNum > g.seqNum {
// Reservation is held by a request with a higher seqNum and g is a
// non-transactional request. Ignore the reservation.
return false
}
}
if sa == spanset.SpanReadOnly {
if !l.holder.locked {
// Reads only care about locker, not a reservation.
return false
}
// Locked by some other txn. A read at a timestamp below the holder's
// timestamp does not wait.
if g.ts.Less(waitForTs) {
return false
}
// g.mu is held only for the map lookup and released immediately; it is
// re-acquired further below if the request needs to wait.
g.mu.Lock()
_, alsoHasStrongerAccess := g.mu.locks[l]
g.mu.Unlock()
// If the request already has this lock in its locks map, it must also be
// writing to this key and must be either a reservation holder or inactive
// waiter at this lock. The former has already been handled above. For the
// latter, it must have had its reservation broken. Since this is a weaker
// access we defer to the stronger access and don't wait here.
//
// For non-transactional requests that have the key specified as both
// SpanReadOnly and SpanReadWrite, the request never acquires a
// reservation, so using the locks map to detect this duplication of the
// key is not possible. In the rare case, the lock is now held at a
// timestamp that is not compatible with this request and it will wait
// here -- there is no correctness issue with doing that.
if alsoHasStrongerAccess {
return false
}
}
// Incompatible with whoever is holding lock or reservation. A write request
// first tries to break the reservation; on success it takes over the
// reservation itself and does not wait.
if l.reservation != nil && sa == spanset.SpanReadWrite && l.tryBreakReservation(g.seqNum) {
l.reservation = g
g.mu.Lock()
g.mu.locks[l] = struct{}{}
g.mu.Unlock()
// There cannot be waitingReaders, since they do not wait for
// reservations. And the set of active queuedWriters has not changed, but
// they do need to be told about the change in who they are waiting for.
l.informActiveWaiters()
return false
}
// Need to wait.
g.mu.Lock()
defer g.mu.Unlock()
if sa == spanset.SpanReadWrite {
if _, inQueue := g.mu.locks[l]; inQueue {
// Already in queue and must be in the right position, so mark as active
// waiter there. We expect this to be rare.
var qg *queuedGuard
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard == g {
qg = qqg
break
}
}
if qg == nil {
// The locks map says g is queued here but it was not found in
// queuedWriters.
panic("lockTable bug")
}
qg.active = true
} else {
// Not in queue so insert as active waiter.
qg := &queuedGuard{
guard: g,
active: true,
}
if l.queuedWriters.Len() == 0 {
l.queuedWriters.PushFront(qg)
} else {
// Scan from the back for the last queued writer with a smaller seqNum
// and insert immediately after it; if there is none, g goes to the
// front.
var e *list.Element
for e = l.queuedWriters.Back(); e != nil; e = e.Prev() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum < qg.guard.seqNum {
break
}
}
if e == nil {
l.queuedWriters.PushFront(qg)
} else {
l.queuedWriters.InsertAfter(qg, e)
}
}
g.mu.locks[l] = struct{}{}
}
} else {
// Non-write access: add to the front of waitingReaders.
l.waitingReaders.PushFront(g)
g.mu.locks[l] = struct{}{}
}
// Make it an active waiter.
g.key = l.key
g.mu.startWait = true
if reservedBySelfTxn {
g.mu.state = waitingState{stateKind: waitSelf}
} else {
stateType := waitFor
// If there is currently no distinguished waiter, g becomes it.
if l.distinguishedWaiter == nil {
l.distinguishedWaiter = g
stateType = waitForDistinguished
}
g.mu.state = waitingState{
stateKind: stateType,
txn: waitForTxn,
ts: waitForTs,
key: l.key,
held: l.holder.locked,
access: spanset.SpanReadWrite,
guardAccess: sa,
}
}
if notify {
g.notify()
}
return true
}
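// Acquires this lock. Returns a non-nil error if the caller violated the
// acquisition contract, e.g. the lock is already held by a different
// transaction, or the sequence number or lock timestamp would regress.
// Acquiring the lock may cause some guards to be done actively waiting at
// this key -- these will be requests from the same transaction that is
// acquiring the lock.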
// Acquires l.mu.
func (l *lockState) acquireLock(
_ lock.Strength, durability lock.Durability, txn *enginepb.TxnMeta, ts hlc.Timestamp,
) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.holder.locked {
// Already held.
beforeTxn, beforeTs := l.getLockerInfo()
if txn.ID != beforeTxn.ID {
return errors.Errorf("caller violated contract")
}
if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch {
// Clear the sequences for the older epoch.
l.holder.holder[durability].seqs = l.holder.holder[durability].seqs[:0]
}
seqs := l.holder.holder[durability].seqs
add := true
if len(seqs) > 0 {
lastSeq := seqs[len(seqs)-1]
if lastSeq > txn.Sequence {
return errors.Errorf("caller violated contract")
}
if lastSeq == txn.Sequence {
// Idempotent lock acquisition.
add = false
}
}
if add {
l.holder.holder[durability].seqs = append(seqs, txn.Sequence)
}
l.holder.holder[durability].txn = txn
l.holder.holder[durability].ts = ts
_, afterTs := l.getLockerInfo()
if afterTs.Less(beforeTs) {
return errors.Errorf("caller violated contract")
} else if beforeTs.Less(afterTs) {
l.increasedLockTs(afterTs)
}
return nil
}
// Not already held, so may be reserved by this request. There is also the
// possibility that some other request has broken this reservation because