Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add system variable to turn on/off the large transaction feature #13299

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
SnapshotTS
// Set replica read
ReplicaRead
// LargeTxn option means this txn is a large txn.
LargeTxn
)

// Priority value for transaction priority.
Expand Down
7 changes: 5 additions & 2 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ type testPessimisticSuite struct {
dom *domain.Domain
}

func (s *testPessimisticSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
func init() {
// Set it to 300ms for testing lock resolve.
tikv.PessimisticLockTTL = 300
}

func (s *testPessimisticSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
Expand Down
3 changes: 3 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ func (s *session) doCommit(ctx context.Context) error {
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, tableIDs))

if s.sessionVars.EnableLargeTxn && s.txn.Size() > int(s.sessionVars.LargeTxnSize) {
s.txn.SetOption(kv.LargeTxn, true)
}
return s.txn.Commit(sessionctx.SetCommitCtx(ctx, s))
}

Expand Down
23 changes: 23 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2882,3 +2882,26 @@ func (s *testSessionSuite) TestStmtHints(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}

func (s *testSessionSuite) TestEnableLargeTxn(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table enable_large_txn (id int)")
tk.MustExec("set @@tidb_enable_large_txn = 0")
tk.MustQuery("select @@tidb_enable_large_txn").Check(testkit.Rows("0"))

tk.MustExec("set @@tidb_enable_large_txn = 1")
tk.MustQuery("select @@tidb_enable_large_txn").Check(testkit.Rows("1"))

tk.MustExec("set @@tidb_large_txn_size = 6666")
tk.MustQuery("select @@tidb_large_txn_size").Check(testkit.Rows("6666"))

tk.MustExec("set @@tidb_large_txn_size = 0")
tk.MustQuery("select @@tidb_large_txn_size").Check(testkit.Rows("0"))

var codeRun bool
ctx := context.WithValue(context.Background(), "checkLargeTxnEnable", &codeRun)
_, err := tk.Se.Execute(ctx, "insert into enable_large_txn values (1),(2),(3)")
c.Assert(err, IsNil)
c.Assert(codeRun, IsTrue)
}
12 changes: 12 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ type SessionVars struct {
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool

// EnableLargeTxn enables the large transaction feature.
EnableLargeTxn bool

// LargeTxnSize is the size threshold for a transaction to be considered as large transaction.
LargeTxnSize int64

// EnableChunkRPC indicates whether the coprocessor request can use chunk API.
EnableChunkRPC bool

Expand Down Expand Up @@ -529,6 +535,8 @@ func NewSessionVars() *SessionVars {
UsePlanBaselines: DefTiDBUsePlanBaselines,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
EnableLargeTxn: DefTiDBEnableLargeTxn,
LargeTxnSize: DefTiDBLargeTxnSize,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -920,6 +928,10 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.DisableTxnAutoRetry = TiDBOptOn(val)
case TiDBEnableStreaming:
s.EnableStreaming = TiDBOptOn(val)
case TiDBEnableLargeTxn:
s.EnableLargeTxn = TiDBOptOn(val)
case TiDBLargeTxnSize:
s.LargeTxnSize = tidbOptInt64(val, DefTiDBLargeTxnSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we need to add TiDBLargeTxnSize set value check in ValidateSetSystemVar

case TiDBEnableChunkRPC:
s.EnableChunkRPC = TiDBOptOn(val)
case TiDBEnableCascadesPlanner:
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBEnableVectorizedExpression, BoolToIntStr(DefEnableVectorizedExpression)},
{ScopeGlobal | ScopeSession, TiDBEnableFastAnalyze, BoolToIntStr(DefTiDBUseFastAnalyze)},
{ScopeGlobal | ScopeSession, TiDBSkipIsolationLevelCheck, BoolToIntStr(DefTiDBSkipIsolationLevelCheck)},
{ScopeGlobal | ScopeSession, TiDBEnableLargeTxn, BoolToIntStr(DefTiDBEnableLargeTxn)},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use a config file option, so we can deprecate it later.

{ScopeGlobal | ScopeSession, TiDBLargeTxnSize, strconv.Itoa(DefTiDBLargeTxnSize)},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use a constant for it instead of a session variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constant for TiDBEnableLargeTxn and TiDBLargeTxnSize ? or for DefTiDBXXX

/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
{ScopeSession, TiDBSlowLogThreshold, strconv.Itoa(logutil.DefaultSlowThreshold)},
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ const (
// off: always disable table partition.
TiDBEnableTablePartition = "tidb_enable_table_partition"

// tidb_enable_large_txn enables TiDB to use the large transaction feature.
TiDBEnableLargeTxn = "tidb_enable_large_txn"

// tidb_large_txn_size controls the size threshold for a transaction to use the large transaction feature.
TiDBLargeTxnSize = "tidb_large_txn_size"

// TiDBCheckMb4ValueInUTF8 is used to control whether to enable the check wrong utf8 value.
TiDBCheckMb4ValueInUTF8 = "tidb_check_mb4_value_in_utf8"

Expand Down Expand Up @@ -414,6 +420,8 @@ const (
DefTiDBAllowRemoveAutoInc = false
DefTiDBUsePlanBaselines = true
DefInnodbLockWaitTimeout = 50 // 50s
DefTiDBEnableLargeTxn = false
DefTiDBLargeTxnSize = 4 << 20 // 4M
)

// Process global variables.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.AllowWriteRowID, Equals, DefOptWriteRowID)
c.Assert(vars.TiDBOptJoinReorderThreshold, Equals, DefTiDBOptJoinReorderThreshold)
c.Assert(vars.EnableFastAnalyze, Equals, DefTiDBUseFastAnalyze)
c.Assert(vars.EnableLargeTxn, Equals, DefTiDBEnableLargeTxn)
c.Assert(vars.LargeTxnSize, Equals, int64(DefTiDBLargeTxnSize))

assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.Concurrency))
assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
Expand Down
15 changes: 14 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,10 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64
isPessimisticLock[i] = true
}
}
var minCommitTS uint64
if c.txn.IsLargeTxn() {
minCommitTS = c.startTS + 1
}
req := &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
Expand All @@ -497,11 +501,12 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: c.forUpdateTS,
TxnSize: txnSize,
MinCommitTs: minCommitTS,
}
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}

