Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: polish the tiflash-tikv fallback function. #23078

Merged
Merged 11 commits on Mar 4, 2021
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
Expand All @@ -44,7 +44,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20210303061548-f6776f61e268
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2 h1:Vx3qsoBtFHSQ5GTARXRh1AwNRVJ8SXaedLzIohnxClE=
github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -411,6 +411,8 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 h1:lNGXD00uNXOKMM2pnTe9XvUv3IOEOtFhqNQljlTDZKc=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22 h1:O95vOUHHmAcjdw01D233Cvn5YsxsBDBCMGb3RZcHzgM=
github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
94 changes: 64 additions & 30 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -46,7 +51,17 @@ var _ = SerialSuites(&ConnTestSuite{})
func (ts *ConnTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
ts.store, err = mockstore.NewMockStore()
ts.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockCluster := c.(*unistore.Cluster)
_, _, region1 := mockstore.BootstrapWithSingleStore(c)
store := c.AllocID()
peer := c.AllocID()
mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
mockCluster.AddPeer(region1, store, peer)
}),
mockstore.WithStoreType(mockstore.EmbedUnistore),
)
c.Assert(err, IsNil)
ts.dom, err = session.BootstrapSession(ts.store)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -712,8 +727,17 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout", "return(true)"), IsNil)
// testGetTableByName reloads the domain's schema and returns the named
// table from the freshly loaded information schema, failing the test on
// any error.
func testGetTableByName(c *C, ctx sessionctx.Context, db, table string) table.Table {
	dom := domain.GetDomain(ctx)
	// Reload first so the lookup observes the latest schema version.
	c.Assert(dom.Reload(), IsNil)
	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table))
	c.Assert(err, IsNil)
	return tbl
}

func (ts *ConnTestSuite) TestTiFlashFallback(c *C) {
cc := &clientConn{
alloc: arena.NewAllocator(1024),
pkt: &packetIO{
Expand All @@ -723,37 +747,47 @@ func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) {
tk := testkit.NewTestKitWithInit(c, ts.store)
cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)}

tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int)")
tk.MustExec("insert into t values (3, 4), (6, 7), (9, 10)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustQuery("explain select sum(a) from t").Check(testkit.Rows(
"StreamAgg_20 1.00 root funcs:sum(Column#6)->Column#4",
"└─TableReader_21 1.00 root data:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6",
" └─TableFullScan_19 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"))

tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustQuery("select count(*) from t").Check(testkit.Rows("3"))
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
// test batch cop send req error
testFallbackWork(c, tk, cc, "select sum(a) from t")
ctx := context.Background()
c.Assert(cc.handleQuery(ctx, "select sum(a) from t"), IsNil)
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil)
c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil)

tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"), IsNil)
testFallbackWork(c, tk, cc, "select sum(a) from t")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(-1)"), IsNil)
tk.MustExec("set @@session.tidb_allow_mpp=1")
testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil)
}

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout"), IsNil)
// testFallbackWork checks both sides of the TiFlash->TiKV fallback switch:
// with fallback disabled the injected TiFlash failure must surface as a
// query error; with fallback enabled the query succeeds and only leaves a
// "TiFlash server timeout" warning behind.
func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) {
	background := context.Background()

	// Fallback off: the failure propagates to the caller.
	tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 0")
	c.Assert(tk.QueryToErr(sql), NotNil)

	// Fallback on: the query is retried on TiKV and succeeds.
	tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1")
	c.Assert(cc.handleQuery(background, sql), IsNil)
	tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
}
12 changes: 1 addition & 11 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -149,15 +148,6 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
if needRetry {
// Backoff once for each retry.
err = bo.Backoff(tikv.BoRegionMiss, errors.New("Cannot find region with TiFlash peer"))
// Actually ErrRegionUnavailable would be thrown out rather than ErrTiFlashServerTimeout. However, since currently
// we don't have MockTiFlash, we inject ErrTiFlashServerTimeout to simulate the situation that TiFlash is down.
if storeType == kv.TiFlash {
failpoint.Inject("errorMockTiFlashServerTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.Trace(tikv.ErrTiFlashServerTimeout))
}
})
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -401,7 +391,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return errors.Trace(err)
return tikv.ErrTiFlashServerTimeout
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTas
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs))
return errors.Trace(err)
}
10 changes: 7 additions & 3 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,19 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
if sender.GetRPCError() != nil {
m.sendError(sender.GetRPCError())
logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}
} else {
rpcResp, err = m.store.GetTiKVClient().SendRequest(ctx, originalTask.storeAddr, wrappedReq, tikv.ReadTimeoutMedium)
}

if err != nil {
m.sendError(err)
logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}

