Skip to content

Commit

Permalink
table/tables: fix a corner case of renew lease operation on cached ta…
Browse files Browse the repository at this point in the history
…ble (#32643)

close #32642
  • Loading branch information
tiancaiamao authored Mar 15, 2022
1 parent e0db77e commit ecffe84
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 50 deletions.
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRe
physicalTime := oracle.GetTimeFromTS(oldLease)
newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease))

succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease)
succ, err := handle.RenewWriteLease(ctx, tid, newLease)
if err != nil {
return errors.Trace(err)
}
Expand Down
42 changes: 24 additions & 18 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -32,16 +33,6 @@ import (
"go.uber.org/zap"
)

// RenewLeaseType define the type for renew lease.
type RenewLeaseType int

const (
// RenewReadLease means renew read lease.
RenewReadLease RenewLeaseType = iota + 1
// RenewWriteLease means renew write lease.
RenewWriteLease
)

var (
_ table.CachedTable = &cachedTable{}
)
Expand All @@ -52,7 +43,6 @@ type cachedTable struct {
handle StateRemote
totalSize int64

lockingForRead tokenLimit
renewReadLease tokenLimit
}

Expand Down Expand Up @@ -91,7 +81,13 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k
leaseTime := oracle.GetTimeFromTS(data.Lease)
nowTime := oracle.GetTimeFromTS(ts)
distance := leaseTime.Sub(nowTime)
if distance >= 0 && distance <= leaseDuration/2 {

var triggerFailpoint bool
failpoint.Inject("mockRenewLeaseABA1", func(_ failpoint.Value) {
triggerFailpoint = true
})

if distance >= 0 && distance <= leaseDuration/2 || triggerFailpoint {
select {
case c.renewReadLease <- struct{}{}:
go c.renewLease(ts, data, leaseDuration)
Expand All @@ -107,7 +103,6 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k
func newCachedTable(tbl *TableCommon) (table.Table, error) {
ret := &cachedTable{
TableCommon: *tbl,
lockingForRead: make(chan struct{}, 1),
renewReadLease: make(chan struct{}, 1),
}
return ret, nil
Expand Down Expand Up @@ -171,7 +166,7 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64)

func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) {
select {
case c.lockingForRead <- struct{}{}:
case c.renewReadLease <- struct{}{}:
go c.updateLockForRead(ctx, store, ts, leaseDuration)
default:
// There is a inflight calling already.
Expand All @@ -185,7 +180,7 @@ func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, t
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
<-c.lockingForRead
<-c.renewReadLease
}()

// Load data from original table and the update lock information.
Expand Down Expand Up @@ -249,20 +244,31 @@ func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []typ
return c.TableCommon.RemoveRecord(sctx, h, r)
}

// TestMockRenewLeaseABA2 is used by test function TestRenewLeaseABAFailPoint.
var TestMockRenewLeaseABA2 chan struct{}

func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.Duration) {
defer func() { <-c.renewReadLease }()

failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) {
<-TestMockRenewLeaseABA2
})

tid := c.Meta().ID
lease := leaseFromTS(ts, leaseDuration)
succ, err := c.handle.RenewLease(context.Background(), tid, lease, RenewReadLease)
newLease, err := c.handle.RenewReadLease(context.Background(), tid, data.Lease, lease)
if err != nil && !kv.IsTxnRetryableError(err) {
log.Warn("Renew read lease error", zap.Error(err))
}
if succ {
if newLease > 0 {
c.cacheData.Store(&cacheData{
Start: data.Start,
Lease: lease,
Lease: newLease,
MemBuffer: data.MemBuffer,
})
}

failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) {
TestMockRenewLeaseABA2 <- struct{}{}
})
}
60 changes: 60 additions & 0 deletions table/tables/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/auth"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/stmtsummary"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func lastReadFromCache(tk *testkit.TestKit) bool {
Expand Down Expand Up @@ -588,3 +590,61 @@ func TestMetrics(t *testing.T) {
hit := pb.GetCounter().GetValue()
require.Equal(t, i, hit)
}

func TestRenewLeaseABAFailPoint(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tables.TestMockRenewLeaseABA2 = make(chan struct{})

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_lease;")
tk.MustExec(`create table t_lease(a int, b int);`)
tk.MustExec(`insert into t_lease values (1, 1)`)
tk.MustExec(`alter table t_lease cache`)

tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

// Load the cache data by this query.
var cacheUsed bool
for i := 0; i < 10; i++ {
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1"))
if lastReadFromCache(tk) {
cacheUsed = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cacheUsed)

// Renew lease by this query, mock the operation is delayed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA1", `return`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA2", `return`))
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1"))

// Make the cache data stale after writing: read lock-> write lock
tk1.MustExec("update t_lease set b = 2 where a = 1")

// Mock reading from another TiDB instance: write lock -> read lock
is := tk2.Session().GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t_lease"))
require.NoError(t, err)
lease := oracle.GoTimeToTS(time.Now().Add(20 * time.Second)) // A big enough future time
tk2.MustExec("update mysql.table_cache_meta set lock_type = 'READ', lease = ? where tid = ?", lease, tbl.Meta().ID)

// Then the stagnant renew lease operation finally arrive.
tables.TestMockRenewLeaseABA2 <- struct{}{}

<-tables.TestMockRenewLeaseABA2
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA1"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA2"))

// The renew lease operation should not success,
// And the session should not read from a staled cache data.
tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 2"))
require.False(t, lastReadFromCache(tk))
}
63 changes: 39 additions & 24 deletions table/tables/state_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ type StateRemote interface {
// LockForWrite try to add a write lock to the table with the specified tableID
LockForWrite(ctx context.Context, tid int64, leaseDuration time.Duration) (uint64, error)

// RenewLease attempt to renew the read / write lock on the table with the specified tableID
RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error)
// RenewReadLease attempt to renew the read lock lease on the table with the specified tableID
RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error)

