Skip to content

Commit

Permalink
*: rename tiflash fallback switch (#22886)
Browse files Browse the repository at this point in the history
Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
  • Loading branch information
xuyifangreeneyes and ti-chi-bot authored Mar 12, 2021
1 parent 1d99292 commit 035c15f
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 35 deletions.
25 changes: 15 additions & 10 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))

tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'")
tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))

tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'")

// Test issue #22145
tk.MustExec(`set global sync_relay_log = "'"`)
Expand Down
14 changes: 10 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
}
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
Expand Down Expand Up @@ -1614,7 +1615,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error.
// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
Expand Down Expand Up @@ -1792,9 +1793,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
}

for {
failpoint.Inject("secondNextErr", func(value failpoint.Value) {
if value.(bool) && !firstNext {
failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
switch value.(string) {
case "firstNext":
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
case "secondNext":
if !firstNext {
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
}
}
})
// Here server.tidbResultSet implements Next method.
Expand Down
7 changes: 4 additions & 3 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
Expand All @@ -210,8 +211,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
return err
}

// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried
// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down.
// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
rs, err := stmt.Execute(ctx, args)
if err != nil {
Expand Down
20 changes: 13 additions & 7 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,22 +761,28 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test COM_STMT_EXECUTE
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"firstNext\")"), IsNil)
// test COM_STMT_EXECUTE (cursor mode)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// test that TiDB would not retry if the first execution already sends data to client
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"secondNext\")"), IsNil)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// simple TiFlash query (unary + non-streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;")
Expand Down Expand Up @@ -809,9 +815,9 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {

func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(tk.QueryToErr(sql), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")

c.Assert(cc.handleQuery(ctx, sql), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,7 +2496,7 @@ var builtinGlobalVariable = []string{
variable.TiDBTrackAggregateMemoryUsage,
variable.TiDBMultiStatementMode,
variable.TiDBEnableExchangePartition,
variable.TiDBEnableTiFlashFallbackTiKV,
variable.TiDBAllowFallbackToTiKV,
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
Expand Down
17 changes: 12 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,9 @@ type SessionVars struct {
// TiDBEnableExchangePartition indicates whether to enable exchange partition
TiDBEnableExchangePartition bool

// EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
EnableTiFlashFallbackTiKV bool
// AllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
// Now we only support TiFlash.
AllowFallbackToTiKV map[kv.StoreType]struct{}
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
Expand Down Expand Up @@ -997,7 +998,7 @@ func NewSessionVars() *SessionVars {
GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV,
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
}
vars.KVVars = kv.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down Expand Up @@ -1749,8 +1750,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MultiStatementMode = TiDBOptMultiStmt(val)
case TiDBEnableExchangePartition:
s.TiDBEnableExchangePartition = TiDBOptOn(val)
case TiDBEnableTiFlashFallbackTiKV:
s.EnableTiFlashFallbackTiKV = TiDBOptOn(val)
case TiDBAllowFallbackToTiKV:
s.AllowFallbackToTiKV = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
case kv.TiFlash.Name():
s.AllowFallbackToTiKV[kv.TiFlash] = struct{}{}
}
}
}
s.systems[name] = val
return nil
Expand Down
25 changes: 24 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,30 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowFallbackToTiKV, Value: "", Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
if normalizedValue == "" {
return "", nil
}
engines := strings.Split(normalizedValue, ",")
var formatVal string
storeTypes := make(map[kv.StoreType]struct{})
for i, engine := range engines {
engine = strings.TrimSpace(engine)
switch {
case strings.EqualFold(engine, kv.TiFlash.Name()):
if _, ok := storeTypes[kv.TiFlash]; !ok {
if i != 0 {
formatVal += ","
}
formatVal += kv.TiFlash.Name()
storeTypes[kv.TiFlash] = struct{}{}
}
default:
return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(TiDBAllowFallbackToTiKV, normalizedValue)
}
}
return formatVal, nil
}},
/* The following variable is defined as session scope but is actually server scope. */
{Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool},
{Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1},
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,9 @@ const (
// TiDBEnableExchangePartition indicates whether to enable exchange partition.
TiDBEnableExchangePartition = "tidb_enable_exchange_partition"

// TiDBEnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv"
// TiDBAllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
// Now we only support TiFlash.
TiDBAllowFallbackToTiKV = "tidb_allow_fallback_to_tikv"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -668,7 +669,6 @@ const (
DefTiDBEnableIndexMergeJoin = false
DefTiDBTrackAggregateMemoryUsage = true
DefTiDBEnableExchangePartition = false
DefTiDBEnableTiFlashFallbackTiKV = false
)

// Process global variables.
Expand Down
7 changes: 6 additions & 1 deletion sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep))
c.Assert(vars.EnableChangeColumnType, Equals, DefTiDBChangeColumnType)
c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion)
c.Assert(vars.EnableTiFlashFallbackTiKV, Equals, DefTiDBEnableTiFlashFallbackTiKV)

assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize))
Expand Down Expand Up @@ -597,6 +596,12 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBEnableAmendPessimisticTxn, "0", false},
{TiDBEnableAmendPessimisticTxn, "1", false},
{TiDBEnableAmendPessimisticTxn, "256", true},
{TiDBAllowFallbackToTiKV, "", false},
{TiDBAllowFallbackToTiKV, "tiflash", false},
{TiDBAllowFallbackToTiKV, " tiflash ", false},
{TiDBAllowFallbackToTiKV, "tikv", true},
{TiDBAllowFallbackToTiKV, "tidb", true},
{TiDBAllowFallbackToTiKV, "tiflash,tikv,tidb", true},
}

for _, t := range tests {
Expand Down

0 comments on commit 035c15f

Please sign in to comment.