diff --git a/go.mod b/go.mod index e61a95e5a0a2d..3c2868df8b118 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.0-rc.0.20211213075151-b147ced35a14 + github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 56b154bd4a38f..a6b2d60e60a01 100644 --- a/go.sum +++ b/go.sum @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211213075151-b147ced35a14 h1:l2T+gfgYpwmLRY5geDq1zM4Lz4X2mi1ruO18/bDGo70= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211213075151-b147ced35a14/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= diff --git a/kv/option.go b/kv/option.go index 683f5fcb8e389..2a7a17fedcb6c 100644 --- a/kv/option.go +++ b/kv/option.go @@ -71,6 +71,9 @@ const ( // SnapInterceptor is used for setting the interceptor for snapshot SnapInterceptor + // CommitTSUpperBoundChec is used by cached table + // The commitTS must be greater than all the write lock lease of the visited cached table. + CommitTSUpperBoundCheck ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index 465de576b37c7..16fecfa3b0121 100644 --- a/session/session.go +++ b/session/session.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tipb/go-binlog" @@ -89,6 +90,7 @@ import ( "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" tikvstore "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" ) @@ -559,10 +561,104 @@ func (s *session) doCommit(ctx context.Context) error { if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 { s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) } + if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { + c := cachedTableRenewLease{tables: tables} + now := time.Now() + err := c.start(ctx) + defer c.stop(ctx) + sessVars.StmtCtx.WaitLockLeaseTime += time.Since(now) + if err != nil { + return errors.Trace(err) + } + s.txn.SetOption(kv.CommitTSUpperBoundCheck, c.commitTSCheck) + } return s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn) } +type cachedTableRenewLease struct { + tables map[int64]interface{} + lease []uint64 // Lease for each visited cached tables. + exit chan struct{} +} + +func (c *cachedTableRenewLease) start(ctx context.Context) error { + c.exit = make(chan struct{}) + c.lease = make([]uint64, len(c.tables)) + wg := make(chan error) + ith := 0 + for tid, raw := range c.tables { + go c.keepAlive(ctx, wg, raw.(tables.StateRemote), tid, &c.lease[ith]) + ith++ + } + + // Wait for all LockForWrite() return, this function can return. + var err error + for ; ith > 0; ith-- { + tmp := <-wg + if tmp != nil { + err = tmp + } + } + return err +} + +const cacheTableWriteLease = 5 * time.Second + +func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, handle tables.StateRemote, tid int64, leasePtr *uint64) { + writeLockLease, err := handle.LockForWrite(ctx, tid) + atomic.StoreUint64(leasePtr, writeLockLease) + wg <- err + if err != nil { + logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err)) + return + } + + t := time.NewTicker(cacheTableWriteLease) + defer t.Stop() + for { + select { + case <-t.C: + if err := c.renew(ctx, handle, tid, leasePtr); err != nil { + logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err)) + return + } + case <-c.exit: + return + } + } +} + +func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRemote, tid int64, leasePtr *uint64) error { + oldLease := atomic.LoadUint64(leasePtr) + physicalTime := oracle.GetTimeFromTS(oldLease) + newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease)) + + succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease) + if err != nil { + return errors.Trace(err) + } + if succ { + atomic.StoreUint64(leasePtr, newLease) + } + return nil +} + +func (c *cachedTableRenewLease) stop(ctx context.Context) { + close(c.exit) +} + +func (c *cachedTableRenewLease) commitTSCheck(commitTS uint64) bool { + for i := 0; i < len(c.lease); i++ { + lease := atomic.LoadUint64(&c.lease[i]) + if commitTS >= lease { + // Txn fails to commit because the write lease is expired. + return false + } + } + return true +} + func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables diff --git a/session/session_test.go b/session/session_test.go index acbc889102fe1..7b5febe0d18e0 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5883,3 +5883,35 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { " `cs1` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } + +func (s *testSessionSuite) TestWriteOnMultipleCachedTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists ct1, ct2") + tk.MustExec("create table ct1 (id int, c int)") + tk.MustExec("create table ct2 (id int, c int)") + tk.MustExec("alter table ct1 cache") + tk.MustExec("alter table ct2 cache") + tk.MustQuery("select * from ct1").Check(testkit.Rows()) + tk.MustQuery("select * from ct2").Check(testkit.Rows()) + + cached := false + for i := 0; i < 50; i++ { + if tk.HasPlan("select * from ct1", "Union") { + if tk.HasPlan("select * from ct2", "Union") { + cached = true + break + } + } + time.Sleep(100 * time.Millisecond) + } + c.Assert(cached, IsTrue) + + tk.MustExec("begin") + tk.MustExec("insert into ct1 values (3, 4)") + tk.MustExec("insert into ct2 values (5, 6)") + tk.MustExec("commit") + + tk.MustQuery("select * from ct1").Check(testkit.Rows("3 4")) + tk.MustQuery("select * from ct2").Check(testkit.Rows("5 6")) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 680e6f2c1367b..8169eaa5c2d66 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -180,6 +180,9 @@ type TransactionContext struct { // TemporaryTables is used to store transaction-specific information for global temporary tables. // It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends. TemporaryTables map[int64]tableutil.TempTable + + // CachedTables is not nil if the transaction write on cached table. + CachedTables map[int64]interface{} } // GetShard returns the shard prefix for the next `count` rowids. diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 823d33ac88f59..717bf3b154761 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -230,6 +230,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) case kv.SnapInterceptor: txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) + case kv.CommitTSUpperBoundCheck: + txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool)) } } diff --git a/table/tables/cache.go b/table/tables/cache.go index c95379593c066..7e3eb7c40b9a9 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,51 +183,31 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. -func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { - txn, err := ctx.Txn(true) - if err != nil { - return nil, err +func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.AddRecord(sctx, r, opts...) +} + +func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.CachedTables == nil { + txnCtx.CachedTables = make(map[int64]interface{}) } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return nil, errors.Trace(err) + if _, ok := txnCtx.CachedTables[tid]; !ok { + txnCtx.CachedTables[tid] = handle } - ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) - return c.TableCommon.AddRecord(ctx, r, opts...) } // UpdateRecord implements table.Table func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { - txn, err := sctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } - sctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. -func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txn, err := ctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - start := time.Now() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } - ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) - return c.TableCommon.RemoveRecord(ctx, h, r) +func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.RemoveRecord(sctx, h, r) } func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 83fd06f7e410e..aeddd5b972ab2 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -67,7 +67,7 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64, lease uint64) error + LockForWrite(ctx context.Context, tid int64) (uint64, error) // RenewLease attempt to renew the read / write lock on the table with the specified tableID RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) @@ -132,28 +132,32 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { +// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { h.Lock() defer h.Unlock() + var ret uint64 for { - waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) + waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) if err != nil { - return err + return 0, err } if waitAndRetry == 0 { + ret = lease break } time.Sleep(waitAndRetry) } - return nil + return ret, nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } + ts = leaseFromTS(now) // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -214,36 +218,69 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() + switch op { + case RenewReadLease: + return h.renewReadLease(ctx, tid, newLease) + case RenewWriteLease: + return h.renewWriteLease(ctx, tid, newLease) + } + return false, errors.New("wrong renew lease type") +} + +func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool - if op == RenewReadLease { - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } + + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) if err != nil { return errors.Trace(err) } - if now >= oldLease { - // read lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockRead { - // Not read lock, fail to renew - return nil - } + } + succ = true + return nil + }) + return succ, err +} - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) - if err != nil { - return errors.Trace(err) - } - } - succ = true +func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { + var succ bool + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // write lock had already expired, fail to renew return nil - }) - return succ, err - } + } + if lockType != CachedTableLockWrite { + // Not write lock, fail to renew + return nil + } - // TODO: renew for write lease - return false, errors.New("not implement yet") + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "WRITE", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true + return nil + }) + return succ, err } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index b854388aef2c3..dc4e9272b1830 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -27,19 +27,7 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -// CreateMetaLockForCachedTable initializes the cached table meta lock information. -func createMetaLockForCachedTable(h session.Session) error { - createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + - "`tid` int(11) NOT NULL DEFAULT 0," + - "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + - "`lease` bigint(20) NOT NULL DEFAULT 0," + - "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + - "PRIMARY KEY (`tid`))" - _, err := h.ExecuteInternal(context.Background(), createTable) - return err -} - -// InitRow add a new record into the cached table meta lock table. +// initRow add a new record into the cached table meta lock table. func initRow(ctx context.Context, exec session.Session, tid int) error { _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) return err @@ -54,9 +42,6 @@ func TestStateRemote(t *testing.T) { ctx := context.Background() h := tables.NewStateRemote(se) - err := createMetaLockForCachedTable(se) - require.NoError(t, err) - require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) // Check the initial value. require.NoError(t, initRow(ctx, se, 5)) @@ -104,17 +89,18 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. - leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond)) - require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + writeLease, err := h.LockForWrite(ctx, 5) + require.NoError(t, err) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, leaseVal) + require.Equal(t, writeLease, lease) + require.Greater(t, writeLease, leaseVal) // Lock for write again - leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond)) - require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + writeLease, err = h.LockForWrite(ctx, 5) + require.NoError(t, err) lockType, _, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) @@ -130,10 +116,14 @@ func TestStateRemote(t *testing.T) { require.NoError(t, err) require.False(t, succ) - // But clear orphan write lock should success. - time.Sleep(time.Second) - leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second)) - succ, err = h.LockForRead(ctx, 5, leaseVal) + // Renew write lease. + succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease) require.NoError(t, err) require.True(t, succ) + + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, writeLease+1) }