store: refine the error handling and retry mechanism for stale read (#…
JmPotato committed Jun 3, 2021
1 parent d3de547 commit 69274d8
Showing 14 changed files with 208 additions and 25 deletions.
1 change: 1 addition & 0 deletions executor/batch_point_get.go
@@ -124,6 +124,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
1 change: 1 addition & 0 deletions executor/point_get.go
@@ -149,6 +149,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
e.snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
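Both point-get executors now stamp the snapshot with the session's txn scope before reading, so the store layer can later tell a global stale read from a local one. A minimal sketch of that option-passing shape, using stand-in types (the real keys are kv.TxnScope and kv.IsStalenessReadOnly on TiDB's kv.Snapshot):

package main

import "fmt"

// Stand-ins for TiDB's kv option machinery, for illustration only.
type Option int

const (
	TxnScope Option = iota
	IsStalenessReadOnly
)

type snapshot struct{ opts map[Option]interface{} }

func (s *snapshot) SetOption(opt Option, val interface{}) { s.opts[opt] = val }

func main() {
	snap := &snapshot{opts: make(map[Option]interface{})}
	// Mirrors the executor change above: record the txn scope so the
	// client can pick a retry strategy if a replica's data is not ready.
	snap.SetOption(TxnScope, "local")
	snap.SetOption(IsStalenessReadOnly, true)
	fmt.Println(snap.opts)
}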
2 changes: 1 addition & 1 deletion go.mod
@@ -43,7 +43,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
4 changes: 2 additions & 2 deletions go.sum
@@ -433,8 +433,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c h1:cy87vgUJT0U4JuxC7R14PuwBrabI9fDawYhyKTbjOBQ=
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb h1:6isHwZRl1fc9i1Mggiza2iQcfvVvYAAerFIs5t9msXg=
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
2 changes: 2 additions & 0 deletions kv/kv.go
@@ -283,6 +283,8 @@ type Request struct {
TaskID uint64
// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
TiDBServerID uint64
// TxnScope is the scope of the current txn.
TxnScope string
// IsStaleness indicates whether the request reads stale data
IsStaleness bool
// MatchStoreLabels indicates the labels the store should be matched
3 changes: 2 additions & 1 deletion store/copr/coprocessor.go
@@ -713,10 +713,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
if worker.kvclient.Stats == nil {
worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
}
req.TxnScope = worker.req.TxnScope
if worker.req.IsStaleness {
req.EnableStaleRead()
}
var ops []tikv.StoreSelectorOption
ops := make([]tikv.StoreSelectorOption, 0, 2)
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
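The worker now copies the txn scope from the kv.Request onto every per-task RPC, and pre-sizes the selector-option slice with capacity 2 (one slot for matched labels, one for a possible leader-only constraint appended later on retry). A rough sketch of that propagation under stand-in types:

package main

import "fmt"

// Illustrative stand-ins for kv.Request and tikvrpc.Request.
type kvRequest struct {
	TxnScope    string
	IsStaleness bool
}

type rpcRequest struct {
	TxnScope  string
	StaleRead bool
}

func buildRPCRequest(in kvRequest) rpcRequest {
	out := rpcRequest{TxnScope: in.TxnScope} // the scope now rides along with each RPC
	if in.IsStaleness {
		out.StaleRead = true // corresponds to req.EnableStaleRead() above
	}
	return out
}

func main() {
	fmt.Printf("%+v\n", buildRPCRequest(kvRequest{TxnScope: "global", IsStaleness: true}))
}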
5 changes: 5 additions & 0 deletions store/mockstore/unistore/tikv/server.go
@@ -618,6 +618,11 @@ func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServe
return nil
}

// RawCoprocessor implements the tikvpb.TikvServer interface.
func (svr *Server) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
panic("unimplemented")
}

func (mrm *MockRegionManager) removeMPPTaskHandler(taskID int64, storeID uint64) error {
set := mrm.getMPPTaskSet(storeID)
if set == nil {
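The RawCoprocessor stub exists because the kvproto bump in go.mod regenerates the tikvpb.TikvServer interface with a new method, so every server or mock implementing it must add a stub to keep compiling. The pattern, sketched with a stand-in interface (the real signature takes *kvrpcpb.RawCoprocessorRequest):

package main

import (
	"context"
	"errors"
)

// Stand-in for a regenerated gRPC server interface that grew a method.
type tikvServer interface {
	RawCoprocessor(ctx context.Context, req []byte) ([]byte, error)
}

type mockServer struct{}

// Stub so mockServer keeps satisfying the interface; mocks commonly
// return an error (or panic) on paths a test never exercises.
func (mockServer) RawCoprocessor(context.Context, []byte) ([]byte, error) {
	return nil, errors.New("unreachable")
}

var _ tikvServer = mockServer{} // compile-time interface check

func main() {}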
27 changes: 21 additions & 6 deletions store/tikv/region_cache.go
@@ -172,14 +172,19 @@ func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {

// return next leader or follower store's index
func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
if op.leaderOnly {
return r.workTiKVIdx
}
candidates := make([]AccessIndex, 0, r.accessStoreNum(TiKVOnly))
for i := 0; i < r.accessStoreNum(TiKVOnly); i++ {
storeIdx, s := r.accessStore(TiKVOnly, AccessIndex(i))
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(AccessIndex(i), op) {
accessIdx := AccessIndex(i)
storeIdx, s := r.accessStore(TiKVOnly, accessIdx)
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(accessIdx, op) {
continue
}
candidates = append(candidates, AccessIndex(i))
candidates = append(candidates, accessIdx)
}
// If there are no candidates, send to the current workTiKVIdx, which generally is the leader.
if len(candidates) == 0 {
return r.workTiKVIdx
}
@@ -422,6 +427,8 @@ type RPCContext struct {
ProxyAccessIdx AccessIndex // valid when ProxyStore is not nil
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

tryTimes int
}

func (c *RPCContext) String() string {
@@ -438,16 +445,24 @@ func (c *RPCContext) String() string {
}

type storeSelectorOp struct {
labels []*metapb.StoreLabel
leaderOnly bool
labels []*metapb.StoreLabel
}

// StoreSelectorOption configures storeSelectorOp.
type StoreSelectorOption func(*storeSelectorOp)

// WithMatchLabels indicates selecting stores with matched labels
// WithMatchLabels indicates selecting stores with matched labels.
func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
return func(op *storeSelectorOp) {
op.labels = labels
op.labels = append(op.labels, labels...)
}
}

// WithLeaderOnly indicates selecting the leader store only.
func WithLeaderOnly() StoreSelectorOption {
return func(op *storeSelectorOp) {
op.leaderOnly = true
}
}
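WithLeaderOnly follows the same functional-options pattern as WithMatchLabels: each option mutates a shared storeSelectorOp. A self-contained usage sketch (string labels here stand in for *metapb.StoreLabel):

package main

import "fmt"

type storeSelectorOp struct {
	leaderOnly bool
	labels     []string
}

type StoreSelectorOption func(*storeSelectorOp)

func WithMatchLabels(labels []string) StoreSelectorOption {
	return func(op *storeSelectorOp) { op.labels = append(op.labels, labels...) }
}

func WithLeaderOnly() StoreSelectorOption {
	return func(op *storeSelectorOp) { op.leaderOnly = true }
}

func main() {
	// A caller accumulates options; the region cache then applies them.
	opts := []StoreSelectorOption{WithMatchLabels([]string{"zone=bj"}), WithLeaderOnly()}
	op := &storeSelectorOp{}
	for _, apply := range opts {
		apply(op)
	}
	fmt.Printf("%+v\n", op) // &{leaderOnly:true labels:[zone=bj]}
}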

9 changes: 5 additions & 4 deletions store/tikv/region_cache_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)

@@ -1043,7 +1044,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
r := ctxTiFlash.Meta
reqSend := NewRegionRequestSender(s.cache, nil)
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil)

// check leader read should not go to tiflash
lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
@@ -1406,11 +1407,11 @@ func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
c.Assert(ctxFollower1.Store.storeID, Equals, s.store2)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(1))

regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(2))
}

@@ -1437,7 +1438,7 @@ func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
c.Assert(ctxFollower1.Store.storeID, Equals, s.store1)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(1))
}

45 changes: 43 additions & 2 deletions store/tikv/region_request.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
@@ -286,6 +287,9 @@ func (s *RegionRequestSender) SendReqCtx(
if err != nil {
return nil, nil, err
}
if rpcCtx != nil {
rpcCtx.tryTimes = tryTimes
}

failpoint.Inject("invalidCacheAndRetry", func() {
// cooperate with github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff
@@ -334,8 +338,14 @@ func (s *RegionRequestSender) SendReqCtx(
if err != nil {
return nil, nil, errors.Trace(err)
}
failpoint.Inject("mockDataIsNotReadyError", func(val failpoint.Value) {
regionErr = &errorpb.Error{}
if tryTimesLimit, ok := val.(int); ok && tryTimes <= tryTimesLimit {
regionErr.DataIsNotReady = &errorpb.DataIsNotReady{}
}
})
if regionErr != nil {
retry, err = s.onRegionError(bo, rpcCtx, req.ReplicaReadSeed, regionErr)
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, &opts)
if err != nil {
return nil, nil, errors.Trace(err)
}
@@ -644,14 +654,20 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "unknown"
}

func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, err error) {
func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, opts *[]StoreSelectorOption) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
defer span1.Finish()
// TODO(MyonKeminta): Make sure trace works without cloning the backoffer.
// bo = bo.Clone()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
// A stale read request will retry the leader or the next peer on error.
// If the txnScope is global, we will only retry the leader by using the WithLeaderOnly option;
// if the txnScope is local, we will retry both the leader and other peers by increasing the seed.
if ctx.tryTimes < 1 && req != nil && req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() {
*opts = append(*opts, WithLeaderOnly())
}

metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
@@ -687,6 +703,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
return true, nil
}

seed := req.GetReplicaReadSeed()
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later",
zap.Stringer("EpochNotMatch", epochNotMatch),
@@ -723,6 +740,30 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
return false, errors.New(regionErr.String())
}
// A stale read request may be sent to a peer on which the data is not ready yet; we should retry in this case.
if regionErr.GetDataIsNotReady() != nil {
logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
// A read request may be sent to a peer that has not been initialized yet; we should retry in this case.
if regionErr.GetRegionNotInitialized() != nil {
logutil.BgLogger().Warn("tikv reports `RegionNotInitialized` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
if regionErr.GetRegionNotFound() != nil && seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
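Condensed, the new policy is: a stale read that hits DataIsNotReady (or RegionNotInitialized) retries, and on the first failure a global-scope request pins subsequent attempts to the leader while a local-scope one advances the replica seed to try another peer. A runnable restatement of just that decision (a sketch, not the exact TiDB source):

package main

import "fmt"

const globalTxnScope = "global" // assumption: mirrors oracle.GlobalTxnScope

// onStaleReadError reports whether the retry should be pinned to the
// leader; otherwise it bumps the replica-read seed so the next attempt
// lands on a different peer.
func onStaleReadError(tryTimes int, txnScope string, seed *uint32) (leaderOnly bool) {
	if tryTimes < 1 && txnScope == globalTxnScope {
		return true
	}
	if seed != nil {
		*seed++
	}
	return false
}

func main() {
	var seed uint32
	fmt.Println(onStaleReadError(0, globalTxnScope, &seed), seed) // true 0
	fmt.Println(onStaleReadError(0, "local", &seed), seed)        // false 1
}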
91 changes: 91 additions & 0 deletions store/tikv/region_request_test.go
@@ -24,6 +24,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/coprocessor_v2"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -33,6 +34,7 @@ import (
"github.com/pingcap/kvproto/pkg/tikvpb"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/retry"

"github.com/pingcap/tidb/store/tikv/config"
@@ -185,6 +187,92 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {
kv.StoreLimit.Store(oldStoreLimit)
}

// Test whether the Stale Read request will retry the leader or other peers on error.
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadRetry(c *C) {
var seed uint32 = 0
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
req.EnableStaleRead()

// Test whether a global Stale Read request will only retry on the leader.
req.TxnScope = oracle.GlobalTxnScope
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 1 time.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 2 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 3 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

// Test whether a local Stale Read request will retry on the leader and other peers.
req.TxnScope = "local"
seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 1 time.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID1 := ctx.Peer.GetId()

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 2 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID2 := ctx.Peer.GetId()
c.Assert(peerID2, Not(Equals), peerID1)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 3 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID3 := ctx.Peer.GetId()
c.Assert(peerID3, Not(Equals), peerID1)
c.Assert(peerID3, Not(Equals), peerID2)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError"), IsNil)
}
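The test drives the retry loop with the mockDataIsNotReadyError failpoint: the injected integer is a try-count limit, and per the hunk in region_request.go the error is returned while tryTimes <= limit. A sketch of that gate (exact attempt numbering depends on where SendReqCtx increments tryTimes, which the diff does not show):

package main

import "fmt"

// injectDataIsNotReady mirrors the failpoint condition from the diff:
// `return(N)` makes every attempt with tryTimes <= N observe the error.
func injectDataIsNotReady(limit, tryTimes int) bool {
	return tryTimes <= limit
}

func main() {
	for try := 0; try <= 4; try++ {
		fmt.Println(try, injectDataIsNotReady(3, try))
	}
}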

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart(c *C) {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
@@ -435,6 +523,9 @@ func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request)
func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error {
return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) DispatchMPPTask(context.Context, *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) {
return nil, errors.New("unreachable")
}