Skip to content

Commit

Permalink
Merge branch 'master' into kvfilter
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Apr 27, 2021
2 parents 13299c9 + 6bea505 commit 03380ec
Show file tree
Hide file tree
Showing 33 changed files with 217 additions and 199 deletions.
4 changes: 2 additions & 2 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -138,7 +138,7 @@ func waitScatterRegionFinish(ctx context.Context, store kv.SplittableStore, regi
if err != nil {
logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
// We don't break for PDError because it may caused by ScatterRegion request failed.
if _, ok := errors.Cause(err).(*tikvstore.PDError); !ok {
if _, ok := errors.Cause(err).(*tikverr.PDError); !ok {
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -637,7 +638,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
}
txnCtx := sessVars.TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikvstore.ErrDeadlock); ok {
if deadlock, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
if !deadlock.IsRetryable {
return nil, ErrDeadlock
}
Expand Down
12 changes: 6 additions & 6 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -536,15 +536,15 @@ func (s *testPointGetSuite) TestSelectCheckVisibility(c *C) {
c.Assert(expectErr.Equal(err), IsTrue)
}
// Test point get.
checkSelectResultError("select * from t where a='1'", tikvstore.ErrGCTooEarly)
checkSelectResultError("select * from t where a='1'", tikverr.ErrGCTooEarly)
// Test batch point get.
checkSelectResultError("select * from t where a in ('1','2')", tikvstore.ErrGCTooEarly)
checkSelectResultError("select * from t where a in ('1','2')", tikverr.ErrGCTooEarly)
// Test Index look up read.
checkSelectResultError("select * from t where b > 0 ", tikvstore.ErrGCTooEarly)
checkSelectResultError("select * from t where b > 0 ", tikverr.ErrGCTooEarly)
// Test Index read.
checkSelectResultError("select b from t where b > 0 ", tikvstore.ErrGCTooEarly)
checkSelectResultError("select b from t where b > 0 ", tikverr.ErrGCTooEarly)
// Test table read.
checkSelectResultError("select * from t", tikvstore.ErrGCTooEarly)
checkSelectResultError("select * from t", tikverr.ErrGCTooEarly)
}

func (s *testPointGetSuite) TestReturnValues(c *C) {
Expand Down
8 changes: 4 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/arena"
Expand Down Expand Up @@ -1569,7 +1569,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && errors.ErrorEqual(err, tikvstore.ErrTiFlashServerTimeout) && retryable {
if allowTiFlashFallback && errors.ErrorEqual(err, tikverr.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
Expand Down Expand Up @@ -1870,10 +1870,10 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
switch value.(string) {
case "firstNext":
failpoint.Return(firstNext, tikvstore.ErrTiFlashServerTimeout)
failpoint.Return(firstNext, tikverr.ErrTiFlashServerTimeout)
case "secondNext":
if !firstNext {
failpoint.Return(firstNext, tikvstore.ErrTiFlashServerTimeout)
failpoint.Return(firstNext, tikverr.ErrTiFlashServerTimeout)
}
}
})
Expand Down
4 changes: 2 additions & 2 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/stmtctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -198,7 +198,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikvstore.ErrTiFlashServerTimeout) && retryable {
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, tikverr.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
Expand Down
42 changes: 21 additions & 21 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
_, err := tk2.Exec("update test_kill set c = c + 1 where id = 1")
wg.Done()
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikvstore.ErrQueryInterrupted), IsTrue)
c.Assert(terror.ErrorEqual(err, tikverr.ErrQueryInterrupted), IsTrue)
tk.MustExec("rollback")
}

