Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: retry executing sql without tiflash after tiflash is down #22459

Merged
merged 17 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off")
tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0"))

tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET SESSION tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@session.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = on")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("1"))
tk.MustExec("SET GLOBAL tidb_enable_tiflash_fallback_tikv = off")
tk.MustQuery("select @@global.tidb_enable_tiflash_fallback_tikv").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("SET SESSION tidb_enable_tiflash_fallback_tikv = 123"), NotNil)
c.Assert(tk.ExecToErr("SET GLOBAL tidb_enable_tiflash_fallback_tikv = 321"), NotNil)

// Test issue #22145
tk.MustExec(`set global sync_relay_log = "'"`)

Expand Down
66 changes: 46 additions & 20 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -1457,14 +1458,31 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
if len(pointPlans) > 0 {
defer cc.ctx.ClearValue(plannercore.PointPlanKey)
}
var retryable bool
for i, stmt := range stmts {
if len(pointPlans) > 0 {
// Save the point plan in Session so we don't need to build the point plan again.
cc.ctx.SetValue(plannercore.PointPlanKey, plannercore.PointPlanVal{Plan: pointPlans[i]})
}
err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
break
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
func() {
delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
defer func() {
cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
}()
_, err = cc.handleStmt(ctx, stmt, warns, i == len(stmts)-1)
}()
if err != nil {
break
}
} else {
break
}
}
}
if err != nil {
Expand Down Expand Up @@ -1567,7 +1585,9 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) error {
// The first return value indicates whether the call of handleStmt has no side effect and can be retried to correct error.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
reg := trace.StartRegion(ctx, "ExecuteStmt")
rs, err := cc.ctx.ExecuteStmt(ctx, stmt)
Expand All @@ -1578,7 +1598,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
defer terror.Call(rs.Close)
}
if err != nil {
return err
return true, err
}

if lastStmt {
Expand All @@ -1593,12 +1613,12 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
if rs != nil {
connStatus := atomic.LoadInt32(&cc.status)
if connStatus == connStatusShutdown {
return executor.ErrQueryInterrupted
return false, executor.ErrQueryInterrupted
}

err = cc.writeResultset(ctx, rs, false, status, 0)
retryable, err := cc.writeResultset(ctx, rs, false, status, 0)
if err != nil {
return err
return retryable, err
}
} else {
handled, err := cc.handleQuerySpecial(ctx, status)
Expand All @@ -1609,10 +1629,10 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
}
}
if err != nil {
return err
return false, err
}
}
return nil
return false, nil
}

func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bool, error) {
Expand Down Expand Up @@ -1678,7 +1698,10 @@ func (cc *clientConn) handleFieldList(ctx context.Context, sql string) (err erro
// If binary is true, the data would be encoded in BINARY format.
// serverStatus, a flag bit represents server information.
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16, fetchSize int) (runErr error) {
// retryable indicates whether the call of writeResultset has no side effect and can be retried to correct error. The call
// has side effect in cursor mode or once data has been sent to client. Currently retryable is used to fallback to TiKV when
// TiFlash is down.
func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16, fetchSize int) (retryable bool, runErr error) {
defer func() {
// close ResultSet when cursor doesn't exist
r := recover()
Expand All @@ -1699,13 +1722,13 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
if mysql.HasCursorExistsFlag(serverStatus) {
err = cc.writeChunksWithFetchSize(ctx, rs, serverStatus, fetchSize)
} else {
err = cc.writeChunks(ctx, rs, binary, serverStatus)
retryable, err = cc.writeChunks(ctx, rs, binary, serverStatus)
}
if err != nil {
return err
return retryable, err
}

return cc.flush(ctx)
return false, cc.flush(ctx)
}

func (cc *clientConn) writeColumnInfo(columns []*ColumnInfo, serverStatus uint16) error {
Expand All @@ -1727,10 +1750,12 @@ func (cc *clientConn) writeColumnInfo(columns []*ColumnInfo, serverStatus uint16
// writeChunks writes data from a Chunk, which filled data by a ResultSet, into a connection.
// binary specifies the way to dump data. It throws any error while dumping data.
// serverStatus, a flag bit represents server information
func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) error {
// The first return value indicates whether error occurs at the first call of ResultSet.Next.
func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) (bool, error) {
data := cc.alloc.AllocWithLen(4, 1024)
req := rs.NewChunk()
gotColumnInfo := false
firstNext := true
var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
Expand All @@ -1741,15 +1766,16 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// Here server.tidbResultSet implements Next method.
err := rs.Next(ctx, req)
if err != nil {
return err
return firstNext, err
}
firstNext = false
if !gotColumnInfo {
// We need to call Next before we get columns.
// Otherwise, we will get incorrect columns info.
columns := rs.Columns()
err = cc.writeColumnInfo(columns, serverStatus)
if err != nil {
return err
return false, err
}
gotColumnInfo = true
}
Expand All @@ -1768,19 +1794,19 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
}
if err != nil {
reg.End()
return err
return false, err
}
if err = cc.writePacket(data); err != nil {
reg.End()
return err
return false, err
}
}
reg.End()
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
}
return cc.writeEOF(serverStatus)
return false, cc.writeEOF(serverStatus)
}