// RenewWriteLease attempt to renew the write lock lease on the table with the specified tableID
RenewWriteLease(ctx context.Context, tid int64, newTs uint64) (bool, error)
}

type sqlExec interface {
Expand Down Expand Up @@ -214,27 +217,17 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration {
return 0
}

func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease uint64, op RenewLeaseType) (bool, error) {
h.Lock()
defer h.Unlock()

switch op {
case RenewReadLease:
return h.renewReadLease(ctx, tid, newLease)
case RenewWriteLease:
return h.renewWriteLease(ctx, tid, newLease)
}
return false, errors.New("wrong renew lease type")
}

func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
var succ bool
// RenewReadLease renew the read lock lease.
// Return the current lease value on success, and return 0 on fail.
func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) {
var newLease uint64
err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, oldLease, _, err := h.loadRow(ctx, tid)
lockType, remoteLease, _, err := h.loadRow(ctx, tid)
if err != nil {
return errors.Trace(err)
}
if now >= oldLease {

if now >= remoteLease {
// read lock had already expired, fail to renew
return nil
}
Expand All @@ -243,19 +236,36 @@ func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLe
return nil
}

if newLease > oldLease { // lease should never decrease!
err = h.updateRow(ctx, tid, "READ", newLease)
// It means that the lease had already been changed by some other TiDB instances.
if oldLocalLease != remoteLease {
// 1. Data in [cacheDataTS -------- oldLocalLease) time range is also immutable.
// 2. Data in [ now ------------------- remoteLease) time range is immutable.
//
// If now < oldLocalLease, it means data in all the time range is immutable,
// so the old cache data is still available.
if now < oldLocalLease {
newLease = remoteLease
}
// Otherwise, there might be write operation during the oldLocalLease and the new remoteLease
// Make renew lease operation fail.
return nil
}

if newValue > remoteLease { // lease should never decrease!
err = h.updateRow(ctx, tid, "READ", newValue)
if err != nil {
return errors.Trace(err)
}
newLease = newValue
} else {
newLease = remoteLease
}
succ = true
return nil
})
return succ, err
return newLease, err
}

func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
var succ bool
err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
lockType, oldLease, _, err := h.loadRow(ctx, tid)
Expand Down Expand Up @@ -304,6 +314,11 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co
return errors.Trace(err)
}

_, err = h.execSQL(ctx, "set @@session.tidb_retry_limit = 0")
if err != nil {
return errors.Trace(err)
}

rows, err := h.execSQL(ctx, "select @@tidb_current_ts")
if err != nil {
return errors.Trace(err)
Expand Down
14 changes: 7 additions & 7 deletions table/tables/state_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func TestStateRemote(t *testing.T) {

// Renew read lock lease operation.
leaseVal = oracle.GoTimeToTS(physicalTime.Add(400 * time.Millisecond))
succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease)
leaseVal, err = h.RenewReadLease(ctx, 5, lease, leaseVal)
require.NoError(t, err)
require.True(t, succ)
require.True(t, leaseVal > 0)
lockType, lease, err = h.Load(ctx, 5)
require.NoError(t, err)
require.Equal(t, lockType, tables.CachedTableLockRead)
Expand All @@ -101,23 +101,23 @@ func TestStateRemote(t *testing.T) {
// Lock for write again
writeLease, err = h.LockForWrite(ctx, 5, 3*time.Second)
require.NoError(t, err)
lockType, _, err = h.Load(ctx, 5)
lockType, lease, err = h.Load(ctx, 5)
require.NoError(t, err)
require.Equal(t, lockType, tables.CachedTableLockWrite)
require.Equal(t, lockType.String(), "WRITE")

// Renew read lock lease should fail when the write lock is hold.
succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease)
leaseVal, err = h.RenewReadLease(ctx, 5, lease, lease+1)
require.NoError(t, err)
require.False(t, succ)
require.False(t, leaseVal > 0)

// Acquire read lock should also fail when the write lock is hold.
succ, err = h.LockForRead(ctx, 5, leaseVal)
succ, err = h.LockForRead(ctx, 5, lease+1)
require.NoError(t, err)
require.False(t, succ)

// Renew write lease.
succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease)
succ, err = h.RenewWriteLease(ctx, 5, writeLease+1)
require.NoError(t, err)
require.True(t, succ)

Expand Down

0 comments on commit ecffe84

Please sign in to comment.