From cb2e3fcdff981135509ec809235581f87d1123e7 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 3 Mar 2021 14:50:11 +0800 Subject: [PATCH 1/6] store/copr: polish the tiflash-tikv fallback function. --- server/conn_test.go | 99 ++++++++++++++++++++---------- store/copr/batch_coprocessor.go | 12 +--- store/copr/batch_request_sender.go | 2 +- store/copr/mpp.go | 8 ++- store/mockstore/unistore/rpc.go | 15 +++++ 5 files changed, 91 insertions(+), 45 deletions(-) diff --git a/server/conn_test.go b/server/conn_test.go index 63087533afdc2..960d85c333251 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -23,14 +23,19 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/mockstore/cluster" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/arena" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" @@ -46,7 +51,19 @@ var _ = SerialSuites(&ConnTestSuite{}) func (ts *ConnTestSuite) SetUpSuite(c *C) { testleak.BeforeTest() var err error - ts.store, err = mockstore.NewMockStore() + ts.store, err = mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c cluster.Cluster) { + mockCluster := c.(*unistore.Cluster) + _, _, region1 := mockstore.BootstrapWithSingleStore(c) + tiflashIdx := 0 + store := c.AllocID() + peer := c.AllocID() + mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) + mockCluster.AddPeer(region1, store, peer) + tiflashIdx++ + }), + mockstore.WithStoreType(mockstore.EmbedUnistore), + ) c.Assert(err, IsNil) ts.dom, err = session.BootstrapSession(ts.store) c.Assert(err, IsNil) @@ -712,8 +729,17 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) { tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5")) } -func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout", "return(true)"), IsNil) +func testGetTableByName(c *C, ctx sessionctx.Context, db, table string) table.Table { + dom := domain.GetDomain(ctx) + // Make sure the table schema is the new schema. + err := dom.Reload() + c.Assert(err, IsNil) + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) + c.Assert(err, IsNil) + return tbl +} + +func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { cc := &clientConn{ alloc: arena.NewAllocator(1024), pkt: &packetIO{ @@ -723,37 +749,48 @@ func (ts *ConnTestSuite) TestFallbackToTiKVWhenTiFlashIsDown(c *C) { tk := testkit.NewTestKitWithInit(c, ts.store) cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)} - tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1") + tk.MustExec("use test") tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a int, b int)") - tk.MustExec("insert into t values (3, 4), (6, 7), (9, 10)") - - // Create virtual tiflash replica info. - dom := domain.GetDomain(tk.Se) - is := dom.InfoSchema() - db, exists := is.SchemaByName(model.NewCIStr("test")) - c.Assert(exists, IsTrue) - for _, tblInfo := range db.Tables { - if tblInfo.Name.L == "t" { - tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ - Count: 1, - Available: true, - } - } - } - - tk.MustQuery("explain select sum(a) from t").Check(testkit.Rows( - "StreamAgg_20 1.00 root funcs:sum(Column#6)->Column#4", - "└─TableReader_21 1.00 root data:StreamAgg_8", - " └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6", - " └─TableFullScan_19 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo")) - - ctx := context.Background() - c.Assert(cc.handleQuery(ctx, "select sum(a) from t"), IsNil) - tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) + tk.MustExec("create table t(a int not null primary key, b int not null)") + tk.MustExec("alter table t set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1,0)") + tk.MustExec("insert into t values(2,0)") + tk.MustExec("insert into t values(3,0)") + tk.MustQuery("select count(*) from t").Check(testkit.Rows("3")) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) + // test batch cop send req error + testFallbackWork(c, tk, cc, "select sum(a) from t") + ctx:=context.Background() c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil) + tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"), IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout", "return(true)"), IsNil) + testFallbackWork(c, tk, cc, "select sum(a) from t") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/batchCopRecvTimeout"), IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/copr/errorMockTiFlashServerTimeout"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"), IsNil) + tk.MustExec("set @@session.tidb_allow_mpp=1") + testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil) + + //c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(true)"), IsNil) + //tk.MustExec("set @@session.tidb_allow_mpp=1") + //testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") + //c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil) } + +func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { + ctx := context.Background() + tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 0") + c.Assert(tk.QueryToErr(sql), NotNil) + tk.MustExec("set @@session.tidb_enable_tiflash_fallback_tikv = 1") + + c.Assert(cc.handleQuery(ctx, sql), IsNil) + tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) +} \ No newline at end of file diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 5dc9aa488b298..e5ce311030425 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -21,7 +21,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -149,15 +148,6 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik if needRetry { // Backoff once for each retry. err = bo.Backoff(tikv.BoRegionMiss, errors.New("Cannot find region with TiFlash peer")) - // Actually ErrRegionUnavailable would be thrown out rather than ErrTiFlashServerTimeout. However, since currently - // we don't have MockTiFlash, we inject ErrTiFlashServerTimeout to simulate the situation that TiFlash is down. - if storeType == kv.TiFlash { - failpoint.Inject("errorMockTiFlashServerTimeout", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(nil, errors.Trace(tikv.ErrTiFlashServerTimeout)) - } - }) - } if err != nil { return nil, errors.Trace(err) } @@ -401,7 +391,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b } else { logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } - return errors.Trace(err) + return tikv.ErrTiFlashServerTimeout } } } diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 8dd0cbb237468..7865380f56ee5 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -85,6 +85,6 @@ func (ss *RegionBatchRequestSender) onSendFail(bo *tikv.Backoffer, ctxs []copTas // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. - err = bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs)) + err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs)) return errors.Trace(err) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 46b0490845e9e..0837413d6db63 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -201,7 +201,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. if sender.GetRPCError() != nil { - m.sendError(sender.GetRPCError()) + logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error())) + // we return timeout to trigger tikv's fallback + m.sendError(tikv.ErrTiFlashServerTimeout) return } } else { @@ -209,7 +211,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, } if err != nil { - m.sendError(err) + logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error())) + // we return timeout to trigger tikv's fallback + m.sendError(tikv.ErrTiFlashServerTimeout) return } diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index fe88a286d3c2c..9cd69d2bc80e2 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -242,6 +242,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R case tikvrpc.CmdMPPConn: resp.Resp, err = c.handleEstablishMPPConnection(ctx, req.EstablishMPPConn(), timeout, storeID) case tikvrpc.CmdMPPTask: + failpoint.Inject("mppDispatchTimeout", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("rpc error")) + } + }) resp.Resp, err = c.handleDispatchMPPTask(ctx, req.DispatchMPPTask(), storeID) case tikvrpc.CmdMvccGetByKey: resp.Resp, err = c.usSvr.MvccGetByKey(ctx, req.MvccGetByKey()) @@ -455,6 +460,11 @@ func (mock *mockBatchCopClient) Recv() (*coprocessor.BatchResponse, error) { } return ret, err } + failpoint.Inject("batchCopRecvTimeout", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, context.Canceled) + } + }) return nil, io.EOF } @@ -470,6 +480,11 @@ func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) { mock.idx++ return ret, nil } + failpoint.Inject("mppRecvTimeout", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, context.Canceled) + } + }) return nil, io.EOF } From b8f8987b1bbd707585160e2b2222132e620d4294 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 3 Mar 2021 16:55:14 +0800 Subject: [PATCH 2/6] store/copr: polish the tiflash-tikv fallback function. --- go.mod | 4 ++-- go.sum | 4 ++++ server/conn_test.go | 22 +++++++++---------- store/copr/mpp.go | 2 +- store/mockstore/unistore/cophandler/mpp.go | 4 ++-- .../mockstore/unistore/cophandler/mpp_exec.go | 21 ++++++++++++++++-- store/mockstore/unistore/rpc.go | 6 +++-- store/tikv/region_request_test.go | 4 ++++ 8 files changed, 47 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 780e508eced80..5d37c9c29e438 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2 + github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 @@ -44,7 +44,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 + github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22 github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 github.com/pingcap/parser v0.0.0-20210301085222-267e02a2fc12 github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 diff --git a/go.sum b/go.sum index 0b8b4fa16772e..3bc479e9780d1 100644 --- a/go.sum +++ b/go.sum @@ -355,6 +355,8 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2 h1:Vx3qsoBtFHSQ5GTARXRh1AwNRVJ8SXaedLzIohnxClE= github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= +github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133 h1:viz4pREs1mTaZFkjv+N3Gz5OSRkXSVYWVG+63YigO3M= +github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -409,6 +411,8 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 h1:lNGXD00uNXOKMM2pnTe9XvUv3IOEOtFhqNQljlTDZKc= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22 h1:O95vOUHHmAcjdw01D233Cvn5YsxsBDBCMGb3RZcHzgM= +github.com/pingcap/kvproto v0.0.0-20210223121704-3cd2fc5fad22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/server/conn_test.go b/server/conn_test.go index 960d85c333251..84a3862a8cd62 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -56,11 +56,11 @@ func (ts *ConnTestSuite) SetUpSuite(c *C) { mockCluster := c.(*unistore.Cluster) _, _, region1 := mockstore.BootstrapWithSingleStore(c) tiflashIdx := 0 - store := c.AllocID() - peer := c.AllocID() - mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) - mockCluster.AddPeer(region1, store, peer) - tiflashIdx++ + store := c.AllocID() + peer := c.AllocID() + mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) + mockCluster.AddPeer(region1, store, peer) + tiflashIdx++ }), mockstore.WithStoreType(mockstore.EmbedUnistore), ) @@ -763,7 +763,7 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil) // test batch cop send req error testFallbackWork(c, tk, cc, "select sum(a) from t") - ctx:=context.Background() + ctx := context.Background() c.Assert(cc.handleStmtPrepare(ctx, "select sum(a) from t"), IsNil) c.Assert(cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}), IsNil) @@ -779,10 +779,10 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil) - //c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(true)"), IsNil) - //tk.MustExec("set @@session.tidb_allow_mpp=1") - //testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") - //c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout", "return(-1)"), IsNil) + tk.MustExec("set @@session.tidb_allow_mpp=1") + testFallbackWork(c, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppRecvTimeout"), IsNil) } func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { @@ -793,4 +793,4 @@ func testFallbackWork(c *C, tk *testkit.TestKit, cc *clientConn, sql string) { c.Assert(cc.handleQuery(ctx, sql), IsNil) tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) -} \ No newline at end of file +} diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 0837413d6db63..da008937f65cf 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -281,7 +281,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR } } m.sendToRespCh(&mppResponse{ - err: errors.New(resp.Error.Msg), + err: tikv.ErrTiFlashServerTimeout, }) return } diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index dcb189161cbad..7e89550e24f87 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -309,11 +309,11 @@ func HandleMPPDAGReq(dbReader *dbreader.DBReader, req *coprocessor.Request, mppC } err = mppExec.open() if err != nil { - return &coprocessor.Response{OtherError: err.Error()} + panic("open phase find error: " + err.Error()) } _, err = mppExec.next() if err != nil { - return &coprocessor.Response{OtherError: err.Error()} + panic("running phase find error: " + err.Error()) } return &coprocessor.Response{} } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index 806b7e26eaf66..4ee68707439c2 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -209,9 +209,14 @@ type exchRecvExec struct { lock sync.Mutex wg sync.WaitGroup err error + inited bool } func (e *exchRecvExec) open() error { + return nil +} + +func (e *exchRecvExec) init() error { e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, 0) serverMetas := make([]*mpp.TaskMeta, 0, len(e.exchangeReceiver.EncodedTaskMeta)) for _, encodedMeta := range e.exchangeReceiver.EncodedTaskMeta { @@ -231,6 +236,12 @@ func (e *exchRecvExec) open() error { } func (e *exchRecvExec) next() (*chunk.Chunk, error) { + if !e.inited { + e.inited = true + if err := e.init(); err != nil { + return nil, err + } + } if e.chk != nil { defer func() { e.chk = nil @@ -326,6 +337,7 @@ type joinExec struct { idx int reservedRows []chunk.Row + inited bool } func (e *joinExec) buildHashTable() error { @@ -399,11 +411,16 @@ func (e *joinExec) open() error { if err != nil { return errors.Trace(err) } - err = e.buildHashTable() - return errors.Trace(err) + return nil } func (e *joinExec) next() (*chunk.Chunk, error) { + if !e.inited { + e.inited = true + if err := e.buildHashTable(); err != nil { + return nil, err + } + } for { if e.idx < len(e.reservedRows) { idx := e.idx diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 9cd69d2bc80e2..52bdc5e34a513 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -297,7 +297,7 @@ func (c *RPCClient) handleEstablishMPPConnection(ctx context.Context, r *mpp.Est if err != nil { return nil, err } - var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0} + var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0, targetTask: r.ReceiverMeta} streamResp := &tikvrpc.MPPStreamResponse{Tikv_EstablishMPPConnectionClient: &mockClient} _, cancel := context.WithCancel(ctx) streamResp.Lease.Cancel = cancel @@ -472,6 +472,8 @@ type mockMPPConnectionClient struct { mockClientStream mppResponses []*mpp.MPPDataPacket idx int + + targetTask *mpp.TaskMeta } func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) { @@ -481,7 +483,7 @@ func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) { return ret, nil } failpoint.Inject("mppRecvTimeout", func(val failpoint.Value) { - if val.(bool) { + if int64(val.(int)) == mock.targetTask.TaskId { failpoint.Return(nil, context.Canceled) } }) diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 555049d18242a..d2df27b16e8c7 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -493,6 +493,10 @@ func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRe return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) RawGetKeyTTL(ctx context.Context, req *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) { // prepare a mock tikv grpc server addr := "localhost:56341" From 0b413611ff7844ecfa9896c2d446e04344f981d0 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 3 Mar 2021 17:25:05 +0800 Subject: [PATCH 3/6] fix make dev --- go.sum | 2 -- server/conn_test.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/go.sum b/go.sum index 3bc479e9780d1..22ef2723c1307 100644 --- a/go.sum +++ b/go.sum @@ -353,8 +353,6 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= -github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2 h1:Vx3qsoBtFHSQ5GTARXRh1AwNRVJ8SXaedLzIohnxClE= -github.com/ngaut/unistore v0.0.0-20210219030914-d0fb1ee6f3d2/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133 h1:viz4pREs1mTaZFkjv+N3Gz5OSRkXSVYWVG+63YigO3M= github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= diff --git a/server/conn_test.go b/server/conn_test.go index 84a3862a8cd62..6003633517b54 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -55,12 +55,10 @@ func (ts *ConnTestSuite) SetUpSuite(c *C) { mockstore.WithClusterInspector(func(c cluster.Cluster) { mockCluster := c.(*unistore.Cluster) _, _, region1 := mockstore.BootstrapWithSingleStore(c) - tiflashIdx := 0 store := c.AllocID() peer := c.AllocID() mockCluster.AddStore(store, "tiflash0", &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) mockCluster.AddPeer(region1, store, peer) - tiflashIdx++ }), mockstore.WithStoreType(mockstore.EmbedUnistore), ) From d9d51ed399751473a66d73244b3870acf15a8227 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 4 Mar 2021 12:34:31 +0800 Subject: [PATCH 4/6] address comments --- server/conn_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/conn_test.go b/server/conn_test.go index 6003633517b54..8feb24d4692c8 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -747,7 +747,6 @@ func (ts *ConnTestSuite) TestTiFlashFallback(c *C) { tk := testkit.NewTestKitWithInit(c, ts.store) cc.ctx = &TiDBContext{Session: tk.Se, stmts: make(map[int]*TiDBStatement)} - tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int not null primary key, b int not null)") tk.MustExec("alter table t set tiflash replica 1") From 8e259b0697563a566abc2b5001d7770100eb727d Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 4 Mar 2021 17:33:52 +0800 Subject: [PATCH 5/6] fix data race --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 6c6e7af121512..376e0582325d2 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133 + github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 diff --git a/go.sum b/go.sum index 31787fc2cadb6..7a22c93117352 100644 --- a/go.sum +++ b/go.sum @@ -355,8 +355,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= -github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133 h1:viz4pREs1mTaZFkjv+N3Gz5OSRkXSVYWVG+63YigO3M= -github.com/ngaut/unistore v0.0.0-20210303083724-410ac25eb133/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= +github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d h1:KbZAPenFgGdIvFWqYxTSr/p18C9mNbMAMc2YzMS680A= +github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= From 31af8ab8cfda57884cfe17f900e23523f21c0211 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 4 Mar 2021 18:54:48 +0800 Subject: [PATCH 6/6] fix --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 376e0582325d2..a061bc33f1fed 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d + github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 diff --git a/go.sum b/go.sum index 7a22c93117352..56dbf7cbe2398 100644 --- a/go.sum +++ b/go.sum @@ -355,8 +355,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= -github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d h1:KbZAPenFgGdIvFWqYxTSr/p18C9mNbMAMc2YzMS680A= -github.com/ngaut/unistore v0.0.0-20210304092846-7881b7d3707d/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= +github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8= +github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=