Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: avoid pass session ID by context & fix no schemaChecker error log #24696

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}()

sctx := a.Ctx
ctx = util.SetSessionID(ctx, sctx.GetSessionVars().ConnectionID)
Copy link
Contributor

@lysu lysu May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems some other test case

if v := bo.GetCtx().Value(util.SessionID); v != nil {
need this ..

@MyonKeminta PTAL

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the test, you can see the _test.go file changes in this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems the previous

inject = false

will be used in some test case that out of tidb repo
https://github.com/pingcap/automated-tests/blob/cc44a56fd59cfdb71eaf37210e0861ba7acdc925/ticases/transaction/asynccommit/linearizability.go#L445

I'm not sure whether it works 😞

Copy link
Contributor

@cfzjywxk cfzjywxk May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we may change the session id related logic too or abstract a isInnerExecution accordingly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a problem. Some failpoints that's used by tests outside the repo checks the session id and works only when session id != 0. These kind of usages also occurs in 2pc.go, prewrite.go, etc.

if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
oriStats, _ := sctx.GetSessionVars().GetSystemVar(variable.TiDBBuildStatsConcurrency)
oriScan := sctx.GetSessionVars().DistSQLScanConcurrency()
Expand Down Expand Up @@ -582,6 +581,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
txn.SetOption(kv.SessionID, seVars.ConnectionID)
err = txn.LockKeys(ctx, lockCtx, keys...)
if lockKeyStats != nil {
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
Expand Down
3 changes: 2 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,8 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *tikvstore.L
}
var lockKeyStats *tikvutil.LockKeysDetails
ctx = context.WithValue(ctx, tikvutil.LockKeysDetailCtxKey, &lockKeyStats)
err = txn.LockKeys(tikvutil.SetSessionID(ctx, se.GetSessionVars().ConnectionID), lockCtx, keys...)
txn.SetOption(kv.SessionID, se.GetSessionVars().ConnectionID)
err = txn.LockKeys(ctx, lockCtx, keys...)
if lockKeyStats != nil {
sctx.MergeLockKeysExecDetails(lockKeyStats)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -949,7 +948,8 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
if err != nil {
return err
}
err = txn.Commit(tikvutil.SetSessionID(context.TODO(), e.ctx.GetSessionVars().ConnectionID))
txn.SetOption(kv.SessionID, e.ctx.GetSessionVars().ConnectionID)
err = txn.Commit(context.TODO())
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A=
Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
// SessionID store the session (connection) ID to the transaction.
SessionID
)

// ReplicaReadType is the type of replica to read data from
Expand Down
3 changes: 2 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ func (s *session) doCommit(ctx context.Context) error {
}
s.txn.SetOption(kv.EnableAsyncCommit, s.GetSessionVars().EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, s.GetSessionVars().Enable1PC)
s.txn.SetOption(kv.SessionID, s.GetSessionVars().ConnectionID)
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
Expand All @@ -529,7 +530,7 @@ func (s *session) doCommit(ctx context.Context) error {
s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability)
}

return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID))
return s.txn.Commit(ctx)
}

// removeTempTableFromBuffer filters out the temporary table key-values.
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool))
case kv.MatchStoreLabels:
txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel))
case kv.SessionID:
txn.KVTxn.SetSessionID(val.(uint64))
}
}

Expand Down
17 changes: 9 additions & 8 deletions store/tikv/tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/util"
)

func (s *testAsyncCommitCommon) begin1PC(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetEnable1PC(true)
txn.SetSessionID(1)
return tikv.TxnProbe{KVTxn: txn}
}

Expand All @@ -44,7 +44,7 @@ func (s *testOnePCSuite) SetUpTest(c *C) {
}

