Skip to content

Commit

Permalink
Merge branch 'master' into fix-split-corner-case
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Mar 12, 2021
2 parents 4bf4c82 + 035c15f commit 81afc8e
Show file tree
Hide file tree
Showing 23 changed files with 358 additions and 72 deletions.
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// DispatchMPPTasks dispathes all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, tasks)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (e *HashAggExec) Close() error {
e.childResult = nil
e.groupSet, _ = set.NewStringSetWithMemoryUsage()
e.partialResultMap = nil
e.memTracker.ReplaceBytesUsed(0)
return e.baseExecutor.Close()
}
// `Close` may be called after `Open` without calling `Next` in test.
Expand All @@ -255,6 +256,7 @@ func (e *HashAggExec) Close() error {
for range e.finalOutputCh {
}
e.executed = false
e.memTracker.ReplaceBytesUsed(0)
return e.baseExecutor.Close()
}

Expand Down
1 change: 1 addition & 0 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
Expand Down
25 changes: 15 additions & 10 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))

tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'")
tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))

tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'")

// Test issue #22145
tk.MustExec(`set global sync_relay_log = "'"`)
Expand Down
79 changes: 79 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@ package executor_test

import (
"fmt"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type tiflashTestSuite struct {
Expand All @@ -36,6 +43,7 @@ type tiflashTestSuite struct {
}

func (s *tiflashTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
Expand Down Expand Up @@ -271,3 +279,74 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) {
failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks")
failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP")
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("insert into t values(4,0)")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(int(terror.ToSQLError(errors.Cause(err).(*terror.Error)).Code), Equals, int(executor.ErrQueryInterrupted.Code()))
}()
time.Sleep(1 * time.Second)
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1)
wg.Wait()
c.Assert(failpoint.Disable(hang), IsNil)
}

// all goroutines exit if one goroutine hangs but another return errors
func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
defer testleak.AfterTest(c)()
// mock non-root tasks return error
var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError"
// mock root tasks hang
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int not null primary key, b int not null)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "t1")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t1 values(1,0)")
tk.MustExec("insert into t1 values(2,0)")
tk.MustExec("insert into t1 values(3,0)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
c.Assert(failpoint.Enable(mppNonRootTaskError, `return(true)`), IsNil)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)

// generate 2 root tasks, one will hang and another will return errors
err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(failpoint.Disable(mppNonRootTaskError), IsNil)
c.Assert(failpoint.Disable(hang), IsNil)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87 h1:lVRrhmqIT2zMbmoalrgxQLwWzFd3VtFaaWy0fnMwPro=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -844,6 +844,7 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
Expand Down
17 changes: 16 additions & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta {
return meta
}

//MppTaskStates denotes the state of mpp tasks
type MppTaskStates uint8

const (
// MppTaskReady means the task is ready
MppTaskReady MppTaskStates = iota
// MppTaskRunning means the task is running
MppTaskRunning
// MppTaskCancelled means the task is cancelled
MppTaskCancelled
// MppTaskDone means the task is done
MppTaskDone
)

// MPPDispatchRequest stands for a dispatching task.
type MPPDispatchRequest struct {
Data []byte // data encodes the dag coprocessor request.
Expand All @@ -55,6 +69,7 @@ type MPPDispatchRequest struct {
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
Expand All @@ -64,7 +79,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(context.Context, []*MPPDispatchRequest) Response
DispatchMPPTasks(context.Context, *Variables, []*MPPDispatchRequest) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
14 changes: 10 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
}
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
Expand Down Expand Up @@ -1614,7 +1615,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error.
// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
Expand Down Expand Up @@ -1792,9 +1793,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
}

for {
failpoint.Inject("secondNextErr", func(value failpoint.Value) {
if value.(bool) && !firstNext {
failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
switch value.(string) {
case "firstNext":
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
case "secondNext":
if !firstNext {
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
}
}
})
// Here server.tidbResultSet implements Next method.
Expand Down
7 changes: 4 additions & 3 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
Expand All @@ -210,8 +211,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
return err
}

// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried
// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down.
// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
rs, err := stmt.Execute(ctx, args)
if err != nil {
Expand Down
20 changes: 13 additions & 7 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,22 +761,28 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test COM_STMT_EXECUTE
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"firstNext\")"), IsNil)
// test COM_STMT_EXECUTE (cursor mode)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// test that TiDB would not retry if the first execution already sends data to client
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"secondNext\")"), IsNil)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// simple TiFlash query (unary + non-streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;")
Expand Down Expand Up @@ -809,9 +815,9 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {

func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(tk.QueryToErr(sql), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")

c.Assert(cc.handleQuery(ctx, sql), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
Expand Down
12 changes: 11 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,11 @@ const (
version64 = 64
// version65 add mysql.stats_fm_sketch table.
version65 = 65
// version66 enables the feature `track_aggregate_memory_usage` by default.
version66 = 66

// please make sure this is the largest version
currentBootstrapVersion = version65
currentBootstrapVersion = version66
)

var (
Expand Down Expand Up @@ -546,6 +548,7 @@ var (
upgradeToVer63,
upgradeToVer64,
upgradeToVer65,
upgradeToVer66,
}
)

Expand Down Expand Up @@ -1448,6 +1451,13 @@ func upgradeToVer65(s Session, ver int64) {
doReentrantDDL(s, CreateStatsFMSketchTable)
}

func upgradeToVer66(s Session, ver int64) {
if ver >= version66 {
return
}
mustExecute(s, "set @@global.tidb_track_aggregate_memory_usage = 1")
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down
Loading

0 comments on commit 81afc8e

Please sign in to comment.