Skip to content

Commit

Permalink
kv/client: Fix incorrect behaviors when reconnecting the stream (#499)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Apr 27, 2020
1 parent 8b17582 commit a6ddaa4
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,23 +611,20 @@ MainLoop:
}

state := newRegionFeedState(sri, requestID)
hasOld := pendingRegions.insert(requestID, state)
if hasOld {
log.Error("region is already pending for the first response while trying to send another request."+
"region merge may have happened which is not supported yet",
zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID))
}
pendingRegions.insert(requestID, state)

stream, ok := streams[rpcCtx.Addr]
// Establish the stream if it has not been connected yet.
if !ok {
stream, err = s.client.newStream(ctx, rpcCtx.Addr)
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed", zap.Error(err))
log.Warn("get grpc stream client failed",
zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Error(err))
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, needReloadRegion(sri.failStoreIDs, rpcCtx), err)
// Retry connecting and sending the request.
// Delete the pendingRegion info from `pendingRegions` and retry connecting and sending the request.
pendingRegions.take(requestID)
continue
}
streams[rpcCtx.Addr] = stream
Expand All @@ -647,6 +644,7 @@ MainLoop:
log.Error("send request to stream failed",
zap.String("addr", rpcCtx.Addr),
zap.Uint64("storeID", getStoreID(rpcCtx)),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Error(err))
err1 := stream.CloseSend()
Expand All @@ -656,6 +654,11 @@ MainLoop:
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
delete(streams, rpcCtx.Addr)
// Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is
// requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all
// pending regions, the new pending regions that are requested after reconnecting won't be stopped
// incorrectly.
delete(storePendingRegions, rpcCtx.Addr)

// Remove the region from pendingRegions. If it's already removed, it should be already retried by
// `receiveFromStream`, so no need to retry here.
Expand Down Expand Up @@ -968,7 +971,7 @@ func (s *eventFeedSession) receiveFromStream(
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending"+
return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending "+
"region nor running region was found", event.RegionId, event.RequestId, addr)
}

Expand Down

0 comments on commit a6ddaa4

Please sign in to comment.