
plan,table : support write operator for cache table and mock lockwrite state #29444

Merged: 47 commits merged on Nov 8, 2021
Changes from 39 commits

Commits (47):
4c7367b dml_cache_read (JayLZhou, Oct 27, 2021)
0c0b299 review_modify_1 (JayLZhou, Oct 29, 2021)
2bac92d fix_ci (JayLZhou, Oct 29, 2021)
ca60120 fix_ci (JayLZhou, Oct 29, 2021)
017850d fix_review (JayLZhou, Nov 2, 2021)
e24779b fix_twice (JayLZhou, Nov 2, 2021)
f1d777f Merge branch 'master' into dml_read_cache_table (JayLZhou, Nov 3, 2021)
faf5b48 Merge branch 'master' of https://github.com/pingcap/tidb into dml_rea… (JayLZhou, Nov 3, 2021)
f0a943a save_conflit (JayLZhou, Nov 3, 2021)
8cced0b Merge branch 'master' into dml_read_cache_table (JayLZhou, Nov 3, 2021)
5076c90 fix_ci_and_add_membuffer_point (JayLZhou, Nov 3, 2021)
e41f910 Merge branch 'master' of https://github.com/pingcap/tidb into dml_rea… (JayLZhou, Nov 3, 2021)
8dae28c fix_conflict (JayLZhou, Nov 3, 2021)
d96a4da save_write (JayLZhou, Nov 4, 2021)
25d5ae0 fix_conflit (JayLZhou, Nov 4, 2021)
fe918f9 draft_write (JayLZhou, Nov 4, 2021)
d0c5420 draft_write (JayLZhou, Nov 4, 2021)
b6b4b37 draft_write (JayLZhou, Nov 4, 2021)
2f9efb8 fix (JayLZhou, Nov 4, 2021)
09bda9f fix (JayLZhou, Nov 4, 2021)
bb07c3b fix (JayLZhou, Nov 5, 2021)
9afdc15 fix (JayLZhou, Nov 5, 2021)
47dc7fc fix_check_comment (JayLZhou, Nov 5, 2021)
e819a23 fix_check_comment (JayLZhou, Nov 5, 2021)
b3da641 fix_check_comment (JayLZhou, Nov 5, 2021)
37665d2 fix_check_comment (JayLZhou, Nov 5, 2021)
3a9c68c fix_check_comment (JayLZhou, Nov 5, 2021)
1b2ccb2 fix_check_comment (JayLZhou, Nov 5, 2021)
123d4f2 fix_check_comment (JayLZhou, Nov 5, 2021)
9feed27 fix_check_comment (JayLZhou, Nov 5, 2021)
90aceb1 fix_check_comment (JayLZhou, Nov 5, 2021)
da93bda fix_ci (JayLZhou, Nov 6, 2021)
7390fb4 fix_ci (JayLZhou, Nov 6, 2021)
a1dad99 fix_ci (JayLZhou, Nov 6, 2021)
04fc688 fix_ci (JayLZhou, Nov 6, 2021)
767791a fix_ci (JayLZhou, Nov 7, 2021)
58a53cb fix (JayLZhou, Nov 7, 2021)
ad3b990 add (JayLZhou, Nov 8, 2021)
aa91573 add (JayLZhou, Nov 8, 2021)
edfd442 add (JayLZhou, Nov 8, 2021)
f35d4f6 add (JayLZhou, Nov 8, 2021)
90ccb11 fix_ci (JayLZhou, Nov 8, 2021)
bb65656 fix_ci (JayLZhou, Nov 8, 2021)
20586b2 Merge branch 'master' into dml_write_and_mock_state (ti-chi-bot, Nov 8, 2021)
1c4f494 fix_ci (JayLZhou, Nov 8, 2021)
0f2ab17 Merge branch 'master' of https://github.com/pingcap/tidb into dml_wri… (JayLZhou, Nov 8, 2021)
3748f4f Merge branch 'dml_write_and_mock_state' of https://github.com/JayL-zx… (JayLZhou, Nov 8, 2021)
ddl/db_cache_test.go: 2 changes (2 additions & 0 deletions)

@@ -48,6 +48,8 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) {
c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
/* Test can skip schema checker */
tk.MustExec("begin")
tk.MustExec("drop table if exists t1")
tk.MustExec("CREATE TABLE t1 (a int)")

Contributor Author (JayLZhou):
This will raise an error otherwise, so I added it.

tk.MustExec("insert into t1 set a=2;")
tk2.MustExec("alter table t2 cache")
tk.MustExec("commit")
executor/main_test.go: 2 changes (2 additions & 0 deletions)

@@ -44,6 +44,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.NewCachedTable"),
}
goleak.VerifyTestMain(m, opts...)
}
planner/core/logical_plan_builder.go: 1 change (0 additions & 1 deletion)

@@ -4151,7 +4151,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
return nil, err
}
// Use the txn of the transaction to determine whether the cache can be read.
// About read lock and read condition feature. will add in the next pr.
buffer, cond := cachedTable.TryGetMemcache(txn.StartTS())
if cond {
b.ctx.GetSessionVars().StmtCtx.StoreCacheTable(tbl.Meta().ID, buffer)
table/table.go: 6 changes (6 additions & 0 deletions)

@@ -260,3 +260,9 @@ type CachedTable interface {
// you need to update the lock information and read the data from the original table
UpdateLockForRead(ctx sessionctx.Context, ts uint64) error
}

// CacheData packs the cache data and the lease
type CacheData struct {
Lease uint64
kv.MemBuffer
}
table/tables/cache.go: 144 changes (111 additions & 33 deletions)

@@ -15,13 +15,17 @@
package tables

import (
"fmt"
"sync"
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
)

@@ -30,74 +34,148 @@ var _ table.CachedTable = &cachedTable{}

type cachedTable struct {
TableCommon
kv.MemBuffer
mu sync.RWMutex
cacheData atomic.Value
handle StateRemote
}

func leaseFromTS(ts uint64) uint64 {
// TODO make this configurable in the following PRs
const defaultLeaseDuration time.Duration = 3 * time.Second
physicalTime := oracle.GetTimeFromTS(ts)
lease := oracle.GoTimeToTS(physicalTime.Add(defaultLeaseDuration))
return lease
}
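
For a concrete feel of the lease arithmetic, the standalone snippet below uses the same client-go oracle helpers imported above; the 3-second constant mirrors defaultLeaseDuration, and everything else is illustration rather than code from this PR:

    package main

    import (
        "fmt"
        "time"

        "github.com/tikv/client-go/v2/oracle"
    )

    func main() {
        // Pretend this is the start ts of the transaction that loaded the cache.
        loadTS := oracle.GoTimeToTS(time.Now())
        // The same computation as leaseFromTS: push the physical time forward by 3s.
        lease := oracle.GoTimeToTS(oracle.GetTimeFromTS(loadTS).Add(3 * time.Second))
        // TryGetMemcache below serves a reader from the cache only while the
        // reader's start ts is still covered by the lease.
        readerTS := oracle.GoTimeToTS(time.Now())
        fmt.Println(lease > readerTS) // true immediately after loading
    }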

func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.isReadFromCache(ts) {
return c.MemBuffer, true
tmp := c.cacheData.Load()
if tmp == nil {
return nil, false
}
data := tmp.(*table.CacheData)
if data.Lease > ts {

Collaborator:
What happens if ts is a very old one? For example:

begin;
do sleep(3600);
select * from small_table;

If some data is updated during the sleep, and TiDB then obtains a read lock before the select, will the last select return wrong data from the cache?

Contributor:

In an MVCC transaction, a transaction comes with a version (the ts in TiDB):

begin; do sleep(3600); select * from small_table;

This transaction should read the data as of one hour ago; even if some other transaction modifies the data later, that does not break the snapshot isolation semantics.

lcwangchao (Collaborator), Nov 5, 2021:
What I mean is that for an old ts, it can get a wrong result from the cache. Suppose the start ts is ts1, which is an hour ago, the table is locked by a read lock with lease ts2, and the cache version is ts3, with ts1 < ts3 < ts2. The select will then read from the cache instead of TiKV, but we cannot guarantee that there were no updates between ts1 and ts3, so the select can get newer data than expected.
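
In code form, the hazard being described is roughly the following; this is only an illustration, not code from the PR, and the names are made up for the example:

    // startTS is the reader's snapshot ts (ts1), loadTS is when the cache was
    // (re)loaded (ts3), and lease is the read-lock lease (ts2).
    func staleCacheHazard(startTS, loadTS, lease uint64) bool {
        servedFromCache := lease > startTS // the only check TryGetMemcache performs
        loadedAfterSnapshot := loadTS > startTS
        // When both hold, the reader can observe rows newer than its snapshot,
        // which breaks snapshot isolation for that transaction.
        return servedFromCache && loadedAfterSnapshot
    }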

Contributor Author (JayLZhou):
Sorry, my mistake. Such problems do exist when using locks, but if only atomic.Value is used, the cacheData loaded one hour ago stays fixed; even if there is an update later, the old snapshot does not include it.

Contributor:
@lcwangchao The correct implementation should use an atomic operation instead of a lock, because a lock here does not prevent the "ABA" problem you mentioned.
The ts and the membuffer together form a cacheData. When the plan is built, the snapshot of that cacheData is captured into the plan and never changes afterwards. You can see how it's done in another PR:

https://github.com/pingcap/tidb/pull/29443/files#diff-dc8c4c0ab93631d336a3e92583a64a1837135319cef7337758cd13ab86992086R144-R147
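
A minimal, runnable sketch of that snapshot idea, built only from what this thread describes (the type and field names below stand in for table.CacheData and kv.MemBuffer):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // cacheData stands in for table.CacheData: the lease and the buffer always
    // travel together as one immutable value.
    type cacheData struct {
        lease uint64
        rows  []string // stands in for the kv.MemBuffer
    }

    var cache atomic.Value // holds a *cacheData

    // snapshotForPlan mimics what plan building does: load the value once and keep
    // the pointer for the lifetime of the statement. A concurrent reload stores a
    // brand-new *cacheData rather than mutating this one, so the statement never
    // sees a half-updated (lease, buffer) pair.
    func snapshotForPlan(startTS uint64) *cacheData {
        v, _ := cache.Load().(*cacheData)
        if v == nil || v.lease <= startTS {
            return nil // no cache yet, or the lease does not cover this reader
        }
        return v
    }

    func main() {
        cache.Store(&cacheData{lease: 200, rows: []string{"row1"}})
        snap := snapshotForPlan(100) // a reader whose start ts is 100
        cache.Store(&cacheData{lease: 300, rows: []string{"row1", "row2"}})
        fmt.Println(snap.rows) // still [row1]: the captured snapshot is unchanged
    }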

lcwangchao (Collaborator), Nov 6, 2021:
Maybe we are referring to the same problem... This is my test result for this PR:

create table tc (id int);
alter table tc cache;

--- session1, begin a txn
begin;  
select * from tc; --- empty output

    -- in session2, do some write and then read
    insert into tc values(1); --- write data 1
    select * from tc; --- trigger read lock

--- back to session1, still in txn
select * from tc; --- Got '1' , expect empty result set

The two select results in session1 should be the same (both empty), but they are not.

return data.MemBuffer, true
}
return nil, false
}
func (c *cachedTable) isReadFromCache(ts uint64) bool {
// If first read cache table. directly return false, the backend goroutine will help us update the lock information
// and read the data from the original table at the same time
// TODO : Use lease and ts judge whether it is readable.
// TODO : If the cache is not readable. MemBuffer become invalid.
return c.MemBuffer != nil
}

var mockStateRemote = struct {
Ch chan remoteTask
Data *mockStateRemoteData
}{}

// NewCachedTable creates a new CachedTable Instance
func NewCachedTable(tbl *TableCommon) (table.Table, error) {
return &cachedTable{
if mockStateRemote.Data == nil {
mockStateRemote.Data = newMockStateRemoteData()
mockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch)
}
ret := &cachedTable{
TableCommon: *tbl,
}, nil
handle: &mockStateRemoteHandle{mockStateRemote.Ch},
}

return ret, nil
}
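
The remoteTask type and the body of mockRemoteService are not part of this diff; the snippet below is only an illustration, under stated assumptions, of the channel-owned-state pattern that NewCachedTable wires up here: one goroutine owns the mock lock state and serves requests sent over a channel, so the state itself needs no mutex.

    // lockTask is an invented request shape for the example; the PR's remoteTask
    // is defined elsewhere and may differ.
    type lockTask struct {
        tid   int64
        write bool
        reply chan bool
    }

    // mockService plays the role of mockRemoteService: it is the only goroutine
    // that touches the lock state map.
    func mockService(tasks <-chan lockTask) {
        locked := make(map[int64]string) // tid -> "read" or "write"
        for t := range tasks {
            if t.write {
                locked[t.tid] = "write"
                t.reply <- true
                continue
            }
            // A read lock is only granted when no write lock is held.
            ok := locked[t.tid] != "write"
            if ok {
                locked[t.tid] = "read"
            }
            t.reply <- ok
        }
    }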

func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context) error {
func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context, lease uint64) (kv.MemBuffer, error) {
prefix := tablecodec.GenTablePrefix(c.tableID)
txn, err := ctx.Txn(true)
if err != nil {
return err
return nil, err
}
if txn.StartTS() >= lease {

Collaborator:
You can start a new txn with the same startTS as the old one to avoid this error.

Contributor Author (JayLZhou):
I will modify it in the next PR: loadDataFromOriginalTable will execute a new txn, using txn.RunInNewTxn. Sorry, but this PR focuses on the write path and on mocking a remote state.

Collaborator:
I agree with implementing it in the next PR, but I think txn.RunInNewTxn is not allowed to specify a custom ts.

tiancaiamao (Contributor), Nov 8, 2021:
Use txn.RunInNewTxn and compare its ts with the lease: if ts < lease, caching is safe; otherwise we can return an error to the caller.
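
A rough sketch of that suggestion; it is not code from this PR, it assumes kv.RunInNewTxn keeps its usual (ctx, store, retryable, fn) shape, and the helper name is invented:

    // loadWithLeaseCheck allocates a fresh ts via a new transaction and only
    // builds the cache when that ts still falls inside the lease window.
    func (c *cachedTable) loadWithLeaseCheck(store kv.Storage, lease uint64) (kv.MemBuffer, error) {
        var buffer kv.MemBuffer
        err := kv.RunInNewTxn(context.Background(), store, false,
            func(ctx context.Context, txn kv.Transaction) error {
                if txn.StartTS() >= lease {
                    // The new ts already passed the lease, so anything loaded now
                    // could be stale before it is ever served from the cache.
                    return errors.New("the loaded data is outdated for caching")
                }
                // Scan the table range into a MemBuffer here, as
                // loadDataFromOriginalTable does above.
                return nil
            })
        return buffer, err
    }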

return nil, errors.New("the loaded data is outdated for caching")
}

buffTxn, err := ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0))
if err != nil {
return err
return nil, err
}

buffer := buffTxn.GetMemBuffer()
it, err := txn.Iter(prefix, prefix.PrefixNext())
if err != nil {
return err
return nil, err
}
defer it.Close()
if !it.Valid() {
return nil
}
for it.Valid() && it.Key().HasPrefix(prefix) {
value := it.Value()
err = buffer.Set(it.Key(), value)
if err != nil {
return err
return nil, err
}
err = it.Next()
if err != nil {
return err
return nil, err
}
}
c.mu.Lock()
c.MemBuffer = buffer
c.mu.Unlock()
return nil

return buffer, nil
}

func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error {
// Now only the data is re-load here, and the lock information is not updated. any read-lock information update will in the next pr.
err := c.loadDataFromOriginalTable(ctx)
// Load data from the original table and update the lock information.

Collaborator:
Change UpdateLockForRead(ctx sessionctx.Context, ts uint64) to UpdateLockForRead(store kv.Storage, ts uint64) to avoid misuse of sessionctx.Context, which can cause a data race. You can do it in the next PR.

Contributor Author (JayLZhou):
Okay, I will modify it in the next PR.

tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.LockForRead(tid, ts, lease)
if err != nil {
return fmt.Errorf("reload data error")
return errors.Trace(err)
}
if succ {
mb, err := c.loadDataFromOriginalTable(ctx, lease)
if err != nil {
return errors.Trace(err)
}

c.cacheData.Store(&table.CacheData{
Lease: lease,
MemBuffer: mb,
})
}
// Current status is not suitable to cache.
return nil
}

// AddRecord implements the AddRecord method for the table.Table interface.
func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
txn, err := ctx.Txn(true)
if err != nil {
return nil, err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return nil, errors.Trace(err)
}
return c.TableCommon.AddRecord(ctx, r, opts...)

}

// UpdateRecord implements table.Table
func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
txn, err := sctx.Txn(true)
if err != nil {
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
}

// RemoveRecord implements table.Table RemoveRecord interface.
func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
txn, err := ctx.Txn(true)
if err != nil {
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
return c.TableCommon.RemoveRecord(ctx, h, r)
}
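
For reference, the StateRemote handle used above is defined elsewhere in this PR and does not appear in this diff. Judging only from the call sites here (LockForRead in UpdateLockForRead, and LockForWrite in AddRecord, UpdateRecord, and RemoveRecord), its shape is presumably close to the sketch below, with the mock implementation served by the mockRemoteService goroutine that NewCachedTable starts; treat the exact interface as an assumption.

    // Inferred from the call sites in this file; the real definition added by
    // this PR may contain more methods or differ in detail.
    type StateRemote interface {
        // LockForRead tries to put the table into a read-locked state that lasts
        // until lease, and reports whether caching is currently allowed.
        LockForRead(tid int64, now, lease uint64) (bool, error)
        // LockForWrite puts the table into a write-locked state so that readers
        // stop serving from the cache until the lease expires.
        LockForWrite(tid int64, now, lease uint64) error
    }
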
table/tables/cache_test.go: 39 changes (39 additions & 0 deletions)

@@ -18,6 +18,7 @@ import (
"testing"

"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestCacheTableBasicScan(t *testing.T) {
@@ -113,3 +114,41 @@
assertSelect()

}

func TestCacheTableBasicReadAndWrite(t *testing.T) {
t.Parallel()
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk.MustExec("drop table if exists write_tmp1")
tk.MustExec("create table write_tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into write_tmp1 values" +
"(1, 101, 1001), (3, 113, 1003)",
)

tk.MustExec("alter table write_tmp1 cache")
// Read and add read lock
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001",
"3 113 1003"))
// the read lock should be valid
for i := 0; i < 10; i++ {
if tk.HasPlan("select *from write_tmp1", "UnionScan") {
break
}
}
tk.MustExec("use test")
tk1.MustExec("insert into write_tmp1 values (2, 222, 222)")
// write lock exists
require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan"))
// wait for the write lock to expire and check that the cache can be used again
for tk.HasPlan("select *from write_tmp1", "UnionScan") {
}
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003"))
tk1.MustExec("update write_tmp1 set v = 3333 where id = 2")
for tk.HasPlan("select *from write_tmp1", "UnionScan") {
}
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003"))
}
table/tables/main_test.go: 2 changes (1 addition & 1 deletion)

@@ -23,10 +23,10 @@ import (

func TestMain(m *testing.M) {
testbridge.WorkaroundGoCheckFlags()

opts := []goleak.Option{
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
}
goleak.VerifyTestMain(m, opts...)
}