diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 405ada57f15ec..0c2f851da4413 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -1343,7 +1342,7 @@ func getMaxTableHandle(ctx *testMaxTableRowIDContext, store kv.Storage) (kv.Hand c := ctx.c d := ctx.d tbl := ctx.tbl - curVer, err := store.CurrentVersion(oracle.GlobalTxnScope) + curVer, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) maxHandle, emptyTable, err := d.GetTableMaxHandle(curVer.Ver, tbl.(table.PhysicalTable)) c.Assert(err, IsNil) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index e64c122d41e4d..1aeb5ab0354da 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" @@ -451,7 +450,7 @@ func doBatchInsert(s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint // getNowTS gets the current timestamp, in TSO. func getNowTSO(ctx sessionctx.Context) (uint64, error) { - currVer, err := ctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) + currVer, err := ctx.GetStore().CurrentVersion(kv.GlobalTxnScope) if err != nil { return 0, errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index fbe42573dbbf7..37bfe82d4ce3d 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -534,7 +533,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior } func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) { - ver, err = store.CurrentVersion(oracle.GlobalTxnScope) + ver, err = store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return ver, errors.Trace(err) } else if ver.Ver <= 0 { diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 69a6da548ec60..aced1a71aaa7b 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" @@ -236,7 +235,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req } builder.txnScope = sv.TxnCtx.TxnScope builder.IsStaleness = sv.TxnCtx.IsStaleness - if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope { + if builder.IsStaleness && builder.txnScope != kv.GlobalTxnScope { builder.MatchStoreLabels = []*metapb.StoreLabel{ { Key: placement.DCLabelKey, @@ -279,9 +278,9 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ func (builder *RequestBuilder) verifyTxnScope() error { if builder.txnScope == "" { - builder.txnScope = oracle.GlobalTxnScope + builder.txnScope = kv.GlobalTxnScope } - if builder.txnScope == oracle.GlobalTxnScope || builder.is == nil { + if builder.txnScope == kv.GlobalTxnScope || builder.is == nil { return nil } visitPhysicalTableID := make(map[int64]struct{}) @@ -600,7 +599,7 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra // VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation. func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool { - if txnScope == "" || txnScope == oracle.GlobalTxnScope { + if txnScope == "" || txnScope == kv.GlobalTxnScope { return true } bundle, ok := is.BundleByName(placement.GroupID(physicalTableID)) diff --git a/domain/domain.go b/domain/domain.go index 44f6df1aa9086..a22d647066ea1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -336,7 +335,7 @@ func (do *Domain) Reload() error { defer do.m.Unlock() startTime := time.Now() - ver, err := do.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := do.store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return err } diff --git a/domain/domain_test.go b/domain/domain_test.go index 82a583866aad3..51e0948d30715 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -347,7 +347,7 @@ func (*testSuite) TestT(c *C) { // for schemaValidator schemaVer := dom.SchemaValidator.(*schemaValidator).LatestSchemaVersion() - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) ts := ver.Ver @@ -360,7 +360,7 @@ func (*testSuite) TestT(c *C) { c.Assert(succ, Equals, ResultSucc) time.Sleep(ddlLease) - ver, err = store.CurrentVersion(oracle.GlobalTxnScope) + ver, err = store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) ts = ver.Ver _, succ = dom.SchemaValidator.Check(ts, schemaVer, nil) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index cbd76d4e6f266..be8d80246e96b 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -554,7 +554,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { pl := is.manager.ShowProcessList() // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. - currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope) + currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) return diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 2137884c69745..726603a0ff88f 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -30,7 +30,6 @@ import ( driver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -122,7 +121,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) - if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, diff --git a/executor/executor_test.go b/executor/executor_test.go index 7fa0d7b0d10bd..dde9511a8410d 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2693,11 +2693,11 @@ func (s *testSuiteP2) TestHistoryRead(c *C) { // SnapshotTS Is not updated if check failed. c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) - curVer1, _ := s.store.CurrentVersion(oracle.GlobalTxnScope) + curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope) time.Sleep(time.Millisecond) snapshotTime := time.Now() time.Sleep(time.Millisecond) - curVer2, _ := s.store.CurrentVersion(oracle.GlobalTxnScope) + curVer2, _ := s.store.CurrentVersion(kv.GlobalTxnScope) tk.MustExec("insert history_read values (2)") tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2")) tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'") @@ -8153,3 +8153,79 @@ func (s *testSerialSuite) TestIssue24210(c *C) { c.Assert(err, IsNil) } + +func (s testSerialSuite) TestExprBlackListForEnum(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a enum('a','b','c'), b enum('a','b','c'), c int, index idx(b,a));") + tk.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3);") + + checkFuncPushDown := func(rows [][]interface{}, keyWord string) bool { + for _, line := range rows { + // Agg/Expr push down + if line[2].(string) == "cop[tikv]" && strings.Contains(line[4].(string), keyWord) { + return true + } + // access index + if line[2].(string) == "cop[tikv]" && strings.Contains(line[3].(string), keyWord) { + return true + } + } + return false + } + + // Test agg(enum) push down + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows := tk.MustQuery("desc format='brief' select /*+ HASH_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsFalse) + rows = tk.MustQuery("desc format='brief' select /*+ STREAM_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select /*+ HASH_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsTrue) + rows = tk.MustQuery("desc format='brief' select /*+ STREAM_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsTrue) + + // Test expr(enum) push down + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsTrue) + + // Test enum index + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where b = 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b = 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b > 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b > 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a = 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 'a' and a = 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 'a'").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) +} diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 529a1da343387..cc0e4074b39a0 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -436,12 +436,126 @@ func (s *partitionTableSuite) TestView(c *C) { } } +func (s *partitionTableSuite) TestDirectReadingwithIndexJoin(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_dr_join") + tk.MustExec("use test_dr_join") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + // hash and range partition + tk.MustExec("create table thash (a int, b int, c int, primary key(a), index idx_b(b)) partition by hash(a) partitions 4;") + tk.MustExec(`create table trange (a int, b int, c int, primary key(a), index idx_b(b)) partition by range(a) ( +   partition p0 values less than(1000), +   partition p1 values less than(2000), +   partition p2 values less than(3000), +   partition p3 values less than(4000));`) + + // regualr table + tk.MustExec(`create table tnormal (a int, b int, c int, primary key(a), index idx_b(b));`) + tk.MustExec(`create table touter (a int, b int, c int);`) + + // generate some random data to be inserted + vals := make([]string, 0, 2000) + for i := 0; i < 2000; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v, %v)", rand.Intn(4000), rand.Intn(4000), rand.Intn(4000))) + } + tk.MustExec("insert ignore into trange values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into thash values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into tnormal values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into touter values " + strings.Join(vals, ",")) + + // test indexLookUp + hash + queryPartition := fmt.Sprintf("select /*+ INL_JOIN(touter, thash) */ * from touter join thash use index(idx_b) on touter.b = thash.b") + queryRegular := fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ * from touter join tnormal use index(idx_b) on touter.b = tnormal.b") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:IndexLookUp, outer key:test_dr_join.touter.b, inner key:test_dr_join.thash.b, equal cond:eq(test_dr_join.touter.b, test_dr_join.thash.b)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.b))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─IndexLookUp(Probe) 1.25 root partition:all ", + " ├─Selection(Build) 1.25 cop[tikv] not(isnull(test_dr_join.thash.b))", + " │ └─IndexRangeScan 1.25 cop[tikv] table:thash, index:idx_b(b) range: decided by [eq(test_dr_join.thash.b, test_dr_join.touter.b)], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 1.25 cop[tikv] table:thash keep order:false, stats:pseudo")) // check if IndexLookUp is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + + // test tableReader + hash + queryPartition = fmt.Sprintf("select /*+ INL_JOIN(touter, thash) */ * from touter join thash on touter.a = thash.a") + queryRegular = fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ * from touter join tnormal on touter.a = tnormal.a") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:TableReader, outer key:test_dr_join.touter.a, inner key:test_dr_join.thash.a, equal cond:eq(test_dr_join.touter.a, test_dr_join.thash.a)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.a))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─TableReader(Probe) 1.00 root partition:all data:TableRangeScan", + " └─TableRangeScan 1.00 cop[tikv] table:thash range: decided by [test_dr_join.touter.a], keep order:false, stats:pseudo")) // check if tableReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + + // test indexReader + hash + queryPartition = fmt.Sprintf("select /*+ INL_JOIN(touter, thash) */ thash.b from touter join thash use index(idx_b) on touter.b = thash.b;") + queryRegular = fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ tnormal.b from touter join tnormal use index(idx_b) on touter.b = tnormal.b;") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:IndexReader, outer key:test_dr_join.touter.b, inner key:test_dr_join.thash.b, equal cond:eq(test_dr_join.touter.b, test_dr_join.thash.b)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.b))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─IndexReader(Probe) 1.25 root partition:all index:Selection", + " └─Selection 1.25 cop[tikv] not(isnull(test_dr_join.thash.b))", + " └─IndexRangeScan 1.25 cop[tikv] table:thash, index:idx_b(b) range: decided by [eq(test_dr_join.thash.b, test_dr_join.touter.b)], keep order:false, stats:pseudo")) // check if indexReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + + // test indexLookUp + range + // explain select /*+ INL_JOIN(touter, tinner) */ * from touter join tinner use index(a) on touter.a = tinner.a; + queryPartition = fmt.Sprintf("select /*+ INL_JOIN(touter, trange) */ * from touter join trange use index(idx_b) on touter.b = trange.b;") + queryRegular = fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ * from touter join tnormal use index(idx_b) on touter.b = tnormal.b;") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:IndexLookUp, outer key:test_dr_join.touter.b, inner key:test_dr_join.trange.b, equal cond:eq(test_dr_join.touter.b, test_dr_join.trange.b)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.b))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─IndexLookUp(Probe) 1.25 root partition:all ", + " ├─Selection(Build) 1.25 cop[tikv] not(isnull(test_dr_join.trange.b))", + " │ └─IndexRangeScan 1.25 cop[tikv] table:trange, index:idx_b(b) range: decided by [eq(test_dr_join.trange.b, test_dr_join.touter.b)], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 1.25 cop[tikv] table:trange keep order:false, stats:pseudo")) // check if IndexLookUp is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + + // test tableReader + range + queryPartition = fmt.Sprintf("select /*+ INL_JOIN(touter, trange) */ * from touter join trange on touter.a = trange.a;") + queryRegular = fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ * from touter join tnormal on touter.a = tnormal.a;") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:TableReader, outer key:test_dr_join.touter.a, inner key:test_dr_join.trange.a, equal cond:eq(test_dr_join.touter.a, test_dr_join.trange.a)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.a))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─TableReader(Probe) 1.00 root partition:all data:TableRangeScan", + " └─TableRangeScan 1.00 cop[tikv] table:trange range: decided by [test_dr_join.touter.a], keep order:false, stats:pseudo")) // check if tableReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + + // test indexReader + range + // explain select /*+ INL_JOIN(touter, tinner) */ tinner.a from touter join tinner on touter.a = tinner.a; + queryPartition = fmt.Sprintf("select /*+ INL_JOIN(touter, trange) */ trange.b from touter join trange use index(idx_b) on touter.b = trange.b;") + queryRegular = fmt.Sprintf("select /*+ INL_JOIN(touter, tnormal) */ tnormal.b from touter join tnormal use index(idx_b) on touter.b = tnormal.b;") + tk.MustQuery("explain format = 'brief' " + queryPartition).Check(testkit.Rows( + "IndexJoin 12487.50 root inner join, inner:IndexReader, outer key:test_dr_join.touter.b, inner key:test_dr_join.trange.b, equal cond:eq(test_dr_join.touter.b, test_dr_join.trange.b)", + "├─TableReader(Build) 9990.00 root data:Selection", + "│ └─Selection 9990.00 cop[tikv] not(isnull(test_dr_join.touter.b))", + "│ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo", + "└─IndexReader(Probe) 1.25 root partition:all index:Selection", + " └─Selection 1.25 cop[tikv] not(isnull(test_dr_join.trange.b))", + " └─IndexRangeScan 1.25 cop[tikv] table:trange, index:idx_b(b) range: decided by [eq(test_dr_join.trange.b, test_dr_join.touter.b)], keep order:false, stats:pseudo")) // check if indexReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) +} + func (s *partitionTableSuite) TestDynamicPruningUnderIndexJoin(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test") } tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database pruing_under_index_join") tk.MustExec("use pruing_under_index_join") tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") @@ -1452,3 +1566,80 @@ func (s *globalIndexSuite) TestIssue21731(c *C) { tk.MustExec("drop table if exists p, t") tk.MustExec("create table t (a int, b int, unique index idx(a)) partition by list columns(b) (partition p0 values in (1), partition p1 values in (2));") } + +func (s *testSuiteWithData) TestRangePartitionBoundariesEq(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("SET @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("CREATE DATABASE TestRangePartitionBoundaries") + defer tk.MustExec("DROP DATABASE TestRangePartitionBoundaries") + tk.MustExec("USE TestRangePartitionBoundaries") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec(`CREATE TABLE t +(a INT, b varchar(255)) +PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (1000000), + PARTITION p1 VALUES LESS THAN (2000000), + PARTITION p2 VALUES LESS THAN (3000000)); +`) + + var input []string + var output []testOutput + s.testData.GetTestCases(c, &input, &output) + s.verifyPartitionResult(tk, input, output) +} + +type testOutput struct { + SQL string + Plan []string + Res []string +} + +func (s *testSuiteWithData) TestRangePartitionBoundariesNe(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("SET @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("CREATE DATABASE TestRangePartitionBoundariesNe") + defer tk.MustExec("DROP DATABASE TestRangePartitionBoundariesNe") + tk.MustExec("USE TestRangePartitionBoundariesNe") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec(`CREATE TABLE t +(a INT, b varchar(255)) +PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (1), + PARTITION p1 VALUES LESS THAN (2), + PARTITION p2 VALUES LESS THAN (3), + PARTITION p3 VALUES LESS THAN (4), + PARTITION p4 VALUES LESS THAN (5), + PARTITION p5 VALUES LESS THAN (6), + PARTITION p6 VALUES LESS THAN (7))`) + + var input []string + var output []testOutput + s.testData.GetTestCases(c, &input, &output) + s.verifyPartitionResult(tk, input, output) +} + +func (s *testSuiteWithData) verifyPartitionResult(tk *testkit.TestKit, input []string, output []testOutput) { + for i, tt := range input { + var isSelect bool = false + if strings.HasPrefix(strings.ToLower(tt), "select ") { + isSelect = true + } + s.testData.OnRecord(func() { + output[i].SQL = tt + if isSelect { + output[i].Plan = s.testData.ConvertRowsToStrings(tk.UsedPartitions(tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows()) + } else { + // to avoid double execution of INSERT (and INSERT does not return anything) + output[i].Res = nil + output[i].Plan = nil + } + }) + if isSelect { + tk.UsedPartitions(tt).Check(testkit.Rows(output[i].Plan...)) + } + tk.MayQuery(tt).Sort().Check(testkit.Rows(output[i].Res...)) + } +} diff --git a/executor/point_get.go b/executor/point_get.go index fc8326555bf01..dccb72bdebb17 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -153,7 +152,7 @@ func (e *PointGetExecutor) Open(context.Context) error { e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness) - if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope { + if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope { e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, @@ -392,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) func (e *PointGetExecutor) verifyTxnScope() error { txnScope := e.txn.GetOption(kv.TxnScope).(string) - if txnScope == "" || txnScope == oracle.GlobalTxnScope { + if txnScope == "" || txnScope == kv.GlobalTxnScope { return nil } var tblID int64 diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 7cf235bd3c0f7..64b334b15bf94 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -20,6 +20,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testkit" ) @@ -76,7 +77,7 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, sql: "begin", IsStaleness: false, - txnScope: oracle.GlobalTxnScope, + txnScope: kv.GlobalTxnScope, zone: "", }, } diff --git a/executor/testdata/executor_suite_in.json b/executor/testdata/executor_suite_in.json index 6abd20c740a80..fff3187717f0a 100644 --- a/executor/testdata/executor_suite_in.json +++ b/executor/testdata/executor_suite_in.json @@ -51,5 +51,96 @@ "select count(*) from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL", "select * from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 is not NULL" ] + }, + { + "name": "TestRangePartitionBoundariesEq", + "cases": [ + "INSERT INTO t VALUES (999998, '999998 Filler ...'), (999999, '999999 Filler ...'), (1000000, '1000000 Filler ...'), (1000001, '1000001 Filler ...'), (1000002, '1000002 Filler ...')", + "INSERT INTO t VALUES (1999998, '1999998 Filler ...'), (1999999, '1999999 Filler ...'), (2000000, '2000000 Filler ...'), (2000001, '2000001 Filler ...'), (2000002, '2000002 Filler ...')", + "INSERT INTO t VALUES (2999998, '2999998 Filler ...'), (2999999, '2999999 Filler ...')", + "INSERT INTO t VALUES (-2147483648, 'MIN_INT filler...'), (0, '0 Filler...')", + "ANALYZE TABLE t", + "SELECT * FROM t WHERE a = -2147483648", + "SELECT * FROM t WHERE a IN (-2147483648)", + "SELECT * FROM t WHERE a = 0", + "SELECT * FROM t WHERE a IN (0)", + "SELECT * FROM t WHERE a = 999998", + "SELECT * FROM t WHERE a IN (999998)", + "SELECT * FROM t WHERE a = 999999", + "SELECT * FROM t WHERE a IN (999999)", + "SELECT * FROM t WHERE a = 1000000", + "SELECT * FROM t WHERE a IN (1000000)", + "SELECT * FROM t WHERE a = 1000001", + "SELECT * FROM t WHERE a IN (1000001)", + "SELECT * FROM t WHERE a = 1000002", + "SELECT * FROM t WHERE a IN (1000002)", + "SELECT * FROM t WHERE a = 3000000", + "SELECT * FROM t WHERE a IN (3000000)", + "SELECT * FROM t WHERE a = 3000001", + "SELECT * FROM t WHERE a IN (3000001)", + "SELECT * FROM t WHERE a IN (-2147483648, -2147483647)", + "SELECT * FROM t WHERE a IN (-2147483647, -2147483646)", + "SELECT * FROM t WHERE a IN (999997, 999998, 999999)", + "SELECT * FROM t WHERE a IN (999998, 999999, 1000000)", + "SELECT * FROM t WHERE a IN (999999, 1000000, 1000001)", + "SELECT * FROM t WHERE a IN (1000000, 1000001, 1000002)", + "SELECT * FROM t WHERE a IN (1999997, 1999998, 1999999)", + "SELECT * FROM t WHERE a IN (1999998, 1999999, 2000000)", + "SELECT * FROM t WHERE a IN (1999999, 2000000, 2000001)", + "SELECT * FROM t WHERE a IN (2000000, 2000001, 2000002)", + "SELECT * FROM t WHERE a IN (2999997, 2999998, 2999999)", + "SELECT * FROM t WHERE a IN (2999998, 2999999, 3000000)", + "SELECT * FROM t WHERE a IN (2999999, 3000000, 3000001)", + "SELECT * FROM t WHERE a IN (3000000, 3000001, 3000002)" + ] + }, + { + "name": "TestRangePartitionBoundariesNe", + "cases": [ + "INSERT INTO t VALUES (0, '0 Filler...')", + "INSERT INTO t VALUES (1, '1 Filler...')", + "INSERT INTO t VALUES (2, '2 Filler...')", + "INSERT INTO t VALUES (3, '3 Filler...')", + "INSERT INTO t VALUES (4, '4 Filler...')", + "INSERT INTO t VALUES (5, '5 Filler...')", + "INSERT INTO t VALUES (6, '6 Filler...')", + "ANALYZE TABLE t", + "SELECT * FROM t WHERE a != -1", + "SELECT * FROM t WHERE 1 = 1 AND a != -1", + "SELECT * FROM t WHERE a NOT IN (-2, -1)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1", + "SELECT * FROM t WHERE a != 0", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0", + "SELECT * FROM t WHERE a != 1", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1", + "SELECT * FROM t WHERE a != 2", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2", + "SELECT * FROM t WHERE a != 3", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3", + "SELECT * FROM t WHERE a != 4", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4", + "SELECT * FROM t WHERE a != 5", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5", + "SELECT * FROM t WHERE a != 6", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6", + "SELECT * FROM t WHERE a != 7", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6 AND a != 7", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6, 7)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6 OR a = 7" + ] } ] diff --git a/executor/testdata/executor_suite_out.json b/executor/testdata/executor_suite_out.json index 2be3c8ea4894f..caa5c4f948966 100644 --- a/executor/testdata/executor_suite_out.json +++ b/executor/testdata/executor_suite_out.json @@ -598,5 +598,802 @@ ] } ] + }, + { + "Name": "TestRangePartitionBoundariesEq", + "Cases": [ + { + "SQL": "INSERT INTO t VALUES (999998, '999998 Filler ...'), (999999, '999999 Filler ...'), (1000000, '1000000 Filler ...'), (1000001, '1000001 Filler ...'), (1000002, '1000002 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (1999998, '1999998 Filler ...'), (1999999, '1999999 Filler ...'), (2000000, '2000000 Filler ...'), (2000001, '2000001 Filler ...'), (2000002, '2000002 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (2999998, '2999998 Filler ...'), (2999999, '2999999 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (-2147483648, 'MIN_INT filler...'), (0, '0 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "ANALYZE TABLE t", + "Plan": null, + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a = -2147483648", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483648)", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 0", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (0)", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 999998", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999998)", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 999999", + "Plan": [ + "p0" + ], + "Res": [ + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999999)", + "Plan": [ + "p0" + ], + "Res": [ + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000000", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000000)", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000001", + "Plan": [ + "p1" + ], + "Res": [ + "1000001 1000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000001)", + "Plan": [ + "p1" + ], + "Res": [ + "1000001 1000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000002", + "Plan": [ + "p1" + ], + "Res": [ + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000002)", + "Plan": [ + "p1" + ], + "Res": [ + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 3000000", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000000)", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a = 3000001", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000001)", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483648, -2147483647)", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483647, -2147483646)", + "Plan": [ + "p0" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999997, 999998, 999999)", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999998, 999999, 1000000)", + "Plan": [ + "p0 p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "999998 999998 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999999, 1000000, 1000001)", + "Plan": [ + "p0 p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "1000001 1000001 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000000, 1000001, 1000002)", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "1000001 1000001 Filler ...", + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999997, 1999998, 1999999)", + "Plan": [ + "p1" + ], + "Res": [ + "1999998 1999998 Filler ...", + "1999999 1999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999998, 1999999, 2000000)", + "Plan": [ + "p1 p2" + ], + "Res": [ + "1999998 1999998 Filler ...", + "1999999 1999999 Filler ...", + "2000000 2000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999999, 2000000, 2000001)", + "Plan": [ + "p1 p2" + ], + "Res": [ + "1999999 1999999 Filler ...", + "2000000 2000000 Filler ...", + "2000001 2000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2000000, 2000001, 2000002)", + "Plan": [ + "p2" + ], + "Res": [ + "2000000 2000000 Filler ...", + "2000001 2000001 Filler ...", + "2000002 2000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999997, 2999998, 2999999)", + "Plan": [ + "p2" + ], + "Res": [ + "2999998 2999998 Filler ...", + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999998, 2999999, 3000000)", + "Plan": [ + "p2" + ], + "Res": [ + "2999998 2999998 Filler ...", + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999999, 3000000, 3000001)", + "Plan": [ + "p2" + ], + "Res": [ + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000000, 3000001, 3000002)", + "Plan": [ + "dual" + ], + "Res": null + } + ] + }, + { + "Name": "TestRangePartitionBoundariesNe", + "Cases": [ + { + "SQL": "INSERT INTO t VALUES (0, '0 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (1, '1 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (2, '2 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (3, '3 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (4, '4 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (5, '5 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (6, '6 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "ANALYZE TABLE t", + "Plan": null, + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a != -1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1)", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1", + "Plan": [ + "p0" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a != 0", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0)", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1", + "Plan": [ + "all" + ], + "Res": [ + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1)", + "Plan": [ + "all" + ], + "Res": [ + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1", + "Plan": [ + "p0 p1" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 2", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2", + "Plan": [ + "all" + ], + "Res": [ + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2)", + "Plan": [ + "all" + ], + "Res": [ + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2", + "Plan": [ + "p0 p1 p2" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 3", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3", + "Plan": [ + "all" + ], + "Res": [ + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3)", + "Plan": [ + "all" + ], + "Res": [ + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3", + "Plan": [ + "p0 p1 p2 p3" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 4", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4", + "Plan": [ + "all" + ], + "Res": [ + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4)", + "Plan": [ + "all" + ], + "Res": [ + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4", + "Plan": [ + "p0 p1 p2 p3 p4" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 5", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5", + "Plan": [ + "all" + ], + "Res": [ + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5)", + "Plan": [ + "all" + ], + "Res": [ + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5", + "Plan": [ + "p0 p1 p2 p3 p4 p5" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 6", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6)", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 7", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6 AND a != 7", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6, 7)", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6 OR a = 7", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + } + ] } ] diff --git a/expression/expr_to_pb.go b/expression/expr_to_pb.go index 59ff01b73b67c..dc031145d0d95 100644 --- a/expression/expr_to_pb.go +++ b/expression/expr_to_pb.go @@ -211,6 +211,10 @@ func (pc PbConverter) columnToPBExpr(column *Column) *tipb.Expr { switch column.GetType().Tp { case mysql.TypeBit, mysql.TypeSet, mysql.TypeGeometry, mysql.TypeUnspecified: return nil + case mysql.TypeEnum: + if !IsPushDownEnabled("enum", kv.UnSpecified) { + return nil + } } if pc.client.IsRequestTypeSupported(kv.ReqTypeDAG, kv.ReqSubTypeBasic) { diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 218ca9cbd6966..d61685a7f8a71 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -16,6 +16,8 @@ package kv import ( "context" "sync" + + "github.com/pingcap/tidb/store/tikv" ) // InjectionConfig is used for fault injections for KV components. @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithOption creates an injected Transaction with given option. -func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *InjectedStore) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 33b6535214b2c..c5e203151fe63 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,7 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/store/tikv" ) type testFaultInjectionSuite struct{} @@ -35,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 5d85261bc2111..9e41832678294 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -154,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/kv/kv.go b/kv/kv.go index 20b0fc84b7144..471dfe09a110b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/memory" @@ -339,59 +340,13 @@ type Driver interface { Open(path string) (Storage, error) } -// TransactionOption indicates the option when beginning a transaction -// `TxnScope` must be set for each object -// Every other fields are optional, but currently at most one of them can be set -type TransactionOption struct { - TxnScope string - StartTS *uint64 - PrevSec *uint64 - MinStartTS *uint64 - MaxPrevSec *uint64 -} - -// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used -func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} -} - -// SetMaxPrevSec set maxPrevSec -func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { - to.MaxPrevSec = &maxPrevSec - return to -} - -// SetMinStartTS set minStartTS -func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { - to.MinStartTS = &minStartTS - return to -} - -// SetStartTs set startTS -func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { - to.StartTS = &startTS - return to -} - -// SetPrevSec set prevSec -func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { - to.PrevSec = &prevSec - return to -} - -// SetTxnScope set txnScope -func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { - to.TxnScope = txnScope - return to -} - // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { // Begin a global transaction Begin() (Transaction, error) // Begin a transaction with given option - BeginWithOption(option TransactionOption) (Transaction, error) + BeginWithOption(option tikv.StartTSOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/kv/mock_test.go b/kv/mock_test.go index eba059e763f82..e09c291d5de95 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -17,7 +17,6 @@ import ( "context" . "github.com/pingcap/check" - "github.com/pingcap/tidb/store/tikv/oracle" ) var _ = Suite(testMockSuite{}) @@ -29,7 +28,7 @@ func (s testMockSuite) TestInterface(c *C) { storage := newMockStorage() storage.GetClient() storage.UUID() - version, err := storage.CurrentVersion(oracle.GlobalTxnScope) + version, err := storage.CurrentVersion(GlobalTxnScope) c.Check(err, IsNil) snapshot := storage.GetSnapshot(version) _, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")}) diff --git a/kv/txn_scope_var.go b/kv/txn_scope_var.go new file mode 100644 index 0000000000000..941fdaff5f26f --- /dev/null +++ b/kv/txn_scope_var.go @@ -0,0 +1,73 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/oracle" +) + +// TxnScopeVar indicates the used txnScope for oracle +type TxnScopeVar struct { + // varValue indicates the value of @@txn_scope, which can only be `global` or `local` + varValue string + // txnScope indicates the value which the tidb-server holds to request tso to pd + txnScope string +} + +// GetTxnScopeVar gets TxnScopeVar from config +func GetTxnScopeVar() TxnScopeVar { + isGlobal, location := config.GetTxnScopeFromConfig() + if isGlobal { + return NewGlobalTxnScopeVar() + } + return NewLocalTxnScopeVar(location) +} + +// NewGlobalTxnScopeVar creates a Global TxnScopeVar +func NewGlobalTxnScopeVar() TxnScopeVar { + return newTxnScopeVar(GlobalTxnScope, GlobalTxnScope) +} + +// NewLocalTxnScopeVar creates a Local TxnScopeVar with given real txnScope value. +func NewLocalTxnScopeVar(txnScope string) TxnScopeVar { + return newTxnScopeVar(LocalTxnScope, txnScope) +} + +// GetVarValue returns the value of @@txn_scope which can only be `global` or `local` +func (t TxnScopeVar) GetVarValue() string { + return t.varValue +} + +// GetTxnScope returns the value of the tidb-server holds to request tso to pd. +func (t TxnScopeVar) GetTxnScope() string { + return t.txnScope +} + +func newTxnScopeVar(varValue string, txnScope string) TxnScopeVar { + return TxnScopeVar{ + varValue: varValue, + txnScope: txnScope, + } +} + +// Transaction scopes constants. +const ( + // GlobalTxnScope is synced with PD's define of global scope. + // If we want to remove the dependency on store/tikv here, we need to map + // the two GlobalTxnScopes in the driver layer. + GlobalTxnScope = oracle.GlobalTxnScope + // LocalTxnScope indicates the transaction should use local ts. + LocalTxnScope = "local" +) diff --git a/meta/meta_test.go b/meta/meta_test.go index 590e85fc2a21e..4ba54f1935a3a 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testleak" . "github.com/pingcap/tidb/util/testutil" ) @@ -291,7 +290,7 @@ func (s *testSuite) TestSnapshot(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - ver1, _ := store.CurrentVersion(oracle.GlobalTxnScope) + ver1, _ := store.CurrentVersion(kv.GlobalTxnScope) time.Sleep(time.Millisecond) txn, _ = store.Begin() m = meta.NewMeta(txn) diff --git a/server/server.go b/server/server.go index 29f5307895cc2..935abbb7bc693 100644 --- a/server/server.go +++ b/server/server.go @@ -54,11 +54,11 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/fastrand" @@ -311,9 +311,9 @@ func setSSLVariable(ca, key, cert string) { func setTxnScope() { variable.SetSysVar("txn_scope", func() string { if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - return oracle.LocalTxnScope + return kv.LocalTxnScope }()) } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 72853d86208a9..2e8c01c75577b 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2029,14 +2029,14 @@ func (s *testPessimisticSuite) TestSelectForUpdateConflictRetry(c *C) { tsCh := make(chan uint64) go func() { tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 1") - lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) tsCh <- lastTS tk3.MustExec("commit") tsCh <- lastTS }() // tk2LastTS should be its forUpdateTS - tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) tk2.MustExec("commit") diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go index ca05f4a74dbff..eda0e5e387e05 100644 --- a/session/schema_amender_test.go +++ b/session/schema_amender_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -426,7 +425,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) { } c.Assert(err, IsNil) } - curVer, err := se.store.CurrentVersion(oracle.GlobalTxnScope) + curVer, err := se.store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1) mutationVals, err := txn.BatchGet(ctx, checkKeys) diff --git a/session/session.go b/session/session.go index efd6706c4ffb3..902f35ca28e79 100644 --- a/session/session.go +++ b/session/session.go @@ -69,7 +69,6 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" @@ -1821,7 +1820,7 @@ func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore. return false, nil } // check auto commit - if !s.GetSessionVars().IsAutocommit() { + if !plannercore.IsAutoCommitTxn(s) { return false, nil } // check schema version @@ -1975,7 +1974,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2768,7 +2767,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2801,22 +2800,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } @@ -2929,9 +2928,9 @@ func (s *session) checkPlacementPolicyBeforeCommit() error { // Get the txnScope of the transaction we're going to commit. txnScope := s.GetSessionVars().TxnCtx.TxnScope if txnScope == "" { - txnScope = oracle.GlobalTxnScope + txnScope = kv.GlobalTxnScope } - if txnScope != oracle.GlobalTxnScope { + if txnScope != kv.GlobalTxnScope { is := s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema) deltaMap := s.GetSessionVars().TxnCtx.TableDeltaMap for physicalTableID := range deltaMap { diff --git a/session/session_test.go b/session/session_test.go index df2a167921e56..a6c7908237bca 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -52,7 +52,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" - "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -2075,7 +2074,7 @@ func (s *testSchemaSerialSuite) TestLoadSchemaFailed(c *C) { _, err = tk1.Exec("commit") c.Check(err, NotNil) - ver, err := s.store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := s.store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(ver, NotNil) @@ -3338,26 +3337,26 @@ func (s *testSessionSerialSuite) TestSetTxnScope(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) // assert default value result := tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) // assert set sys variable tk.MustExec("set @@session.txn_scope = 'local';") result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope") failpoint.Enable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope", `return("bj")`) defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope") tk = testkit.NewTestKitWithInit(c, s.store) // assert default value result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.LocalTxnScope)) + result.Check(testkit.Rows(kv.LocalTxnScope)) c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, "bj") // assert set sys variable tk.MustExec("set @@session.txn_scope = 'global';") result = tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) - c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, oracle.GlobalTxnScope) + result.Check(testkit.Rows(kv.GlobalTxnScope)) + c.Assert(tk.Se.GetSessionVars().CheckAndGetTxnScope(), Equals, kv.GlobalTxnScope) // assert set invalid txn_scope err := tk.ExecToErr("set @@txn_scope='foo'") @@ -3414,9 +3413,9 @@ PARTITION BY RANGE (c) ( setBundle("p1", "dc-2") // set txn_scope to global - tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", oracle.GlobalTxnScope)) + tk.MustExec(fmt.Sprintf("set @@session.txn_scope = '%s';", kv.GlobalTxnScope)) result := tk.MustQuery("select @@txn_scope;") - result.Check(testkit.Rows(oracle.GlobalTxnScope)) + result.Check(testkit.Rows(kv.GlobalTxnScope)) // test global txn auto commit tk.MustExec("insert into t1 (c) values (1)") // write dc-1 with global scope @@ -3427,7 +3426,7 @@ PARTITION BY RANGE (c) ( tk.MustExec("begin") txn, err := tk.Se.Txn(true) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, oracle.GlobalTxnScope) + c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, kv.GlobalTxnScope) c.Assert(txn.Valid(), IsTrue) tk.MustExec("insert into t1 (c) values (1)") // write dc-1 with global scope result = tk.MustQuery("select * from t1") // read dc-1 and dc-2 with global scope @@ -3441,7 +3440,7 @@ PARTITION BY RANGE (c) ( tk.MustExec("begin") txn, err = tk.Se.Txn(true) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, oracle.GlobalTxnScope) + c.Assert(tk.Se.GetSessionVars().TxnCtx.TxnScope, Equals, kv.GlobalTxnScope) c.Assert(txn.Valid(), IsTrue) tk.MustExec("insert into t1 (c) values (101)") // write dc-2 with global scope result = tk.MustQuery("select * from t1") // read dc-1 and dc-2 with global scope @@ -4486,3 +4485,61 @@ func (s *testSessionSuite) TestReadDMLBatchSize(c *C) { _, _ = se.Execute(context.TODO(), "select 1") c.Assert(se.GetSessionVars().DMLBatchSize, Equals, 1000) } + +func (s *testSessionSuite) TestInTxnPSProtoPointGet(c *C) { + ctx := context.Background() + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t1(c1 int primary key, c2 int, c3 int)") + tk.MustExec("insert into t1 values(1, 10, 100)") + + // Generate the ps statement and make the prepared plan cached for point get. + id, _, _, err := tk.Se.PrepareStmt("select c1, c2 from t1 where c1 = ?") + c.Assert(err, IsNil) + idForUpdate, _, _, err := tk.Se.PrepareStmt("select c1, c2 from t1 where c1 = ? for update") + c.Assert(err, IsNil) + params := []types.Datum{types.NewDatum(1)} + rs, err := tk.Se.ExecutePreparedStmt(ctx, id, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + rs, err = tk.Se.ExecutePreparedStmt(ctx, idForUpdate, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + + // Query again the cached plan will be used. + rs, err = tk.Se.ExecutePreparedStmt(ctx, id, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + rs, err = tk.Se.ExecutePreparedStmt(ctx, idForUpdate, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + + // Start a transaction, now the in txn flag will be added to the session vars. + _, err = tk.Se.Execute(ctx, "start transaction") + c.Assert(err, IsNil) + rs, err = tk.Se.ExecutePreparedStmt(ctx, id, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + txn, err := tk.Se.Txn(false) + c.Assert(err, IsNil) + c.Assert(txn.Valid(), IsTrue) + rs, err = tk.Se.ExecutePreparedStmt(ctx, idForUpdate, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 10")) + txn, err = tk.Se.Txn(false) + c.Assert(err, IsNil) + c.Assert(txn.Valid(), IsTrue) + _, err = tk.Se.Execute(ctx, "update t1 set c2 = c2 + 1") + c.Assert(err, IsNil) + // Check the read result after in-transaction update. + rs, err = tk.Se.ExecutePreparedStmt(ctx, id, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 11")) + rs, err = tk.Se.ExecutePreparedStmt(ctx, idForUpdate, params) + c.Assert(err, IsNil) + tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 11")) + txn, err = tk.Se.Txn(false) + c.Assert(err, IsNil) + c.Assert(txn.Valid(), IsTrue) + tk.MustExec("commit") +} diff --git a/session/txn.go b/session/txn.go index 133cafb976aae..294725f8efaa0 100644 --- a/session/txn.go +++ b/session/txn.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" @@ -436,14 +437,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c474e7905fa7b..d38881a8ddcfa 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -800,7 +800,7 @@ type SessionVars struct { PartitionPruneMode atomic2.String // TxnScope indicates the scope of the transactions. It should be `global` or equal to `dc-location` in configuration. - TxnScope oracle.TxnScope + TxnScope kv.TxnScopeVar // EnabledRateLimitAction indicates whether enabled ratelimit action during coprocessor EnabledRateLimitAction bool @@ -863,12 +863,12 @@ func (s *SessionVars) IsMPPEnforced() bool { // CheckAndGetTxnScope will return the transaction scope we should use in the current session. func (s *SessionVars) CheckAndGetTxnScope() string { if s.InRestrictedSQL { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - if s.TxnScope.GetVarValue() == oracle.LocalTxnScope { + if s.TxnScope.GetVarValue() == kv.LocalTxnScope { return s.TxnScope.GetTxnScope() } - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } // UseDynamicPartitionPrune indicates whether use new dynamic partition prune. @@ -1055,7 +1055,7 @@ func NewSessionVars() *SessionVars { EnableAlterPlacement: DefTiDBEnableAlterPlacement, EnableAmendPessimisticTxn: DefTiDBEnableAmendPessimisticTxn, PartitionPruneMode: *atomic2.NewString(DefTiDBPartitionPruneMode), - TxnScope: oracle.GetTxnScope(), + TxnScope: kv.GetTxnScopeVar(), EnabledRateLimitAction: DefTiDBEnableRateLimitAction, EnableAsyncCommit: DefTiDBEnableAsyncCommit, Enable1PC: DefTiDBEnable1PC, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 18ebe75eb80dd..1df9e3c0f582c 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" @@ -743,15 +742,15 @@ var defaultSysVars = []*SysVar{ /* TiDB specific variables */ {Scope: ScopeSession, Name: TiDBTxnScope, Value: func() string { if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal { - return oracle.GlobalTxnScope + return kv.GlobalTxnScope } - return oracle.LocalTxnScope + return kv.LocalTxnScope }(), SetSession: func(s *SessionVars, val string) error { switch val { - case oracle.GlobalTxnScope: - s.TxnScope = oracle.NewGlobalTxnScope() - case oracle.LocalTxnScope: - s.TxnScope = oracle.GetTxnScope() + case kv.GlobalTxnScope: + s.TxnScope = kv.NewGlobalTxnScopeVar() + case kv.LocalTxnScope: + s.TxnScope = kv.GetTxnScopeVar() default: return ErrWrongValueForVar.GenWithStack("@@txn_scope value should be global or local") } diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index cc0f217280f31..2d93b7eda4abb 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithOption begins a transaction with given option -func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *tikvStore) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, derr.ToTiDBErr(err) diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index b408f279be98a..b5b42df1838b9 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -74,7 +74,7 @@ type GCWorker struct { // NewGCWorker creates a GCWorker instance. func NewGCWorker(store kv.Storage, pdClient pd.Client) (*GCWorker, error) { - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return nil, errors.Trace(err) } @@ -429,7 +429,7 @@ func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint } func (w *GCWorker) getOracleTime() (time.Time, error) { - currentVer, err := w.store.CurrentVersion(oracle.GlobalTxnScope) + currentVer, err := w.store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return time.Time{}, errors.Trace(err) } @@ -1984,7 +1984,7 @@ type MockGCWorker struct { // NewMockGCWorker creates a MockGCWorker instance ONLY for test. func NewMockGCWorker(store kv.Storage) (*MockGCWorker, error) { - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) if err != nil { return nil, errors.Trace(err) } diff --git a/store/helper/helper.go b/store/helper/helper.go index 49aa7cf2107e0..8eb9b9d7db828 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,7 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) + BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/mockcopr/executor_test.go b/store/mockstore/mockcopr/executor_test.go index af9ac45beae96..7437b8d995997 100644 --- a/store/mockstore/mockcopr/executor_test.go +++ b/store/mockstore/mockcopr/executor_test.go @@ -83,7 +83,7 @@ func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) { tk.MustExec("insert into t values (1, 1)") o := s.store.GetOracle() - tso, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + tso, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope}) c.Assert(err, IsNil) key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1)) diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 6221ef855707d..7d78d1a9b7418 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{}, } // BeginWithOption begins a transaction with given option -func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return newTiKVTxn(s.KVStore.BeginWithOption(option)) } diff --git a/store/store_test.go b/store/store_test.go index 627a214badee7..3f4a44cecc189 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -26,7 +26,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/testleak" ) @@ -543,7 +542,7 @@ func (s *testKVSuite) TestDBClose(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) + ver, err := store.CurrentVersion(kv.GlobalTxnScope) c.Assert(err, IsNil) c.Assert(kv.MaxVersion.Cmp(ver), Equals, 1) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index edaef3b4744d7..622313f382abd 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -189,11 +188,11 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithOption(tidbkv.DefaultTransactionOption()) + return s.BeginWithOption(DefaultStartTSOption()) } -// BeginWithOption begins a transaction with the given TransactionOption -func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) { +// BeginWithOption begins a transaction with the given StartTSOption +func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) { return newTiKVTxnWithOptions(s, options) } @@ -389,6 +388,7 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 { return safeTS.(uint64) } +// setSafeTS sets safeTs for store storeID, export for testing func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index daf00c66814ca..1b08129d412aa 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -18,7 +18,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/logutil" "go.uber.org/zap" ) @@ -45,57 +44,11 @@ type Future interface { Wait() (uint64, error) } -// TxnScope indicates the used txnScope for oracle -type TxnScope struct { - // varValue indicates the value of @@txn_scope, which can only be `global` or `local` - varValue string - // txnScope indicates the value which the tidb-server holds to request tso to pd - txnScope string -} - -// GetTxnScope gets oracle.TxnScope from config -func GetTxnScope() TxnScope { - isGlobal, location := config.GetTxnScopeFromConfig() - if isGlobal { - return NewGlobalTxnScope() - } - return NewLocalTxnScope(location) -} - -// NewGlobalTxnScope creates a Global TxnScope -func NewGlobalTxnScope() TxnScope { - return newTxnScope(GlobalTxnScope, GlobalTxnScope) -} - -// NewLocalTxnScope creates a Local TxnScope with given real txnScope value. -func NewLocalTxnScope(txnScope string) TxnScope { - return newTxnScope(LocalTxnScope, txnScope) -} - -// GetVarValue returns the value of @@txn_scope which can only be `global` or `local` -func (t TxnScope) GetVarValue() string { - return t.varValue -} - -// GetTxnScope returns the value of the tidb-server holds to request tso to pd. -func (t TxnScope) GetTxnScope() string { - return t.txnScope -} - -func newTxnScope(varValue string, txnScope string) TxnScope { - return TxnScope{ - varValue: varValue, - txnScope: txnScope, - } -} - const ( physicalShiftBits = 18 logicalBits = (1 << physicalShiftBits) - 1 // GlobalTxnScope is the default transaction scope for a Oracle service. GlobalTxnScope = "global" - // LocalTxnScope indicates the local txn scope for a Oracle service. - LocalTxnScope = "local" ) // ComposeTS creates a ts from physical and logical parts. diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index 1a8dc5062218d..3e80e6310fe4b 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error { return saveSafePoint(s.GetSafePointKV(), v) } +// SetRegionCacheStore is used to set a store in region cache, for testing only +func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { + s.regionCache.storeMu.Lock() + defer s.regionCache.storeMu.Unlock() + s.regionCache.storeMu.stores[id] = &Store{ + storeID: id, + storeType: storeType, + state: state, + labels: labels, + } +} + +// SetSafeTS is used to set safeTS for the store with `storeID` +func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) { + s.setSafeTS(storeID, safeTS) +} + // TxnProbe wraps a txn and exports internal states for testing purpose. type TxnProbe struct { *KVTxn diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 5589752043b2b..43b682160c514 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - tidbkv "github.com/pingcap/tidb/kv" drivertxn "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) + txn1, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(committer.GetStartTS() + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.GetCtx(), []byte("x")) c.Assert(tikverr.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) + txn2, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.GetCtx(), []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go similarity index 56% rename from store/tikv/extract_start_ts_test.go rename to store/tikv/tests/extract_start_ts_test.go index b392ca365cde8..82f37796dce8a 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -11,20 +11,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) type extractStartTsSuite struct { - store *KVStore + store *tikv.KVStore } var _ = SerialSuites(&extractStartTsSuite{}) @@ -33,31 +33,24 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { client, pdClient, cluster, err := unistore.New("") c.Assert(err, IsNil) unistore.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - store.regionCache.storeMu.stores[2] = &Store{ - storeID: 2, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: oracle.LocalTxnScope, - }, + probe := tikv.StoreProbe{KVStore: store} + probe.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: "local1", }, - } - store.regionCache.storeMu.stores[3] = &Store{ - storeID: 3, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{{ - Key: DCLabelKey, + }) + probe.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, Value: "Some Random Label", - }}, - } - store.setSafeTS(2, 102) - store.setSafeTS(3, 101) - s.store = store + }, + }) + probe.SetSafeTS(2, 102) + probe.SetSafeTS(3, 101) + s.store = probe.KVStore } func (s *extractStartTsSuite) TestExtractStartTs(c *C) { @@ -69,26 +62,26 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ // StartTS setted - {100, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, tikv.StartTSOption{TxnScope: "local1", StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, expected) } @@ -97,18 +90,19 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.setSafeTS(2, 0x8000000000000002) - s.store.setSafeTS(3, 0x8000000000000001) + probe := tikv.StoreProbe{KVStore: s.store} + probe.SetSafeTS(2, 0x8000000000000002) + probe.SetSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ - {0x8000000000000001, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, tikv.StartTSOption{TxnScope: "local1", StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, cs.expectedTS) } } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index beeeafe66a063..d228b834e00dc 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" tikverr "github.com/pingcap/tidb/store/tikv/error" tikv "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -55,6 +54,52 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } +// StartTSOption indicates the option when beginning a transaction +// `TxnScope` must be set for each object +// Every other fields are optional, but currently at most one of them can be set +type StartTSOption struct { + TxnScope string + StartTS *uint64 + PrevSec *uint64 + MinStartTS *uint64 + MaxPrevSec *uint64 +} + +// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used +func DefaultStartTSOption() StartTSOption { + return StartTSOption{TxnScope: oracle.GlobalTxnScope} +} + +// SetMaxPrevSec returns a new StartTSOption with MaxPrevSec set to maxPrevSec +func (to StartTSOption) SetMaxPrevSec(maxPrevSec uint64) StartTSOption { + to.MaxPrevSec = &maxPrevSec + return to +} + +// SetMinStartTS returns a new StartTSOption with MinStartTS set to minStartTS +func (to StartTSOption) SetMinStartTS(minStartTS uint64) StartTSOption { + to.MinStartTS = &minStartTS + return to +} + +// SetStartTs returns a new StartTSOption with StartTS set to startTS +func (to StartTSOption) SetStartTs(startTS uint64) StartTSOption { + to.StartTS = &startTS + return to +} + +// SetPrevSec returns a new StartTSOption with PrevSec set to prevSec +func (to StartTSOption) SetPrevSec(prevSec uint64) StartTSOption { + to.PrevSec = &prevSec + return to +} + +// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope +func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption { + to.TxnScope = txnScope + return to +} + // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { snapshot *KVSnapshot @@ -90,23 +135,24 @@ type KVTxn struct { kvFilter KVFilter } -func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) { +// ExtractStartTs use `option` to get the proper startTS for a transaction +func ExtractStartTs(store *KVStore, option StartTSOption) (uint64, error) { var startTs uint64 var err error - if options.StartTS != nil { - startTs = *options.StartTS - } else if options.PrevSec != nil { + if option.StartTS != nil { + startTs = *option.StartTS + } else if option.PrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getStalenessTimestamp(bo, options.TxnScope, *options.PrevSec) - } else if options.MinStartTS != nil { + startTs, err = store.getStalenessTimestamp(bo, option.TxnScope, *option.PrevSec) + } else if option.MinStartTS != nil { stores := make([]*Store, 0) allStores := store.regionCache.getStoresByType(tikvrpc.TiKV) - if options.TxnScope != oracle.GlobalTxnScope { + if option.TxnScope != oracle.GlobalTxnScope { for _, store := range allStores { if store.IsLabelsMatch([]*metapb.StoreLabel{ { Key: DCLabelKey, - Value: options.TxnScope, + Value: option.TxnScope, }, }) { stores = append(stores, store) @@ -116,32 +162,32 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error stores = allStores } safeTS := store.getMinSafeTSByStores(stores) - startTs = *options.MinStartTS + startTs = *option.MinStartTS // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. if startTs < safeTS { startTs = safeTS } - } else if options.MaxPrevSec != nil { + } else if option.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - minStartTS, err := store.getStalenessTimestamp(bo, options.TxnScope, *options.MaxPrevSec) + minStartTS, err := store.getStalenessTimestamp(bo, option.TxnScope, *option.MaxPrevSec) if err != nil { return 0, errors.Trace(err) } - options.MinStartTS = &minStartTS - return extractStartTs(store, options) + option.MinStartTS = &minStartTS + return ExtractStartTs(store, option) } else { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getTimestampWithRetry(bo, options.TxnScope) + startTs, err = store.getTimestampWithRetry(bo, option.TxnScope) } return startTs, err } -func newTiKVTxnWithOptions(store *KVStore, options kv.TransactionOption) (*KVTxn, error) { +func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } - startTs, err := extractStartTs(store, options) + startTs, err := ExtractStartTs(store, options) if err != nil { return nil, errors.Trace(err) } diff --git a/util/checksum/checksum.go b/util/checksum/checksum.go index 843440547100c..ef90f44e7cb00 100644 --- a/util/checksum/checksum.go +++ b/util/checksum/checksum.go @@ -42,11 +42,12 @@ var checksumReaderBufPool = sync.Pool{ // | -- 4B -- | -- 1020B -- || -- 4B -- | -- 1020B -- || -- 4B -- | -- 60B -- | // | -- checksum -- | -- payload -- || -- checksum -- | -- payload -- || -- checksum -- | -- payload -- | type Writer struct { - err error - w io.WriteCloser - buf []byte - payload []byte - payloadUsed int + err error + w io.WriteCloser + buf []byte + payload []byte + payloadUsed int + flushedUserDataCnt int64 } // NewWriter returns a new Writer which calculates and stores a CRC-32 checksum for the payload before @@ -104,10 +105,21 @@ func (w *Writer) Flush() error { w.err = err return err } + w.flushedUserDataCnt += int64(w.payloadUsed) w.payloadUsed = 0 return nil } +// GetCache returns the byte slice that holds the data not flushed to disk. +func (w *Writer) GetCache() []byte { + return w.payload[:w.payloadUsed] +} + +// GetCacheDataOffset return the user data offset in cache. +func (w *Writer) GetCacheDataOffset() int64 { + return w.flushedUserDataCnt +} + // Close implements the io.Closer interface. func (w *Writer) Close() (err error) { err = w.Flush() diff --git a/util/checksum/checksum_test.go b/util/checksum/checksum_test.go index b0de5b90586c9..1473903fbe080 100644 --- a/util/checksum/checksum_test.go +++ b/util/checksum/checksum_test.go @@ -651,3 +651,75 @@ func (s *testChecksumSuite) testTiCase3651and3652(c *check.C, encrypt bool) { assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f1) assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f2) } + +var checkFlushedData = func(c *check.C, f io.ReaderAt, off int64, readBufLen int, assertN int, assertErr interface{}, assertRes []byte) { + readBuf := make([]byte, readBufLen) + r := NewReader(f) + n, err := r.ReadAt(readBuf, off) + c.Assert(err, check.Equals, assertErr) + c.Assert(n, check.Equals, assertN) + c.Assert(bytes.Compare(readBuf, assertRes), check.Equals, 0) +} + +func (s *testChecksumSuite) TestChecksumWriter(c *check.C) { + path := "checksum_TestChecksumWriter" + f, err := os.Create(path) + c.Assert(err, check.IsNil) + defer func() { + err = f.Close() + c.Assert(err, check.IsNil) + err = os.Remove(path) + c.Assert(err, check.IsNil) + }() + + buf := bytes.NewBuffer(nil) + testData := "0123456789" + for i := 0; i < 100; i++ { + buf.WriteString(testData) + } + + // Write 1000 bytes and flush. + w := NewWriter(f) + n, err := w.Write(buf.Bytes()) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, 1000) + + err = w.Flush() + c.Assert(err, check.IsNil) + checkFlushedData(c, f, 0, 1000, 1000, nil, buf.Bytes()) + + // All data flushed, so no data in cache. + cacheOff := w.GetCacheDataOffset() + c.Assert(cacheOff, check.Equals, int64(1000)) +} + +func (s *testChecksumSuite) TestChecksumWriterAutoFlush(c *check.C) { + path := "checksum_TestChecksumWriterAutoFlush" + f, err := os.Create(path) + c.Assert(err, check.IsNil) + defer func() { + err = f.Close() + c.Assert(err, check.IsNil) + err = os.Remove(path) + c.Assert(err, check.IsNil) + }() + + w := NewWriter(f) + + buf := bytes.NewBuffer(nil) + testData := "0123456789" + for i := 0; i < 102; i++ { + buf.WriteString(testData) + } + n, err := w.Write(buf.Bytes()) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, len(buf.Bytes())) + + // This write will trigger flush. + n, err = w.Write([]byte("0")) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, 1) + checkFlushedData(c, f, 0, 1020, 1020, nil, buf.Bytes()) + cacheOff := w.GetCacheDataOffset() + c.Assert(cacheOff, check.Equals, int64(len(buf.Bytes()))) +} diff --git a/util/chunk/disk.go b/util/chunk/disk.go index c7962c9aa9e9d..ef269213e9d0d 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -46,6 +46,9 @@ type ListInDisk struct { diskTracker *disk.Tracker // track disk usage. numRowsInDisk int + checksumWriter *checksum.Writer + cipherWriter *encrypt.Writer + // ctrCipher stores the key and nonce using by aes encrypt io layer ctrCipher *encrypt.CtrCipher } @@ -78,9 +81,11 @@ func (l *ListInDisk) initDiskFile() (err error) { if err != nil { return } - underlying = encrypt.NewWriter(l.disk, l.ctrCipher) + l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) + underlying = l.cipherWriter } - l.w = checksum.NewWriter(underlying) + l.checksumWriter = checksum.NewWriter(underlying) + l.w = l.checksumWriter l.bufFlushMutex = sync.RWMutex{} return } @@ -164,16 +169,16 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { // GetRow gets a Row from the ListInDisk by RowPtr. func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { - err = l.flush() if err != nil { return } off := l.offsets[ptr.ChkIdx][ptr.RowIdx] var underlying io.ReaderAt = l.disk if l.ctrCipher != nil { - underlying = encrypt.NewReader(l.disk, l.ctrCipher) + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) } - r := io.NewSectionReader(checksum.NewReader(underlying), off, l.offWrite-off) + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + r := io.NewSectionReader(checksumReader, off, l.offWrite-off) format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { @@ -367,3 +372,51 @@ func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow { } return MutRow{c: chk} } + +// ReaderWithCache helps to read data that has not be flushed to underlying layer. +// By using ReaderWithCache, user can still write data into ListInDisk even after reading. +type ReaderWithCache struct { + r io.ReaderAt + cacheOff int64 + cache []byte +} + +// NewReaderWithCache returns a ReaderWithCache. +func NewReaderWithCache(r io.ReaderAt, cache []byte, cacheOff int64) *ReaderWithCache { + return &ReaderWithCache{ + r: r, + cacheOff: cacheOff, + cache: cache, + } +} + +// ReadAt implements the ReadAt interface. +func (r *ReaderWithCache) ReadAt(p []byte, off int64) (readCnt int, err error) { + readCnt, err = r.r.ReadAt(p, off) + if err != io.EOF { + return readCnt, err + } + + if len(p) == readCnt { + return readCnt, err + } else if len(p) < readCnt { + return readCnt, errors2.Trace(errors2.Errorf("cannot read more data than user requested"+ + "(readCnt: %v, len(p): %v", readCnt, len(p))) + } + + // When got here, user input is not filled fully, so we need read data from cache. + err = nil + p = p[readCnt:] + beg := off - r.cacheOff + if beg < 0 { + // This happens when only partial data of user requested resides in r.cache. + beg = 0 + } + end := int(beg) + len(p) + if end > len(r.cache) { + err = io.EOF + end = len(r.cache) + } + readCnt += copy(p, r.cache[beg:end]) + return readCnt, err +} diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 86461de5659c7..36750aa898244 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -14,12 +14,14 @@ package chunk import ( + "bytes" "fmt" "io" "io/ioutil" "math/rand" "os" "path/filepath" + "reflect" "strconv" "strings" "testing" @@ -30,6 +32,8 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" + "github.com/pingcap/tidb/util/checksum" + "github.com/pingcap/tidb/util/encrypt" ) func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) { @@ -219,6 +223,8 @@ func (s *testChunkSuite) TestListInDiskWithChecksum(c *check.C) { }) testListInDisk(c) + testReaderWithCache(c) + testReaderWithCacheNoFlush(c) } func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) { @@ -227,4 +233,129 @@ func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) { conf.Security.SpilledFileEncryptionMethod = config.SpilledFileEncryptionMethodAES128CTR }) testListInDisk(c) + + testReaderWithCache(c) + testReaderWithCacheNoFlush(c) +} + +// Following diagram describes the testdata we use to test: +// 4 B: checksum of this segment. +// 8 B: all columns' length, in the following example, we will only have one column. +// 1012 B: data in file. because max length of each segment is 1024, so we only have 1020B for user payload. +// +// Data in File Data in mem cache +// +------+------------------------------------------+ +-----------------------------+ +// | | 1020B payload | | | +// |4Bytes| +---------+----------------------------+ | | | +// |checksum|8B collen| 1012B user data | | | 12B remained user data | +// | | +---------+----------------------------+ | | | +// | | | | | +// +------+------------------------------------------+ +-----------------------------+ +func testReaderWithCache(c *check.C) { + testData := "0123456789" + buf := bytes.NewBuffer(nil) + for i := 0; i < 102; i++ { + buf.WriteString(testData) + } + buf.WriteString("0123") + + field := []*types.FieldType{types.NewFieldType(mysql.TypeString)} + chk := NewChunkWithCapacity(field, 1) + chk.AppendString(0, buf.String()) + l := NewListInDisk(field) + err := l.Add(chk) + c.Assert(err, check.IsNil) + + // Basic test for GetRow(). + row, err := l.GetRow(RowPtr{0, 0}) + c.Assert(err, check.IsNil) + c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field)) + + var underlying io.ReaderAt = l.disk + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + + // Read all data. + data := make([]byte, 1024) + // Offset is 8, because we want to ignore col length. + readCnt, err := checksumReader.ReadAt(data, 8) + c.Assert(err, check.IsNil) + c.Assert(readCnt, check.Equals, 1024) + c.Assert(reflect.DeepEqual(data, buf.Bytes()), check.IsTrue) + + // Only read data of mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1020) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 12) + c.Assert(reflect.DeepEqual(data[:12], buf.Bytes()[1012:]), check.IsTrue) + + // Read partial data of mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1025) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 7) + c.Assert(reflect.DeepEqual(data[:7], buf.Bytes()[1017:]), check.IsTrue) + + // Read partial data from both file and mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1010) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 22) + c.Assert(reflect.DeepEqual(data[:22], buf.Bytes()[1002:]), check.IsTrue) + + // Offset is too large, so no data is read. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1032) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 0) + c.Assert(reflect.DeepEqual(data, make([]byte, 1024)), check.IsTrue) + + // Only read 1 byte from mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1031) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 1) + c.Assert(reflect.DeepEqual(data[:1], buf.Bytes()[1023:]), check.IsTrue) + + // Test user requested data is small. + // Only request 10 bytes. + data = make([]byte, 10) + readCnt, err = checksumReader.ReadAt(data, 1010) + c.Assert(err, check.IsNil) + c.Assert(readCnt, check.Equals, 10) + c.Assert(reflect.DeepEqual(data, buf.Bytes()[1002:1012]), check.IsTrue) +} + +// Here we test situations where size of data is small, so no data is flushed to disk. +func testReaderWithCacheNoFlush(c *check.C) { + testData := "0123456789" + + field := []*types.FieldType{types.NewFieldType(mysql.TypeString)} + chk := NewChunkWithCapacity(field, 1) + chk.AppendString(0, testData) + l := NewListInDisk(field) + err := l.Add(chk) + c.Assert(err, check.IsNil) + + // Basic test for GetRow(). + row, err := l.GetRow(RowPtr{0, 0}) + c.Assert(err, check.IsNil) + c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field)) + + var underlying io.ReaderAt = l.disk + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + + // Read all data. + data := make([]byte, 1024) + // Offset is 8, because we want to ignore col length. + readCnt, err := checksumReader.ReadAt(data, 8) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, len(testData)) + c.Assert(reflect.DeepEqual(data[:10], []byte(testData)), check.IsTrue) } diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index feed2290f38b6..a39346e34ff80 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -113,6 +113,28 @@ func (r *rowContainerTestSuite) TestSpillAction(c *check.C) { rc.actionSpill.WaitForTest() c.Assert(err, check.IsNil) c.Assert(rc.AlreadySpilledSafeForTest(), check.Equals, true) + + // Read + resChk, err := rc.GetChunk(0) + c.Assert(err, check.IsNil) + c.Assert(resChk.NumRows(), check.Equals, chk.NumRows()) + for rowIdx := 0; rowIdx < resChk.NumRows(); rowIdx++ { + c.Assert(resChk.GetRow(rowIdx).GetDatumRow(fields), check.DeepEquals, chk.GetRow(rowIdx).GetDatumRow(fields)) + } + // Write again + err = rc.Add(chk) + rc.actionSpill.WaitForTest() + c.Assert(err, check.IsNil) + c.Assert(rc.AlreadySpilledSafeForTest(), check.Equals, true) + + // Read + resChk, err = rc.GetChunk(2) + c.Assert(err, check.IsNil) + c.Assert(resChk.NumRows(), check.Equals, chk.NumRows()) + for rowIdx := 0; rowIdx < resChk.NumRows(); rowIdx++ { + c.Assert(resChk.GetRow(rowIdx).GetDatumRow(fields), check.DeepEquals, chk.GetRow(rowIdx).GetDatumRow(fields)) + } + err = rc.Reset() c.Assert(err, check.IsNil) } diff --git a/util/encrypt/ase_layer.go b/util/encrypt/aes_layer.go similarity index 91% rename from util/encrypt/ase_layer.go rename to util/encrypt/aes_layer.go index 2bcea4373073f..a27d23da90fa6 100644 --- a/util/encrypt/ase_layer.go +++ b/util/encrypt/aes_layer.go @@ -71,11 +71,12 @@ func (ctr *CtrCipher) stream(counter uint64) cipher.Stream { // Writer implements an io.WriteCloser, it encrypt data using AES before writing to the underlying object. type Writer struct { - err error - w io.WriteCloser - n int - buf []byte - cipherStream cipher.Stream + err error + w io.WriteCloser + n int + buf []byte + cipherStream cipher.Stream + flushedUserDataCnt int64 } // NewWriter returns a new Writer which encrypt data using AES before writing to the underlying object. @@ -123,6 +124,7 @@ func (w *Writer) Flush() error { } w.cipherStream.XORKeyStream(w.buf[:w.n], w.buf[:w.n]) n, err := w.w.Write(w.buf[:w.n]) + w.flushedUserDataCnt += int64(n) if n < w.n && err == nil { err = io.ErrShortWrite } @@ -134,6 +136,16 @@ func (w *Writer) Flush() error { return nil } +// GetCache returns the byte slice that holds the data not flushed to disk. +func (w *Writer) GetCache() []byte { + return w.buf[:w.n] +} + +// GetCacheDataOffset return the user data offset in cache. +func (w *Writer) GetCacheDataOffset() int64 { + return w.flushedUserDataCnt +} + // Close implements the io.Closer interface. func (w *Writer) Close() (err error) { err = w.Flush() diff --git a/util/mock/context.go b/util/mock/context.go index d6a5f1d913902..d23124e555ea2 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/kvcache" @@ -204,7 +204,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(kv.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 7c86de4b3cb72..3adba59e115e5 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -38,7 +39,7 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } // BeginWithOption implements kv.Storage interface. -func (s *Store) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *Store) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return s.Begin() } diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 7cacaf211375e..ef0a0858f76cf 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -271,6 +271,21 @@ func (tk *TestKit) MustPartition(sql string, partitions string, args ...interfac return tk.MustQuery(sql, args...) } +// UsedPartitions returns the partition names that will be used or all/dual. +func (tk *TestKit) UsedPartitions(sql string, args ...interface{}) *Result { + rs := tk.MustQuery("explain "+sql, args...) + var usedPartitions [][]string + for i := range rs.rows { + index := strings.Index(rs.rows[i][3], "partition:") + if index != -1 { + p := rs.rows[i][3][index+len("partition:"):] + partitions := strings.Split(strings.SplitN(p, " ", 2)[0], ",") + usedPartitions = append(usedPartitions, partitions) + } + } + return &Result{rows: usedPartitions, c: tk.c, comment: check.Commentf("sql:%s, args:%v", sql, args)} +} + // MustUseIndex checks if the result execution plan contains specific index(es). func (tk *TestKit) MustUseIndex(sql string, index string, args ...interface{}) bool { rs := tk.MustQuery("explain "+sql, args...) @@ -312,6 +327,19 @@ func (tk *TestKit) MustQuery(sql string, args ...interface{}) *Result { return tk.ResultSetToResult(rs, comment) } +// MayQuery query the statements and returns result rows if result set is returned. +// If expected result is set it asserts the query result equals expected result. +func (tk *TestKit) MayQuery(sql string, args ...interface{}) *Result { + comment := check.Commentf("sql:%s, args:%v", sql, args) + rs, err := tk.Exec(sql, args...) + tk.c.Assert(errors.ErrorStack(err), check.Equals, "", comment) + if rs == nil { + var emptyStringAoA [][]string + return &Result{rows: emptyStringAoA, c: tk.c, comment: comment} + } + return tk.ResultSetToResult(rs, comment) +} + // QueryToErr executes a sql statement and discard results. func (tk *TestKit) QueryToErr(sql string, args ...interface{}) error { comment := check.Commentf("sql:%s, args:%v", sql, args)