// writeChunksWithFetchSize writes data from a Chunk, which filled data by a ResultSet, into a connection.
Expand Down Expand Up @@ -1873,7 +1899,7 @@ func (cc *clientConn) writeMultiResultset(ctx context.Context, rss []ResultSet,
if !lastRs {
status |= mysql.ServerMoreResultsExists
}
if err := cc.writeResultset(ctx, rs, binary, status, 0); err != nil {
if _, err := cc.writeResultset(ctx, rs, binary, status, 0); err != nil {
return err
}
}
Expand Down
37 changes: 29 additions & 8 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
Expand Down Expand Up @@ -192,12 +194,31 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}
}
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
if cc.ctx.GetSessionVars().EnableTiFlashFallbackTiKV && err != nil && errors.ErrorEqual(err, tikv.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
defer func() {
cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
}()
_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
// We append warning after the retry because `ResetContextOfStmt` may be called during the retry, which clears warnings.
cc.ctx.GetSessionVars().StmtCtx.AppendError(prevErr)
}
return err
}

// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried
// to correct error. Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
rs, err := stmt.Execute(ctx, args)
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}
if rs == nil {
return cc.writeOK(ctx)
return false, cc.writeOK(ctx)
}

// if the client wants to use cursor
Expand All @@ -207,20 +228,20 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
stmt.StoreResultSet(rs)
err = cc.writeColumnInfo(rs.Columns(), mysql.ServerStatusCursorExists)
if err != nil {
return err
return false, err
}
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}
// explicitly flush columnInfo to client.
return cc.flush(ctx)
return false, cc.flush(ctx)
}
defer terror.Call(rs.Close)
err = cc.writeResultset(ctx, rs, true, 0, 0)
retryable, err := cc.writeResultset(ctx, rs, true, 0, 0)
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}
return nil
return false, nil
}

// maxFetchSize constants
Expand Down Expand Up @@ -252,7 +273,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
}

err = cc.writeResultset(ctx, rs, true, mysql.ServerStatusCursorExists, int(fetchSize))
_, err = cc.writeResultset(ctx, rs, true, mysql.ServerStatusCursorExists, int(fetchSize))
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
Expand Down
47 changes: 47 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -710,3 +711,49 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKitWithInit(c, ts.store)
cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)}

tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int)")
tk.MustExec("insert into t values (3, 4), (6, 7), (9, 10)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustQuery("explain select sum(a) from t").Check(testkit.Rows(
"StreamAgg_20 1.00 root funcs:sum(Column#6)->Column#4",
"└─TableReader_21 1.00 root data:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6",
" └─TableFullScan_19 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"))

ctx := context.Background()
c.Assert(cc.handleQuery(ctx, "select sum(a) from t"), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/errorMockTiFlashServerTimeout"), IsNil)
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2640,6 +2640,7 @@ var builtinGlobalVariable = []string{
variable.TiDBTrackAggregateMemoryUsage,
variable.TiDBMultiStatementMode,
variable.TiDBEnableExchangePartition,
variable.TiDBEnableTiFlashFallbackTiKV,
}

var (
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,9 @@ type SessionVars struct {

// TiDBEnableExchangePartition indicates whether to enable exchange partition
TiDBEnableExchangePartition bool

// EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
EnableTiFlashFallbackTiKV bool
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
Expand Down Expand Up @@ -967,6 +970,7 @@ func NewSessionVars() *SessionVars {
GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
EnableTiFlashFallbackTiKV: DefTiDBEnableTiFlashFallbackTiKV,
}
vars.KVVars = kv.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down Expand Up @@ -1697,6 +1701,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MultiStatementMode = TiDBOptMultiStmt(val)
case TiDBEnableExchangePartition:
s.TiDBEnableExchangePartition = TiDBOptOn(val)
case TiDBEnableTiFlashFallbackTiKV:
s.EnableTiFlashFallbackTiKV = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableFastAnalyze, Value: BoolToOnOff(DefTiDBUseFastAnalyze), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipIsolationLevelCheck, Value: BoolToOnOff(DefTiDBSkipIsolationLevelCheck), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableRateLimitAction, Value: BoolToOnOff(DefTiDBEnableRateLimitAction), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashFallbackTiKV, Value: BoolToOnOff(DefTiDBEnableTiFlashFallbackTiKV), Type: TypeBool},
/* The following variable is defined as session scope but is actually server scope. */
{Scope: ScopeSession, Name: TiDBGeneralLog, Value: BoolToOnOff(DefTiDBGeneralLog), Type: TypeBool},
{Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, MinValue: 0, MaxValue: 1},
Expand Down
Loading