Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename tiflash fallback switch #22886

Merged
merged 16 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,21 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))

tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET SESSION tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@session.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = 'tiflash'")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))
tk.MustExec("SET GLOBAL tidb_allow_fallback_to_tikv = ''")
tk.MustQuery("select @@global.tidb_allow_fallback_to_tikv").Check(testkit.Rows(""))
tk.MustExec("set @@tidb_allow_fallback_to_tikv = 'tiflash, tiflash, tiflash'")
tk.MustQuery("select @@tidb_allow_fallback_to_tikv").Check(testkit.Rows("tiflash"))

tk.MustGetErrMsg("SET SESSION tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
tk.MustGetErrMsg("SET GLOBAL tidb_allow_fallback_to_tikv = 'tikv,tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tikv,tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'tidb, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'tidb, tiflash, tiflash'")
tk.MustGetErrMsg("set @@tidb_allow_fallback_to_tikv = 'unknown, tiflash, tiflash'", "[variable:1231]Variable 'tidb_allow_fallback_to_tikv' can't be set to the value of 'unknown, tiflash, tiflash'")

// Test issue #22145
tk.MustExec(`set global sync_relay_log = "'"`)
Expand Down
14 changes: 10 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
}
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
Expand Down Expand Up @@ -1614,7 +1615,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error.
// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
Expand Down Expand Up @@ -1792,9 +1793,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
}

for {
failpoint.Inject("secondNextErr", func(value failpoint.Value) {
if value.(bool) && !firstNext {
failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
switch value.(string) {
case "firstNext":
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
case "secondNext":
if !firstNext {
failpoint.Return(firstNext, tikv.ErrTiFlashServerTimeout)
}
}
})
// Here server.tidbResultSet implements Next method.
Expand Down
7 changes: 4 additions & 3 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
Expand All @@ -210,8 +211,8 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
return err
}

// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried
// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down.
// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
rs, err := stmt.Execute(ctx, args)
if err != nil {
Expand Down
20 changes: 13 additions & 7 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,22 +761,28 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test COM_STMT_EXECUTE
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"firstNext\")"), IsNil)
// test COM_STMT_EXECUTE (cursor mode)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// test that TiDB would not retry if the first execution already sends data to client
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/secondNextErr", "return(true)"), IsNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/fetchNextErr", "return(\"secondNext\")"), IsNil)
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")
c.Assert(cc.handleQuery(ctx, "select * from t t1 join t t2 on t1.a = t2.a"), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/secondNextErr"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/fetchNextErr"), IsNil)

// simple TiFlash query (unary + non-streaming)
tk.MustExec("set @@tidb_allow_batch_cop=0; set @@tidb_allow_mpp=0;")
Expand Down Expand Up @@ -809,9 +815,9 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {

func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
ctx := context.Background()
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=0")
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
c.Assert(tk.QueryToErr(sql), NotNil)
tk.MustExec("set @@tidb_enable_tiflash_fallback_tikv=1")
tk.MustExec("set @@tidb_allow_fallback_to_tikv='tiflash'")

c.Assert(cc.handleQuery(ctx, sql), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,7 +2496,7 @@ var builtinGlobalVariable = []string{
variable.TiDBTrackAggregateMemoryUsage,
variable.TiDBMultiStatementMode,
variable.TiDBEnableExchangePartition,
variable.TiDBEnableTiFlashFallbackTiKV,
variable.TiDBAllowFallbackToTiKV,
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
Expand Down
17 changes: 12 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,9 @@ type SessionVars struct {
// TiDBEnableExchangePartition indicates whether to enable exchange partition
TiDBEnableExchangePartition bool

// EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
EnableTiFlashFallbackTiKV bool
// AllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
// Now we only support TiFlash.
AllowFallbackToTiKV map[kv.StoreType]struct{}
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
Expand Down Expand Up @@ -997,7 +998,7 @@ func NewSessionVars() *SessionVars {
GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV,
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
}
vars.KVVars = kv.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down Expand Up @@ -1749,8 +1750,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MultiStatementMode = TiDBOptMultiStmt(val)
case TiDBEnableExchangePartition:
s.TiDBEnableExchangePartition = TiDBOptOn(val)
case TiDBEnableTiFlashFallbackTiKV:
s.EnableTiFlashFallbackTiKV = TiDBOptOn(val)
case TiDBAllowFallbackToTiKV:
s.AllowFallbackToTiKV = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
case kv.TiFlash.Name():
s.AllowFallbackToTiKV[kv.TiFlash] = struct{}{}
}
}
}
s.systems[name] = val
return nil
Expand Down
25 changes: 24 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,30 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowFallbackToTiKV, Value: "", Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
winoros marked this conversation as resolved.
Show resolved Hide resolved
if normalizedValue == "" {
return "", nil
}
engines := strings.Split(normalizedValue, ",")
var formatVal string
storeTypes := make(map[kv.StoreType]struct{})
for i, engine := range engines {
engine = strings.TrimSpace(engine)
switch {
case strings.EqualFold(engine, kv.TiFlash.Name()):
if _, ok := storeTypes[kv.TiFlash]; !ok {
if i != 0 {
formatVal += ","
}
formatVal += kv.TiFlash.Name()
storeTypes[kv.TiFlash] = struct{}{}
}
default:
return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(TiDBAllowFallbackToTiKV, normalizedValue)
}
}
return formatVal, nil
}},
/* The following variable is defined as session scope but is actually server scope. */
{Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool},
{Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1},
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,9 @@ const (
// TiDBEnableExchangePartition indicates whether to enable exchange partition.
TiDBEnableExchangePartition = "tidb_enable_exchange_partition"

// TiDBEnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv"
// TiDBAllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
// Now we only support TiFlash.
TiDBAllowFallbackToTiKV = "tidb_allow_fallback_to_tikv"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -668,7 +669,6 @@ const (
DefTiDBEnableIndexMergeJoin = false
DefTiDBTrackAggregateMemoryUsage = true
DefTiDBEnableExchangePartition = false
DefTiDBEnableTiFlashFallbackTiKV = false
)

// Process global variables.
Expand Down
7 changes: 6 additions & 1 deletion sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep))
c.Assert(vars.EnableChangeColumnType, Equals, DefTiDBChangeColumnType)
c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion)
c.Assert(vars.EnableTiFlashFallbackTiKV, Equals, DefTiDBEnableTiFlashFallbackTiKV)

assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize))
Expand Down Expand Up @@ -597,6 +596,12 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBEnableAmendPessimisticTxn, "0", false},
{TiDBEnableAmendPessimisticTxn, "1", false},
{TiDBEnableAmendPessimisticTxn, "256", true},
{TiDBAllowFallbackToTiKV, "", false},
{TiDBAllowFallbackToTiKV, "tiflash", false},
{TiDBAllowFallbackToTiKV, " tiflash ", false},
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
{TiDBAllowFallbackToTiKV, "tikv", true},
{TiDBAllowFallbackToTiKV, "tidb", true},
{TiDBAllowFallbackToTiKV, "tiflash,tikv,tidb", true},
}

for _, t := range tests {
Expand Down