func (s *testOnePCSuite) Test1PC(c *C) {
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()

k1 := []byte("k1")
v1 := []byte("v1")
Expand All @@ -67,20 +67,21 @@ func (s *testOnePCSuite) Test1PC(c *C) {
txn = s.begin1PC(c)
err = txn.Set(k2, v2)
c.Assert(err, IsNil)
txn.SetSessionID(0)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
c.Assert(txn.GetCommitter().IsOnePC(), IsFalse)
c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, uint64(0))
c.Assert(txn.GetCommitter().GetCommitTS(), Greater, txn.StartTS())

// 1PC doesn't work if system variable not set

k3 := []byte("k3")
v3 := []byte("v3")

txn = s.begin(c)
err = txn.Set(k3, v3)
c.Assert(err, IsNil)
txn.SetSessionID(1)
err = txn.Commit(ctx)
c.Assert(err, IsNil)
c.Assert(txn.GetCommitter().IsOnePC(), IsFalse)
Expand Down Expand Up @@ -142,7 +143,7 @@ func (s *testOnePCSuite) Test1PC(c *C) {
}

func (s *testOnePCSuite) Test1PCIsolation(c *C) {
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()

k := []byte("k")
v1 := []byte("v1")
Expand Down Expand Up @@ -183,7 +184,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion(c *C) {
return
}

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()

txn := s.begin1PC(c)

Expand Down Expand Up @@ -234,7 +235,7 @@ func (s *testOnePCSuite) Test1PCLinearizability(c *C) {
c.Assert(err, IsNil)
err = t2.Set([]byte("b"), []byte("b1"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
// t2 commits earlier than t1
err = t2.Commit(ctx)
c.Assert(err, IsNil)
Expand All @@ -255,7 +256,7 @@ func (s *testOnePCSuite) Test1PCWithMultiDC(c *C) {
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
err = localTxn.Commit(ctx)
c.Assert(err, IsNil)
c.Assert(localTxn.GetCommitter().IsOnePC(), IsFalse)
Expand All @@ -276,7 +277,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter(c *C) {
txn := s.begin(c)
err := txn.Set([]byte("k"), []byte("v"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
err = txn.Commit(ctx)
c.Assert(err, IsNil)
curr := metrics.GetTxnCommitCounter()
Expand Down
13 changes: 7 additions & 6 deletions store/tikv/tests/async_commit_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/util"
)

type testAsyncCommitFailSuite struct {
Expand Down Expand Up @@ -57,7 +56,7 @@ func (s *testAsyncCommitFailSuite) TestFailAsyncCommitPrewriteRpcErrors(c *C) {
t1 := s.beginAsyncCommit(c)
err := t1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
err = t1.Commit(ctx)
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))
Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *testAsyncCommitFailSuite) TestAsyncCommitPrewriteCancelled(c *C) {
c.Assert(err, IsNil)
err = t1.Set([]byte("z"), []byte("z"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
err = t1.Commit(ctx)
c.Assert(err, NotNil)
_, ok := errors.Cause(err).(*tikverr.ErrWriteConflict)
Expand All @@ -117,7 +116,7 @@ func (s *testAsyncCommitFailSuite) TestPointGetWithAsyncCommit(c *C) {

// PointGet cannot ignore async commit transactions' locks.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
err := txn.Commit(ctx)
c.Assert(err, IsNil)
c.Assert(txn.GetCommitter().IsAsyncCommit(), IsTrue)
Expand Down Expand Up @@ -167,9 +166,10 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) {
var sessionID uint64 = 0
test := func(keys []string, values []string) {
sessionID++
ctx := context.WithValue(context.Background(), util.SessionID, sessionID)
ctx := context.Background()

txn := s.beginAsyncCommit(c)
txn.SetSessionID(sessionID)
for i := range keys {
txn.Set([]byte(keys[i]), []byte(values[i]))
}
Expand Down Expand Up @@ -228,7 +228,8 @@ func (s *testAsyncCommitFailSuite) TestAsyncCommitContextCancelCausingUndetermin
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/rpcContextCancelErr"), IsNil)
}()

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
ctx := context.Background()
txn.SetSessionID(1)
err = txn.Commit(ctx)
c.Assert(err, NotNil)
c.Assert(txn.GetCommitter().GetUndeterminedErr(), NotNil)
Expand Down
1 change: 1 addition & 0 deletions store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetEnableAsyncCommit(true)
txn.SetSessionID(1)
return tikv.TxnProbe{KVTxn: txn}
}

Expand Down
32 changes: 10 additions & 22 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type KVTxn struct {
causalConsistency bool
scope string
kvFilter KVFilter
sessionID uint64
}

func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) {
Expand Down Expand Up @@ -285,6 +286,11 @@ func (txn *KVTxn) SetKVFilter(filter KVFilter) {
txn.kvFilter = filter
}

// SetSessionID sets the session ID for this transaction.
func (txn *KVTxn) SetSessionID(id uint64) {
txn.sessionID = id
}

// IsPessimistic returns true if it is pessimistic.
func (txn *KVTxn) IsPessimistic() bool {
return txn.isPessimistic
Expand Down Expand Up @@ -325,18 +331,11 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
start := time.Now()
defer func() { metrics.TxnCmdHistogramWithCommit.Observe(time.Since(start).Seconds()) }()

// sessionID is used for log.
var sessionID uint64
val := ctx.Value(util.SessionID)
if val != nil {
sessionID = val.(uint64)
}

var err error
// If the txn use pessimistic lock, committer is initialized.
committer := txn.committer
if committer == nil {
committer, err = newTwoPhaseCommitter(txn, sessionID)
committer, err = newTwoPhaseCommitter(txn, txn.sessionID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -369,9 +368,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
err = committer.execute(ctx)
if val == nil || sessionID > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this make the following logic executed unexpected for the inner session statements whose session id is 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For internal queries, val is always nil; for external queries sessionID > 0 always hold,
so val == nil || sessionID > 0 is basically always true? @MyonKeminta

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I've change it to:
sessionID > 0 => onCommit
sessionID == 0 => nothing

The original author means to call onCommit for external queries (although val == nil may be confusing in the old code).
@cfzjywxk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's hard to distinguish external from internal queries form sessionID?

InRestrictedSQL also can call doCommit

s.txn.SetOption(kv.SessionID, s.GetSessionVars().ConnectionID)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we take a session from the session pool to execute the SQL, sessionID is not set.
However, if we direct use the session to execute a internal SQL, that might be a problem.

txn.onCommitted(err)
}
txn.onCommitted(err)
logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -390,9 +387,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
return &tikverr.ErrWriteConflictInLatch{StartTS: txn.startTS}
}
err = committer.execute(ctx)
if val == nil || sessionID > 0 {
txn.onCommitted(err)
}
txn.onCommitted(err)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
Expand Down Expand Up @@ -548,14 +543,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
keys = deduplicateKeys(keys)
if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 {
if txn.committer == nil {
// sessionID is used for log.
var sessionID uint64
var err error
val := ctx.Value(util.SessionID)
if val != nil {
sessionID = val.(uint64)
}
txn.committer, err = newTwoPhaseCommitter(txn, sessionID)
txn.committer, err = newTwoPhaseCommitter(txn, txn.sessionID)
if err != nil {
return err
}
Expand Down