Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/client: Fix incorrect behaviors when reconnecting the stream #499

Merged
merged 3 commits into from
Apr 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,23 +611,20 @@ MainLoop:
}

state := newRegionFeedState(sri, requestID)
hasOld := pendingRegions.insert(requestID, state)
if hasOld {
log.Error("region is already pending for the first response while trying to send another request."+
"region merge may have happened which is not supported yet",
zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID))
}
pendingRegions.insert(requestID, state)

stream, ok := streams[rpcCtx.Addr]
// Establish the stream if it has not been connected yet.
if !ok {
stream, err = s.client.newStream(ctx, rpcCtx.Addr)
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed", zap.Error(err))
log.Warn("get grpc stream client failed",
zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Error(err))
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, needReloadRegion(sri.failStoreIDs, rpcCtx), err)
// Retry connecting and sending the request.
// Delete the pendingRegion info from `pendingRegions` and retry connecting and sending the request.
pendingRegions.take(requestID)
continue
}
streams[rpcCtx.Addr] = stream
Expand All @@ -647,6 +644,7 @@ MainLoop:
log.Error("send request to stream failed",
zap.String("addr", rpcCtx.Addr),
zap.Uint64("storeID", getStoreID(rpcCtx)),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Error(err))
err1 := stream.CloseSend()
Expand All @@ -656,6 +654,11 @@ MainLoop:
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
delete(streams, rpcCtx.Addr)
// Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is
// requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all
// pending regions, the new pending regions that are requested after reconnecting won't be stopped
// incorrectly.
delete(storePendingRegions, rpcCtx.Addr)

// Remove the region from pendingRegions. If it's already removed, it should be already retried by
// `receiveFromStream`, so no need to retry here.
Expand Down Expand Up @@ -968,7 +971,7 @@ func (s *eventFeedSession) receiveFromStream(
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending"+
return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending "+
"region nor running region was found", event.RegionId, event.RequestId, addr)
}

Expand Down