-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
plan,table : support write operator for cache table and mock lockwrite state #29444
Changes from all commits
4c7367b
0c0b299
2bac92d
ca60120
017850d
e24779b
f1d777f
faf5b48
f0a943a
8cced0b
5076c90
e41f910
8dae28c
d96a4da
25d5ae0
fe918f9
d0c5420
b6b4b37
2f9efb8
09bda9f
bb07c3b
9afdc15
47dc7fc
e819a23
b3da641
37665d2
3a9c68c
1b2ccb2
123d4f2
9feed27
90aceb1
da93bda
7390fb4
a1dad99
04fc688
767791a
58a53cb
ad3b990
aa91573
edfd442
f35d4f6
90ccb11
bb65656
20586b2
1c4f494
0f2ab17
3748f4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,13 +15,17 @@ | |
package tables | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"context" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/tidb/kv" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/table" | ||
"github.com/pingcap/tidb/tablecodec" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/tikv/client-go/v2/oracle" | ||
"github.com/tikv/client-go/v2/tikv" | ||
) | ||
|
||
|
@@ -30,74 +34,148 @@ var _ table.CachedTable = &cachedTable{} | |
|
||
type cachedTable struct { | ||
TableCommon | ||
kv.MemBuffer | ||
mu sync.RWMutex | ||
cacheData atomic.Value | ||
handle StateRemote | ||
} | ||
|
||
func leaseFromTS(ts uint64) uint64 { | ||
// TODO make this configurable in the following PRs | ||
const defaultLeaseDuration time.Duration = 3 * time.Second | ||
tiancaiamao marked this conversation as resolved.
Show resolved
Hide resolved
|
||
physicalTime := oracle.GetTimeFromTS(ts) | ||
lease := oracle.GoTimeToTS(physicalTime.Add(defaultLeaseDuration)) | ||
return lease | ||
} | ||
|
||
func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) { | ||
c.mu.RLock() | ||
defer c.mu.RUnlock() | ||
if c.isReadFromCache(ts) { | ||
return c.MemBuffer, true | ||
tmp := c.cacheData.Load() | ||
if tmp == nil { | ||
return nil, false | ||
} | ||
data := tmp.(*table.CacheData) | ||
if data.Lease > ts { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if ts is a very old one. For example:
If some data updated during sleeping and then tidb obtained a read lock before select. Will the last select return the wrong data that is from cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In MVCC transaction, a transaction comes with a concept of version (ts in TiDB)
This transaction should read the data of 1h ago, even some other transaction modify the data later, it doesn't break the snapshot isolation semantic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I mean is that for an old ts, it can get a wrong result from cache. Suppose the start ts is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
sorry my mistake。 There are such problems when using locks, but if only atomic.Value is used, the cacheData one hour ago has been fixed, even if there is an update later, it is not the update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lcwangchao The correct implement should use atomic operation, instead of lock based. Because the lock here do not prevent the "ABA" problem as you mentioned. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we are referring the same problem... That is my test result for this pr:
Two select results in session1 should be the same (both empty), but they are not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah. this will modify in next pr. see https://github.com/pingcap/tidb/pull/29443/files#diff-dc8c4c0ab93631d336a3e92583a64a1837135319cef7337758cd13ab86992086R144-R147 |
||
return data.MemBuffer, true | ||
} | ||
return nil, false | ||
} | ||
func (c *cachedTable) isReadFromCache(ts uint64) bool { | ||
// If first read cache table. directly return false, the backend goroutine will help us update the lock information | ||
// and read the data from the original table at the same time | ||
// TODO : Use lease and ts judge whether it is readable. | ||
// TODO : If the cache is not readable. MemBuffer become invalid. | ||
return c.MemBuffer != nil | ||
} | ||
|
||
var mockStateRemote = struct { | ||
Ch chan remoteTask | ||
Data *mockStateRemoteData | ||
}{} | ||
|
||
// NewCachedTable creates a new CachedTable Instance | ||
func NewCachedTable(tbl *TableCommon) (table.Table, error) { | ||
return &cachedTable{ | ||
if mockStateRemote.Data == nil { | ||
mockStateRemote.Data = newMockStateRemoteData() | ||
mockStateRemote.Ch = make(chan remoteTask, 100) | ||
go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch) | ||
} | ||
ret := &cachedTable{ | ||
TableCommon: *tbl, | ||
}, nil | ||
handle: &mockStateRemoteHandle{mockStateRemote.Ch}, | ||
} | ||
|
||
return ret, nil | ||
} | ||
|
||
func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context) error { | ||
func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context, lease uint64) (kv.MemBuffer, error) { | ||
prefix := tablecodec.GenTablePrefix(c.tableID) | ||
txn, err := ctx.Txn(true) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
if txn.StartTS() >= lease { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can start a new txn with the same startTs with the old one to avoid this error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i will modify it in the next pr. loadDataFromOriginalTable will execute a new txn. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree to implement it in next pr. But I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
return nil, errors.New("the loaded data is outdate for caching") | ||
} | ||
|
||
buffTxn, err := ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0)) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
|
||
buffer := buffTxn.GetMemBuffer() | ||
it, err := txn.Iter(prefix, prefix.PrefixNext()) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
defer it.Close() | ||
if !it.Valid() { | ||
return nil | ||
} | ||
for it.Valid() && it.Key().HasPrefix(prefix) { | ||
value := it.Value() | ||
err = buffer.Set(it.Key(), value) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
err = it.Next() | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
} | ||
c.mu.Lock() | ||
c.MemBuffer = buffer | ||
c.mu.Unlock() | ||
return nil | ||
|
||
return buffer, nil | ||
} | ||
|
||
func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error { | ||
// Now only the data is re-load here, and the lock information is not updated. any read-lock information update will in the next pr. | ||
err := c.loadDataFromOriginalTable(ctx) | ||
// Load data from original table and the update lock information. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay. i will modify it in the next pr. |
||
tid := c.Meta().ID | ||
lease := leaseFromTS(ts) | ||
succ, err := c.handle.LockForRead(tid, ts, lease) | ||
if err != nil { | ||
return fmt.Errorf("reload data error") | ||
return errors.Trace(err) | ||
} | ||
if succ { | ||
mb, err := c.loadDataFromOriginalTable(ctx, lease) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
c.cacheData.Store(&table.CacheData{ | ||
Lease: lease, | ||
MemBuffer: mb, | ||
}) | ||
} | ||
// Current status is not suitable to cache. | ||
return nil | ||
} | ||
|
||
// AddRecord implements the AddRecord method for the table.Table interface. | ||
func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { | ||
txn, err := ctx.Txn(true) | ||
if err != nil { | ||
return nil, err | ||
} | ||
now := txn.StartTS() | ||
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
return c.TableCommon.AddRecord(ctx, r, opts...) | ||
|
||
} | ||
|
||
// UpdateRecord implements table.Table | ||
func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { | ||
txn, err := sctx.Txn(true) | ||
if err != nil { | ||
return err | ||
} | ||
now := txn.StartTS() | ||
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) | ||
JayLZhou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) | ||
} | ||
|
||
// RemoveRecord implements table.Table RemoveRecord interface. | ||
func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { | ||
txn, err := ctx.Txn(true) | ||
if err != nil { | ||
return err | ||
} | ||
now := txn.StartTS() | ||
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return c.TableCommon.RemoveRecord(ctx, h, r) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will raise a error. so i add it.