diff --git a/executor/set_test.go b/executor/set_test.go index 5d90923995f0c..b55ed2328af74 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) { tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off") tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0")) - tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on") - tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1")) - tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off") - tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0")) - tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on") - tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1")) - tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off") - tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0")) - c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil) - c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil) + tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'") + tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''") + tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("")) + tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'") + tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''") + tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("")) + tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'") + tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash")) + + tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'") + tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'") + tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'") + tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'") // Test issue #22145 tk.MustExec(`set global sync_relay_log = "'"`) diff --git a/server/conn.go b/server/conn.go index 04e099b875c96..b95f77cc06503 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1495,7 +1495,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { } retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1) if err != nil { - if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { + _, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] + if allowTiFlashFallback && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { // When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash // server and fallback to TiKV. warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err}) @@ -1614,7 +1615,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm return pointPlans, nil } -// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error. +// The first return value indicates whether the call of handleStmt has no side effect and can be retried. // Currently the first return value is used to fallback to TiKV when TiFlash is down. func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) { ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) @@ -1792,9 +1793,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool } for { - failpoint.Inject("secondNextErr", func(value failpoint.Value) { - if value.(bool) && !firstNext { + failpoint.Inject("fetchNextErr", func(value failpoint.Value) { + switch value.(string) { + case "firstNext": failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout) + case "secondNext": + if !firstNext { + failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout) + } } }) // Here server.tidbResultSet implements Next method. diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 762a5be52cf39..de2d3a074e1cd 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -195,7 +195,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e } ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor) - if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { + _, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] + if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable { // When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash // server and fallback to TiKV. prevErr := err @@ -210,8 +211,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e return err } -// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried -// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down. +// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried. +// Currently the first return value is used to fallback to TiKV when TiFlash is down. func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) { rs, err := stmt.Execute(ctx, args) if err != nil { diff --git a/server/conn_test.go b/server/conn_test.go index 2aab4e0d3733e..6add06c0369a8 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -761,22 +761,28 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) // test COM_STMT_EXECUTE ctx := context.Background() - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) // test COM_STMT_FETCH (cursor mode) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil) c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") + tk.MustExec("set @@tidb_allow_fallback_to_tikv=''") c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"firstNext\")"), IsNil) + // test COM_STMT_EXECUTE (cursor mode) + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") + c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil) + // test that TiDB would not retry if the first execution already sends data to client - c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"secondNext\")"), IsNil) + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil) // simple TiFlash query (unary + non-streaming) tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;") @@ -809,9 +815,9 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { ctx := context.Background() - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0") + tk.MustExec("set @@tidb_allow_fallback_to_tikv=''") c.Assert(tk.QueryToErr(sql), NotNil) - tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1") + tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'") c.Assert(cc.handleQuery(ctx, sql), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) diff --git a/session/session.go b/session/session.go index 365d06ed47dc2..becca9333974d 100644 --- a/session/session.go +++ b/session/session.go @@ -2496,7 +2496,7 @@ var builtinGlobalVariable = []string{ variable.TiDBTrackAggregateMemoryUsage, variable.TiDBMultiStatementMode, variable.TiDBEnableExchangePartition, - variable.TiDBEnableTiFlashFallbackTiKV, + variable.TiDBAllowFallbackToTiKV, } // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 9b96ce9979c14..1f939a607ac63 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -815,8 +815,9 @@ type SessionVars struct { // TiDBEnableExchangePartition indicates whether to enable exchange partition TiDBEnableExchangePartition bool - // EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable. - EnableTiFlashFallbackTiKV bool + // AllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV. + // Now we only support TiFlash. + AllowFallbackToTiKV map[kv.StoreType]struct{} } // CheckAndGetTxnScope will return the transaction scope we should use in the current session. @@ -997,7 +998,7 @@ func NewSessionVars() *SessionVars { GuaranteeLinearizability: DefTiDBGuaranteeLinearizability, AnalyzeVersion: DefTiDBAnalyzeVersion, EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin, - EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV, + AllowFallbackToTiKV: make(map[kv.StoreType]struct{}), } vars.KVVars = kv.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ @@ -1749,8 +1750,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.MultiStatementMode = TiDBOptMultiStmt(val) case TiDBEnableExchangePartition: s.TiDBEnableExchangePartition = TiDBOptOn(val) - case TiDBEnableTiFlashFallbackTiKV: - s.EnableTiFlashFallbackTiKV = TiDBOptOn(val) + case TiDBAllowFallbackToTiKV: + s.AllowFallbackToTiKV = make(map[kv.StoreType]struct{}) + for _, engine := range strings.Split(val, ",") { + switch engine { + case kv.TiFlash.Name(): + s.AllowFallbackToTiKV[kv.TiFlash] = struct{}{} + } + } } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index efced6ec1bd62..c8aae167e6b34 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -670,7 +670,30 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowFallbackToTiKV, Value: "", Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { + if normalizedValue == "" { + return "", nil + } + engines := strings.Split(normalizedValue, ",") + var formatVal string + storeTypes := make(map[kv.StoreType]struct{}) + for i, engine := range engines { + engine = strings.TrimSpace(engine) + switch { + case strings.EqualFold(engine, kv.TiFlash.Name()): + if _, ok := storeTypes[kv.TiFlash]; !ok { + if i != 0 { + formatVal += "," + } + formatVal += kv.TiFlash.Name() + storeTypes[kv.TiFlash] = struct{}{} + } + default: + return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(TiDBAllowFallbackToTiKV, normalizedValue) + } + } + return formatVal, nil + }}, /* The following variable is defined as session scope but is actually server scope. */ {Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool}, {Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index c087911a4db93..3b8851c1e7ff4 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -526,8 +526,9 @@ const ( // TiDBEnableExchangePartition indicates whether to enable exchange partition. TiDBEnableExchangePartition = "tidb_enable_exchange_partition" - // TiDBEnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable. - TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv" + // TiDBAllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV. + // Now we only support TiFlash. + TiDBAllowFallbackToTiKV = "tidb_allow_fallback_to_tikv" ) // TiDB vars that have only global scope @@ -668,7 +669,6 @@ const ( DefTiDBEnableIndexMergeJoin = false DefTiDBTrackAggregateMemoryUsage = true DefTiDBEnableExchangePartition = false - DefTiDBEnableTiFlashFallbackTiKV = false ) // Process global variables. diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index f389adff3a8a4..b5ad1c5b20486 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -105,7 +105,6 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep)) c.Assert(vars.EnableChangeColumnType, Equals, DefTiDBChangeColumnType) c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion) - c.Assert(vars.EnableTiFlashFallbackTiKV, Equals, DefTiDBEnableTiFlashFallbackTiKV) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota)) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize)) @@ -597,6 +596,12 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBEnableAmendPessimisticTxn, "0", false}, {TiDBEnableAmendPessimisticTxn, "1", false}, {TiDBEnableAmendPessimisticTxn, "256", true}, + {TiDBAllowFallbackToTiKV, "", false}, + {TiDBAllowFallbackToTiKV, "tiflash", false}, + {TiDBAllowFallbackToTiKV, " tiflash ", false}, + {TiDBAllowFallbackToTiKV, "tikv", true}, + {TiDBAllowFallbackToTiKV, "tidb", true}, + {TiDBAllowFallbackToTiKV, "tiflash,tikv,tidb", true}, } for _, t := range tests {