Skip to content

Commit

Permalink
Add tidb_low_resolution_tso session scope variable on master (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaoguang authored and db-storage committed May 29, 2019
1 parent 823d010 commit e8939b8
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 3 deletions.
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
if snapshotTS != 0 {
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}
lowResolutionTSO := sctx.GetSessionVars().LowResolutionTSO
if lowResolutionTSO {
return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set")
}
}

var err error
Expand Down
22 changes: 22 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,28 @@ func (s *testSuite) TestHistoryRead(c *C) {
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
}

func (s *testSuite) TestLowResolutionTSORead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@autocommit=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists low_resolution_tso")
tk.MustExec("create table low_resolution_tso(a int)")
tk.MustExec("insert low_resolution_tso values (1)")

// enable low resolution tso
c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsFalse)
tk.Exec("set @@tidb_low_resolution_tso = 'on'")
c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsTrue)

time.Sleep(3 * time.Second)
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("1"))
_, err := tk.Exec("update low_resolution_tso set a = 2")
c.Assert(err, NotNil)
tk.MustExec("set @@tidb_low_resolution_tso = 'off'")
tk.MustExec("update low_resolution_tso set a = 2")
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2"))
}

func (s *testSuite) TestScanControlSelection(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
7 changes: 6 additions & 1 deletion session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,12 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
}

oracleStore := s.store.GetOracle()
tsFuture := oracleStore.GetTimestampAsync(ctx)
var tsFuture oracle.Future
if s.sessionVars.LowResolutionTSO {
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx)
} else {
tsFuture = oracleStore.GetTimestampAsync(ctx)
}
ret := &txnFuture{future: tsFuture, store: s.store}
if x := ctx.Value("mockGetTSFail"); x != nil {
ret.mockFail = true
Expand Down
6 changes: 5 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,12 @@ type SessionVars struct {
// EnableFastAnalyze indicates whether to take fast analyze.
EnableFastAnalyze bool

// PessimisticLock indicates whether new transaction should be pessimistic .
// PessimisticLock indicates whether new transaction should be pessimistic.
PessimisticLock bool

MaxExecutionTime uint64
// LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds.
LowResolutionTSO bool
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -797,6 +799,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.WaitTableSplitFinish = TiDBOptOn(val)
case TiDBPessimisticLock:
s.PessimisticLock = TiDBOptOn(val)
case TiDBLowResolutionTSO:
s.LowResolutionTSO = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ func (*testSysVarSuite) TestSysVar(c *C) {
c.Assert(f, NotNil)
c.Assert(f.Value, Equals, "4000")

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")
}
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ const (
// tidb_skip_isolation_level_check is used to control whether to return error when set unsupported transaction
// isolation level.
TiDBSkipIsolationLevelCheck = "tidb_skip_isolation_level_check"

// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"
)

// TiDB system variable names that both in session and global scope.
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
TiDBCheckMb4ValueInUTF8:
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(val, Equals, "0")
c.Assert(config.GetGlobalConfig().CheckMb4ValueInUTF8, Equals, false)

SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("1"))
val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO)
c.Assert(err, IsNil)
c.Assert(val, Equals, "1")
c.Assert(v.LowResolutionTSO, Equals, true)
SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("0"))
val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO)
c.Assert(err, IsNil)
c.Assert(val, Equals, "0")
c.Assert(v.LowResolutionTSO, Equals, false)

c.Assert(v.CorrelationThreshold, Equals, 0.9)
err = SetSessionSystemVar(v, TiDBOptCorrelationThreshold, types.NewStringDatum("0"))
c.Assert(err, IsNil)
Expand Down
10 changes: 10 additions & 0 deletions store/mockoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ func (o *MockOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
return &mockOracleFuture{o, ctx}
}

// GetLowResolutionTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
return o.GetTimestamp(ctx)
}

// GetLowResolutionTimestampAsync implements oracle.Oracle interface.
func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
return o.GetTimestampAsync(ctx)
}

// IsExpired implements oracle.Oracle interface.
func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
o.RLock()
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
type Oracle interface {
GetTimestamp(ctx context.Context) (uint64, error)
GetTimestampAsync(ctx context.Context) Future
GetLowResolutionTimestamp(ctx context.Context) (uint64, error)
GetLowResolutionTimestampAsync(ctx context.Context) Future
IsExpired(lockTimestamp uint64, TTL uint64) bool
UntilExpired(lockTimeStamp uint64, TTL uint64) int64
Close()
Expand Down
8 changes: 8 additions & 0 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (l *localOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
}
}

func (l *localOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
return l.GetTimestamp(ctx)
}

func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
return l.GetTimestampAsync(ctx)
}

type future struct {
ctx context.Context
l *localOracle
Expand Down
19 changes: 19 additions & 0 deletions store/tikv/oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,22 @@ func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64) int64 {
func (o *pdOracle) Close() {
close(o.quit)
}

// A future that resolves immediately to a low resolution timestamp.
type lowResolutionTsFuture uint64

// Wait implements the oracle.Future interface.
func (f lowResolutionTsFuture) Wait() (uint64, error) {
return uint64(f), nil
}

// GetLowResolutionTimestamp gets a new increasing time.
func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) {
lastTS := atomic.LoadUint64(&o.lastTS)
return lastTS, nil
}

func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future {
lastTS := atomic.LoadUint64(&o.lastTS)
return lowResolutionTsFuture(lastTS)
}

0 comments on commit e8939b8

Please sign in to comment.