-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/tikv,executor: redesign the latch scheduler #7711
Changes from 1 commit
65a42d1
f055cd3
6fc4640
53be08c
12e57d7
cf7ee5c
a6f83e0
9ddc345
ec5f9e6
9613fa1
2d3937f
2b140d3
4f9ef4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
package latch | ||
|
||
import ( | ||
"bytes" | ||
"math/bits" | ||
"sort" | ||
"sync" | ||
|
@@ -22,32 +23,25 @@ import ( | |
"github.com/spaolacci/murmur3" | ||
) | ||
|
||
// latch stores a key's waiting transactions information. | ||
type latch struct { | ||
// Whether there is any transaction in waitingQueue except head. | ||
hasMoreWaiting bool | ||
// The startTS of the transaction which is the head of waiting transactions. | ||
waitingQueueHead uint64 | ||
maxCommitTS uint64 | ||
sync.Mutex | ||
} | ||
|
||
func (l *latch) isEmpty() bool { | ||
return l.waitingQueueHead == 0 && !l.hasMoreWaiting | ||
} | ||
type node struct { | ||
slotID int | ||
key []byte | ||
maxCommitTS uint64 | ||
value *Lock | ||
|
||
func (l *latch) free() { | ||
l.waitingQueueHead = 0 | ||
next *node | ||
} | ||
|
||
func (l *latch) refreshCommitTS(commitTS uint64) { | ||
l.Lock() | ||
defer l.Unlock() | ||
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS) | ||
// latch stores a key's waiting transactions information. | ||
type latch struct { | ||
queue *node | ||
waiting []*Lock | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not each node has a waiting queue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Waiting queue is moved from each node to the latch for those reasons:
|
||
sync.Mutex | ||
} | ||
|
||
// Lock is the locks' information required for a transaction. | ||
type Lock struct { | ||
keys [][]byte | ||
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. | ||
requiredSlots []int | ||
// The number of latches that the transaction has acquired. For status is stale, it include the | ||
|
@@ -96,9 +90,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) { | |
// but conceptually a latch is a queue, and a slot is an index to the queue | ||
type Latches struct { | ||
slots []latch | ||
// The waiting queue for each slot(slotID => slice of Lock). | ||
waitingQueues map[int][]*Lock | ||
sync.RWMutex | ||
} | ||
|
||
type bytesSlice [][]byte | ||
|
||
func (s bytesSlice) Len() int { | ||
return len(s) | ||
} | ||
|
||
func (s bytesSlice) Swap(i, j int) { | ||
s[i], s[j] = s[j], s[i] | ||
} | ||
|
||
func (s bytesSlice) Less(i, j int) bool { | ||
return bytes.Compare(s[i], s[j]) < 0 | ||
} | ||
|
||
// NewLatches create a Latches with fixed length, | ||
|
@@ -107,14 +112,15 @@ func NewLatches(size uint) *Latches { | |
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) | ||
slots := make([]latch, powerOfTwoSize) | ||
return &Latches{ | ||
slots: slots, | ||
waitingQueues: make(map[int][]*Lock), | ||
slots: slots, | ||
} | ||
} | ||
|
||
// genLock generates Lock for the transaction with startTS and keys. | ||
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock { | ||
sort.Sort(bytesSlice(keys)) | ||
return &Lock{ | ||
keys: keys, | ||
requiredSlots: latches.genSlotIDs(keys), | ||
acquiredCount: 0, | ||
startTS: startTS, | ||
|
@@ -126,17 +132,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int { | |
for _, key := range keys { | ||
slots = append(slots, latches.slotID(key)) | ||
} | ||
sort.Ints(slots) | ||
if len(slots) <= 1 { | ||
return slots | ||
} | ||
dedup := slots[:1] | ||
for i := 1; i < len(slots); i++ { | ||
if slots[i] != slots[i-1] { | ||
dedup = append(dedup, slots[i]) | ||
} | ||
} | ||
return dedup | ||
return slots | ||
} | ||
|
||
// slotID return slotID for current key. | ||
|
@@ -150,8 +146,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult { | |
return acquireStale | ||
} | ||
for lock.acquiredCount < len(lock.requiredSlots) { | ||
slotID := lock.requiredSlots[lock.acquiredCount] | ||
status := latches.acquireSlot(slotID, lock) | ||
status := latches.acquireSlot(lock) | ||
if status != acquireSuccess { | ||
return status | ||
} | ||
|
@@ -161,75 +156,99 @@ func (latches *Latches) acquire(lock *Lock) acquireResult { | |
|
||
// release releases all latches owned by the `lock` and returns the wakeup list. | ||
// Preconditions: the caller must ensure the transaction's status is not locked. | ||
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock { | ||
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock { | ||
wakeupList = wakeupList[:0] | ||
for i := 0; i < lock.acquiredCount; i++ { | ||
slotID := lock.requiredSlots[i] | ||
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil { | ||
for lock.acquiredCount > 0 { | ||
if nextLock := latches.releaseSlot(lock); nextLock != nil { | ||
wakeupList = append(wakeupList, nextLock) | ||
} | ||
} | ||
return wakeupList | ||
} | ||
|
||
// refreshCommitTS refreshes commitTS for keys. | ||
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) { | ||
slotIDs := latches.genSlotIDs(keys) | ||
for _, slotID := range slotIDs { | ||
latches.slots[slotID].refreshCommitTS(commitTS) | ||
} | ||
} | ||
|
||
func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) { | ||
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) { | ||
key := lock.keys[lock.acquiredCount-1] | ||
slotID := lock.requiredSlots[lock.acquiredCount-1] | ||
latch := &latches.slots[slotID] | ||
lock.acquiredCount-- | ||
latch.Lock() | ||
defer latch.Unlock() | ||
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS) | ||
if !latch.hasMoreWaiting { | ||
latch.free() | ||
|
||
var find *node | ||
for n := latch.queue; n != nil; n = n.next { | ||
if bytes.Compare(n.key, key) == 0 { | ||
find = n | ||
break | ||
} | ||
} | ||
|
||
if find.value != lock { | ||
panic("releaseSlot wrong") | ||
} | ||
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS) | ||
find.value = nil | ||
if len(latch.waiting) == 0 { | ||
return nil | ||
} | ||
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID) | ||
latch.waitingQueueHead = nextLock.startTS | ||
nextLock.acquiredCount++ | ||
if latch.maxCommitTS > nextLock.startTS { | ||
nextLock.isStale = true | ||
|
||
idx := 0 | ||
for i := 0; i < len(latch.waiting); i++ { | ||
waiting := latch.waiting[i] | ||
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 { | ||
nextLock = waiting | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that there are more than 1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible! you find a bug. |
||
} else { | ||
idx++ | ||
latch.waiting[idx] = waiting | ||
} | ||
} | ||
return nextLock | ||
} | ||
latch.waiting = latch.waiting[:idx] | ||
|
||
func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) { | ||
latches.Lock() | ||
defer latches.Unlock() | ||
waiting := latches.waitingQueues[slotID] | ||
front = waiting[0] | ||
if len(waiting) == 1 { | ||
delete(latches.waitingQueues, slotID) | ||
} else { | ||
latches.waitingQueues[slotID] = waiting[1:] | ||
hasMoreWaiting = true | ||
if nextLock != nil && find.maxCommitTS > nextLock.startTS { | ||
nextLock.isStale = true | ||
} | ||
return | ||
return nextLock | ||
} | ||
|
||
func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult { | ||
func (latches *Latches) acquireSlot(lock *Lock) acquireResult { | ||
key := lock.keys[lock.acquiredCount] | ||
slotID := lock.requiredSlots[lock.acquiredCount] | ||
latch := &latches.slots[slotID] | ||
latch.Lock() | ||
defer latch.Unlock() | ||
if latch.maxCommitTS > lock.startTS { | ||
|
||
var find *node | ||
for n := latch.queue; n != nil; n = n.next { | ||
if bytes.Compare(n.key, key) == 0 { | ||
find = n | ||
break | ||
} | ||
// TODO: Invalidate old data. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot merge this PR before fixing this TODO. Because without it we will get an OOM. |
||
} | ||
|
||
if find == nil { | ||
tmp := &node{ | ||
slotID: slotID, | ||
key: key, | ||
value: lock, | ||
} | ||
tmp.next = latch.queue | ||
latch.queue = tmp | ||
lock.acquiredCount++ | ||
return acquireSuccess | ||
} | ||
|
||
if find.maxCommitTS > lock.startTS { | ||
lock.isStale = true | ||
return acquireStale | ||
} | ||
|
||
if latch.isEmpty() { | ||
latch.waitingQueueHead = lock.startTS | ||
if find.value == nil { | ||
find.value = lock | ||
lock.acquiredCount++ | ||
return acquireSuccess | ||
} | ||
|
||
// Push the current transaction into waitingQueue. | ||
latch.hasMoreWaiting = true | ||
latches.Lock() | ||
defer latches.Unlock() | ||
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock) | ||
latch.waiting = append(latch.waiting, lock) | ||
return acquireLocked | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use
list.List
? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
list.List
will make unnecessary allocation. Use
is similar to
list.List
is a doubly linked list, while a single linked list is sufficient here.