-
Notifications
You must be signed in to change notification settings - Fork 229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
txnkv: add new API for lock->put optimization #748
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,6 +185,8 @@ type memBufferMutations struct { | |
// MSB LSB | ||
// [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock] | ||
handles []unionstore.MemKeyHandle | ||
// overlay of mutation values | ||
overlay map[unionstore.MemKeyHandle][]byte | ||
} | ||
|
||
func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations { | ||
|
@@ -211,7 +213,13 @@ func (m *memBufferMutations) GetKeys() [][]byte { | |
} | ||
|
||
func (m *memBufferMutations) GetValue(i int) []byte { | ||
v, _ := m.storage.GetValueByHandle(m.handles[i]) | ||
h := m.handles[i] | ||
if m.overlay != nil { | ||
if v, ok := m.overlay[h]; ok { | ||
return v | ||
} | ||
} | ||
v, _ := m.storage.GetValueByHandle(h) | ||
return v | ||
} | ||
|
||
|
@@ -235,10 +243,11 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations { | |
return &memBufferMutations{ | ||
handles: m.handles[from:to], | ||
storage: m.storage, | ||
overlay: m.overlay, | ||
} | ||
} | ||
|
||
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) { | ||
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle, value []byte) { | ||
// See comments of `m.handles` field about the format of the user data `aux`. | ||
aux := uint16(op) << 3 | ||
if isPessimisticLock { | ||
|
@@ -252,6 +261,18 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, | |
} | ||
handle.UserData = aux | ||
m.handles = append(m.handles, handle) | ||
if len(value) > 0 { | ||
if op != kvrpcpb.Op_Put { | ||
panic("op must be PUT when pushing with value") | ||
} | ||
if !isPessimisticLock { | ||
panic("key must be locked when pushing with value") | ||
} | ||
if m.overlay == nil { | ||
m.overlay = make(map[unionstore.MemKeyHandle][]byte) | ||
} | ||
m.overlay[handle] = value | ||
} | ||
} | ||
|
||
// CommitterMutationFlags represents various bit flags of mutations. | ||
|
@@ -493,7 +514,7 @@ func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, asse | |
} | ||
|
||
func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { | ||
var size, putCnt, delCnt, lockCnt, checkCnt int | ||
var size, putCnt, delCnt, lockCnt, checkCnt, putFromLockCnt int | ||
|
||
txn := c.txn | ||
memBuf := txn.GetMemBuffer() | ||
|
@@ -508,15 +529,25 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { | |
_ = err | ||
key := it.Key() | ||
flags := it.Flags() | ||
var value []byte | ||
var op kvrpcpb.Op | ||
var ( | ||
value []byte | ||
cachedValue []byte = nil | ||
op kvrpcpb.Op | ||
) | ||
|
||
if !it.HasValue() { | ||
if !flags.HasLocked() { | ||
continue | ||
} | ||
op = kvrpcpb.Op_Lock | ||
lockCnt++ | ||
if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic { | ||
// Change the LOCK into PUT if the value of this key has a cached value. | ||
cachedValue = val | ||
op = kvrpcpb.Op_Put | ||
putFromLockCnt++ | ||
} else { | ||
op = kvrpcpb.Op_Lock | ||
lockCnt++ | ||
} | ||
} else { | ||
value = it.Value() | ||
var isUnnecessaryKV bool | ||
|
@@ -581,8 +612,8 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { | |
if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off { | ||
mustExist, mustNotExist, hasAssertUnknown = false, false, false | ||
} | ||
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle()) | ||
size += len(key) + len(value) | ||
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle(), cachedValue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some comments are left here, maybe avoiding using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
size += len(key) + len(value) + len(cachedValue) | ||
|
||
if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off { | ||
// Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests. | ||
|
@@ -635,6 +666,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { | |
zap.Int("dels", delCnt), | ||
zap.Int("locks", lockCnt), | ||
zap.Int("checks", checkCnt), | ||
zap.Int("putsFromLocks", putFromLockCnt), | ||
zap.Uint64("txnStartTS", txn.startTS)) | ||
} | ||
|
||
|
@@ -1758,7 +1790,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch | |
return false, err | ||
} | ||
handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle() | ||
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle) | ||
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle, nil) | ||
} | ||
} | ||
return false, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could assert
isPessimisticLock == true
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean panic on assertion failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as it may prevent some data inconsistency risks.