Expand Down Expand Up @@ -733,10 +733,10 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {

timeoutErr := <-timeoutErrCh
c.Assert(timeoutErr, NotNil)
c.Assert(timeoutErr.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Assert(timeoutErr.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())
timeoutErr = <-timeoutErrCh
c.Assert(timeoutErr, NotNil)
c.Assert(timeoutErr.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Assert(timeoutErr.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())

// tk4 lock c1 = 2
tk4.MustExec("begin pessimistic")
Expand All @@ -749,7 +749,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
_, err := tk2.Exec("delete from tk where c1 = 2")
c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(err.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())

tk4.MustExec("commit")

Expand All @@ -767,7 +767,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(err.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")
Expand Down Expand Up @@ -841,7 +841,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil)
waitErr := <-done
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(waitErr.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())
c.Check(duration, GreaterEqual, 1000*time.Millisecond)
c.Check(duration, LessEqual, 3000*time.Millisecond)
tk2.MustExec("rollback")
Expand Down Expand Up @@ -1131,11 +1131,11 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {

tk1.MustExec("begin pessimistic")
err := tk1.ExecToErr("select * from t where k = 2 for update nowait")
c.Check(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 4 for update nowait")
c.Check(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 7 for update nowait")
c.Check(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk1.MustExec("rollback")

Expand All @@ -1147,9 +1147,9 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {

tk1.MustExec("begin pessimistic")
err = tk1.ExecToErr("select * from t where k = 2 for update nowait")
c.Check(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 6 for update nowait")
c.Check(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk1.MustExec("rollback")
}
Expand Down Expand Up @@ -1279,10 +1279,10 @@ func (s *testPessimisticSuite) TestBatchPointGetLockIndex(c *C) {
tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("insert into t1 values(2, 2, 2)")
c.Assert(err, NotNil)
c.Assert(tikvstore.ErrLockWaitTimeout.Equal(err), IsTrue)
c.Assert(tikverr.ErrLockWaitTimeout.Equal(err), IsTrue)
err = tk2.ExecToErr("select * from t1 where c2 = 3 for update nowait")
c.Assert(err, NotNil)
c.Assert(tikvstore.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Assert(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk2.MustExec("rollback")
}
Expand Down Expand Up @@ -1429,12 +1429,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("select * from tu where z = 3 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikvstore.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tu(x, y) values(2, 2);")
err = tk2.ExecToErr("select * from tu where z = 4 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikvstore.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue)

// test batch point get lock
tk.MustExec("begin pessimistic")
Expand All @@ -1443,12 +1443,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
tk2.MustExec("begin pessimistic")
err = tk2.ExecToErr("select x from tu where z in (3, 7, 9) for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikvstore.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tu(x, y) values(5, 6);")
err = tk2.ExecToErr("select * from tu where z = 11 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikvstore.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue)

tk.MustExec("commit")
tk2.MustExec("commit")
Expand Down Expand Up @@ -1996,11 +1996,11 @@ func (s *testPessimisticSuite) TestSelectForUpdateWaitSeconds(c *C) {
waitErr2 := <-errCh
waitErr3 := <-errCh
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(waitErr.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())
c.Assert(waitErr2, NotNil)
c.Check(waitErr2.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(waitErr2.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())
c.Assert(waitErr3, NotNil)
c.Check(waitErr3.Error(), Equals, tikvstore.ErrLockWaitTimeout.Error())
c.Check(waitErr3.Error(), Equals, tikverr.ErrLockWaitTimeout.Error())
c.Assert(time.Since(start).Seconds(), Less, 45.0)
tk2.MustExec("commit")
tk3.MustExec("rollback")
Expand Down
5 changes: 3 additions & 2 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand Down Expand Up @@ -268,7 +269,7 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe
return
case <-ticker.C:
if atomic.LoadUint32(b.vars.Killed) == 1 {
resp = &batchCopResponse{err: tikvstore.ErrQueryInterrupted}
resp = &batchCopResponse{err: tikverr.ErrQueryInterrupted}
ok = true
return
}
Expand Down Expand Up @@ -394,7 +395,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return tikvstore.ErrTiFlashServerTimeout
return tikverr.ErrTiFlashServerTimeout
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -72,7 +72,7 @@ func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTas
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
return kv.ErrTiDBShuttingDown
return tikverr.ErrTiDBShuttingDown
}

for _, failedCtx := range ctxs {
Expand Down
8 changes: 4 additions & 4 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -476,7 +476,7 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes
return
case <-ticker.C:
if atomic.LoadUint32(it.vars.Killed) == 1 {
resp = &copResponse{err: tikvstore.ErrQueryInterrupted}
resp = &copResponse{err: tikverr.ErrQueryInterrupted}
ok = true
return
}
Expand Down Expand Up @@ -980,11 +980,11 @@ type CopRuntimeStats struct {
func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
errCode := errno.ErrUnknown
errMsg := err.Error()
if terror.ErrorEqual(err, tikvstore.ErrTiKVServerTimeout) {
if terror.ErrorEqual(err, tikverr.ErrTiKVServerTimeout) {
errCode = errno.ErrTiKVServerTimeout
errMsg = "TiDB server timeout, address is " + task.storeAddr
}
if terror.ErrorEqual(err, tikvstore.ErrTiFlashServerTimeout) {
if terror.ErrorEqual(err, tikverr.ErrTiFlashServerTimeout) {
errCode = errno.ErrTiFlashServerTimeout
errMsg = "TiDB server timeout, address is " + task.storeAddr
}
Expand Down
14 changes: 7 additions & 7 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"go.uber.org/zap"
Expand Down Expand Up @@ -225,7 +225,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
if sender.GetRPCError() != nil {
logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikvstore.ErrTiFlashServerTimeout)
m.sendError(tikverr.ErrTiFlashServerTimeout)
return
}
} else {
Expand All @@ -235,7 +235,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
if err != nil {
logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikvstore.ErrTiFlashServerTimeout)
m.sendError(tikverr.ErrTiFlashServerTimeout)
return
}

Expand All @@ -248,7 +248,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) {
if val.(bool) && !req.IsRoot {
time.Sleep(1 * time.Second)
m.sendError(tikvstore.ErrTiFlashServerTimeout)
m.sendError(tikverr.ErrTiFlashServerTimeout)
return
}
})
Expand Down Expand Up @@ -311,7 +311,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
if err != nil {
logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikvstore.ErrTiFlashServerTimeout)
m.sendError(tikverr.ErrTiFlashServerTimeout)
return
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
}
m.sendError(tikvstore.ErrTiFlashServerTimeout)
m.sendError(tikverr.ErrTiFlashServerTimeout)
return
}
}
Expand Down Expand Up @@ -398,7 +398,7 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool,
return
case <-ticker.C:
if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 {
err = tikvstore.ErrQueryInterrupted
err = tikverr.ErrQueryInterrupted
exit = true
return
}
Expand Down
Loading

0 comments on commit 03380ec

Please sign in to comment.