Expand Down Expand Up @@ -277,7 +281,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
}
}
m.sendToRespCh(&mppResponse{
err: errors.New(resp.Error.Msg),
err: tikv.ErrTiFlashServerTimeout,
})
return
}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/unistore/cophandler/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ func HandleMPPDAGReq(dbReader *dbreader.DBReader, req *coprocessor.Request, mppC
}
err = mppExec.open()
if err != nil {
return &coprocessor.Response{OtherError: err.Error()}
panic("open phase find error: " + err.Error())
}
_, err = mppExec.next()
if err != nil {
return &coprocessor.Response{OtherError: err.Error()}
panic("running phase find error: " + err.Error())
}
return &coprocessor.Response{}
}
Expand Down
21 changes: 19 additions & 2 deletions store/mockstore/unistore/cophandler/mpp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,14 @@ type exchRecvExec struct {
lock sync.Mutex
wg sync.WaitGroup
err error
inited bool
}

// open is intentionally a no-op: the receiver's real setup is deferred to
// init(), which next() invokes lazily on its first call.
func (e *exchRecvExec) open() error {
	return nil
}

func (e *exchRecvExec) init() error {
e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, 0)
serverMetas := make([]*mpp.TaskMeta, 0, len(e.exchangeReceiver.EncodedTaskMeta))
for _, encodedMeta := range e.exchangeReceiver.EncodedTaskMeta {
Expand All @@ -231,6 +236,12 @@ func (e *exchRecvExec) open() error {
}

func (e *exchRecvExec) next() (*chunk.Chunk, error) {
if !e.inited {
winoros marked this conversation as resolved.
Show resolved Hide resolved
e.inited = true
if err := e.init(); err != nil {
return nil, err
}
}
if e.chk != nil {
defer func() {
e.chk = nil
Expand Down Expand Up @@ -326,6 +337,7 @@ type joinExec struct {

idx int
reservedRows []chunk.Row
inited bool
}

func (e *joinExec) buildHashTable() error {
Expand Down Expand Up @@ -399,11 +411,16 @@ func (e *joinExec) open() error {
if err != nil {
return errors.Trace(err)
}
err = e.buildHashTable()
return errors.Trace(err)
return nil
}

func (e *joinExec) next() (*chunk.Chunk, error) {
if !e.inited {
e.inited = true
if err := e.buildHashTable(); err != nil {
return nil, err
}
}
for {
if e.idx < len(e.reservedRows) {
idx := e.idx
Expand Down
19 changes: 18 additions & 1 deletion store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdMPPConn:
resp.Resp, err = c.handleEstablishMPPConnection(ctx, req.EstablishMPPConn(), timeout, storeID)
case tikvrpc.CmdMPPTask:
failpoint.Inject("mppDispatchTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("rpc error"))
}
})
resp.Resp, err = c.handleDispatchMPPTask(ctx, req.DispatchMPPTask(), storeID)
case tikvrpc.CmdMvccGetByKey:
resp.Resp, err = c.usSvr.MvccGetByKey(ctx, req.MvccGetByKey())
Expand Down Expand Up @@ -292,7 +297,7 @@ func (c *RPCClient) handleEstablishMPPConnection(ctx context.Context, r *mpp.Est
if err != nil {
return nil, err
}
var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0}
var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0, targetTask: r.ReceiverMeta}
streamResp := &tikvrpc.MPPStreamResponse{Tikv_EstablishMPPConnectionClient: &mockClient}
_, cancel := context.WithCancel(ctx)
streamResp.Lease.Cancel = cancel
Expand Down Expand Up @@ -455,13 +460,20 @@ func (mock *mockBatchCopClient) Recv() (*coprocessor.BatchResponse, error) {
}
return ret, err
}
failpoint.Inject("batchCopRecvTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, context.Canceled)
}
})
return nil, io.EOF
}

// mockMPPConnectionClient is a mock streaming client for an established MPP
// connection; Recv replays the prepared packets one at a time.
type mockMPPConnectionClient struct {
	mockClientStream
	// mppResponses holds the packets returned by successive Recv calls;
	// idx tracks the next packet to hand out.
	mppResponses []*mpp.MPPDataPacket
	idx          int

	// targetTask is the receiver task's meta; the mppRecvTimeout failpoint
	// matches its TaskId to fail only the targeted task.
	targetTask *mpp.TaskMeta
}

func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) {
Expand All @@ -470,6 +482,11 @@ func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) {
mock.idx++
return ret, nil
}
failpoint.Inject("mppRecvTimeout", func(val failpoint.Value) {
if int64(val.(int)) == mock.targetTask.TaskId {
failpoint.Return(nil, context.Canceled)
}
})
return nil, io.EOF
}

Expand Down
4 changes: 4 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRe
return nil, errors.New("unreachable")
}

// RawGetKeyTTL satisfies the tikv gRPC server interface; the mock server
// never expects this RPC, so it always returns an error.
func (s *mockTikvGrpcServer) RawGetKeyTTL(ctx context.Context, req *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down