Skip to content

Commit

Permalink
merge master && fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: shirly <AndreMouche@126.com>
  • Loading branch information
AndreMouche committed May 21, 2021
2 parents 3ce3741 + bc7f182 commit 39f4ed1
Show file tree
Hide file tree
Showing 36 changed files with 4,734 additions and 360 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ In addition, you may enjoy following:
- [@PingCAP](https://twitter.com/PingCAP) on Twitter
- Question tagged [#tidb on StackOverflow](https://stackoverflow.com/questions/tagged/tidb)
- The PingCAP Team [English Blog](https://en.pingcap.com/blog) and [Chinese Blog](https://pingcap.com/blog-cn/)
- [TiDB Monthly](https://pingcap.com/weekly/)

For support, please contact [PingCAP](http://bit.ly/contact_us_via_github).

Expand Down
8 changes: 5 additions & 3 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,12 +1293,14 @@ func (s *testSuiteAgg) TestIssue20658(c *C) {
tk.MustExec("CREATE TABLE t(a bigint, b bigint);")
tk.MustExec("set tidb_init_chunk_size=1;")
tk.MustExec("set tidb_max_chunk_size=32;")
randSeed := time.Now().UnixNano()
r := rand.New(rand.NewSource(randSeed))
var insertSQL string
for i := 0; i < 1000; i++ {
if i == 0 {
insertSQL += fmt.Sprintf("(%d, %d)", rand.Intn(100), rand.Intn(100))
insertSQL += fmt.Sprintf("(%d, %d)", r.Intn(100), r.Intn(100))
} else {
insertSQL += fmt.Sprintf(",(%d, %d)", rand.Intn(100), rand.Intn(100))
insertSQL += fmt.Sprintf(",(%d, %d)", r.Intn(100), r.Intn(100))
}
}
tk.MustExec(fmt.Sprintf("insert into t values %s;", insertSQL))
Expand All @@ -1307,7 +1309,7 @@ func (s *testSuiteAgg) TestIssue20658(c *C) {
for _, sql := range sqls {
var expected [][]interface{}
for _, con := range concurrencies {
comment := Commentf("sql: %s; concurrency: %d", sql, con)
comment := Commentf("sql: %s; concurrency: %d, seed: ", sql, con, randSeed)
tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%d;", con))
if con == 1 {
expected = tk.MustQuery(sql).Sort().Rows()
Expand Down
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
StalenessTxnOption: v.StalenessTxnOption,
}
return e
}
Expand Down
7 changes: 4 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3067,9 +3067,10 @@ func (s *testSerialSuite) TestTiDBLastTxnInfoCommitMode(c *C) {
c.Assert(rows[0][1], Equals, "false")
c.Assert(rows[0][2], Equals, "false")

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
})
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS"), IsNil)
}()

tk.MustExec("set @@tidb_enable_async_commit = 1")
tk.MustExec("set @@tidb_enable_1pc = 0")
Expand Down
3 changes: 3 additions & 0 deletions executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String
url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), statusAddr, pdapi.Config)
case "tikv", "tidb":
url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr)
case "tiflash":
// TODO: support show tiflash config once tiflash supports it
return
default:
ch <- result{err: errors.Errorf("unknown node type: %s(%s)", typ, address)}
return
Expand Down
2 changes: 1 addition & 1 deletion executor/memtable_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *testMemTableReaderSuite) TestTiDBClusterConfig(c *C) {

// mock servers
servers := []string{}
for _, typ := range []string{"tidb", "tikv", "pd"} {
for _, typ := range []string{"tidb", "tikv", "tiflash", "pd"} {
for _, server := range testServers {
servers = append(servers, strings.Join([]string{typ, server.address, server.address}, ","))
}
Expand Down
46 changes: 46 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1829,3 +1829,49 @@ PARTITION BY RANGE (a) (
s.testData.GetTestCases(c, &input, &output)
s.verifyPartitionResult(tk, input, output)
}

func (s *testSuiteWithData) TestRangePartitionBoundariesLtM(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("create database TestRangePartitionBoundariesLtM")
defer tk.MustExec("drop database TestRangePartitionBoundariesLtM")
tk.MustExec("use TestRangePartitionBoundariesLtM")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t
(a INT, b varchar(255))
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1000000),
PARTITION p1 VALUES LESS THAN (2000000),
PARTITION p2 VALUES LESS THAN (3000000))`)

var input []string
var output []testOutput
s.testData.GetTestCases(c, &input, &output)
s.verifyPartitionResult(tk, input, output)
}

func (s *testSuiteWithData) TestRangePartitionBoundariesLtS(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("create database TestRangePartitionBoundariesLtS")
defer tk.MustExec("drop database TestRangePartitionBoundariesLtS")
tk.MustExec("use TestRangePartitionBoundariesLtS")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t
(a INT, b varchar(255))
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION p6 VALUES LESS THAN (7))`)

var input []string
var output []testOutput
s.testData.GetTestCases(c, &input, &output)
s.verifyPartitionResult(tk, input, output)
}
32 changes: 26 additions & 6 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type SimpleExec struct {
IsFromRemote bool
done bool
is infoschema.InfoSchema

// StalenessTxnOption is used to execute the staleness txn during a read-only begin statement.
StalenessTxnOption *sessionctx.StalenessTxnOption
}

func (e *baseExecutor) getSysSession() (sessionctx.Context, error) {
Expand Down Expand Up @@ -566,13 +569,16 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error {
}

func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should
// If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should
// always create a new Txn instead of reusing it.
if s.ReadOnly {
enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs
if !enableNoopFuncs && s.Bound == nil {
if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s)
}
if s.Bound != nil {
return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s)
}
Expand Down Expand Up @@ -614,6 +620,22 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error {
if e.StalenessTxnOption == nil {
return errors.New("Failed to get timestamp during start transaction read only as of timestamp")
}
if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement.
func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error {
opt := sessionctx.StalenessTxnOption{}
opt.Mode = s.Bound.Mode
Expand All @@ -632,8 +654,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
Expand Down Expand Up @@ -668,8 +689,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
if err != nil {
Expand Down
Loading

0 comments on commit 39f4ed1

Please sign in to comment.