func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
txnSize := uint64(c.regionTxnSize[batch.region.id])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
Expand All @@ -510,6 +515,14 @@ func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, bat
}

req := c.buildPrewriteRequest(batch, txnSize)
if x := bo.ctx.Value("checkLargeTxnEnable"); x != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has some extra cost, can we avoid it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried failpoint but the concurrent test is a problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test doesn't make much sense.

codeRun := x.(*bool)
*codeRun = true
if req.Prewrite().MinCommitTs == 0 {
return errors.New("minCommitTS should greater then 0 in the large txn")
}
}

for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *tikvTxn, ttl uint64) {
c.Assert(err, IsNil)
}

func mustNewTwoPhaseCommitter(c *C, txn *tikvTxn) *twoPhaseCommitter {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
return committer
}

func (s *testLockSuite) mustGetLock(c *C, key []byte) *Lock {
ver, err := s.store.CurrentVersion()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -427,9 +433,12 @@ func (s *testLockSuite) TestLockTTL(c *C) {
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
time.Sleep(time.Millisecond)
s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 1000)
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
committer := mustNewTwoPhaseCommitter(c, txn.(*tikvTxn))
c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil)
l := s.mustGetLock(c, []byte("key"))
c.Assert(l.TTL >= defaultLockTTL, IsTrue)
committer.cleanupKeys(bo, committer.keys)

// Huge txn has a greater TTL.
txn, err = s.store.Begin()
Expand All @@ -440,9 +449,11 @@ func (s *testLockSuite) TestLockTTL(c *C) {
k, v := randKV(1024, 1024)
txn.Set(kv.Key(k), []byte(v))
}
s.prewriteTxn(c, txn.(*tikvTxn))
committer = mustNewTwoPhaseCommitter(c, txn.(*tikvTxn))
c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil)
l = s.mustGetLock(c, []byte("key"))
s.ttlEquals(c, l.TTL, uint64(ttlFactor*2)+uint64(time.Since(start)/time.Millisecond))
committer.cleanupKeys(bo, committer.keys)

// Txn with long read time.
start = time.Now()
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (txn *tikvTxn) DelOption(opt kv.Option) {
txn.us.DelOption(opt)
}

func (txn *tikvTxn) IsLargeTxn() bool {
return txn.us.GetOption(kv.LargeTxn) != nil
}

func (txn *tikvTxn) IsPessimistic() bool {
return txn.us.GetOption(kv.Pessimistic) != nil
}
Expand Down