Skip to content

Commit

Permalink
kv,puller(ticdc): add changefeed ID to kv client (#4373)
Browse files Browse the repository at this point in the history
ref #3288
  • Loading branch information
overvenus authored Jan 18, 2022
1 parent dc47e9b commit 0538d37
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 69 deletions.
97 changes: 79 additions & 18 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ type CDCClient struct {
regionCache *tikv.RegionCache
kvStorage tikv.Storage
pdClock pdtime.Clock
changefeed string

regionLimiters *regionEventFeedLimiters
}
Expand All @@ -327,6 +328,7 @@ func NewCDCClient(
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdtime.Clock,
changefeed string,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

Expand All @@ -337,6 +339,7 @@ func NewCDCClient(
grpcPool: grpcPool,
regionCache: regionCache,
pdClock: pdClock,
changefeed: changefeed,
regionLimiters: defaultRegionEventFeedLimiters,
}
return
Expand All @@ -356,27 +359,35 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}()
conn, err = c.grpcPool.GetConn(addr)
if err != nil {
log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err))
log.Info("get connection to store failed, retry later",
zap.String("addr", addr), zap.Error(err),
zap.String("changefeed", c.changefeed))
return
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
log.Error("check tikv version failed",
zap.Error(err), zap.Uint64("storeID", storeID),
zap.String("changefeed", c.changefeed))
return
}
client := cdcpb.NewChangeDataClient(conn.ClientConn)
var streamClient cdcpb.ChangeData_EventFeedClient
streamClient, err = client.EventFeed(ctx)
if err != nil {
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
log.Info("establish stream to store failed, retry later",
zap.String("addr", addr), zap.Error(err),
zap.String("changefeed", c.changefeed))
return
}
stream = &eventFeedStream{
client: streamClient,
conn: conn,
}
log.Debug("created stream to store", zap.String("addr", addr))
log.Debug("created stream to store",
zap.String("addr", addr),
zap.String("changefeed", c.changefeed))
return nil
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
Expand Down Expand Up @@ -470,6 +481,8 @@ func newEventFeedSession(
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs, client.changefeed)
return &eventFeedSession{
client: client,
totalSpan: totalSpan,
Expand All @@ -479,7 +492,7 @@ func newEventFeedSession(
errCh: make(chan regionErrorInfo, defaultRegionChanSize),
requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize),
rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs),
rangeLock: rangeLock,
enableOldValue: enableOldValue,
lockResolver: lockResolver,
isPullerInit: isPullerInit,
Expand All @@ -496,7 +509,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
eventFeedGauge.Inc()
defer eventFeedGauge.Dec()

log.Debug("event feed started", zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts))
log.Debug("event feed started",
zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts),
zap.String("changefeed", s.client.changefeed))

g, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -555,6 +570,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
Expand Down Expand Up @@ -608,6 +624,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
}
case regionspan.LockRangeStatusStale:
log.Info("request expired",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Reflect("retrySpans", res.RetryRanges))
Expand Down Expand Up @@ -655,7 +672,10 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// error handling. This function is non blocking even if error channel is full.
// CAUTION: Note that this should only be called in a context that the region has locked it's range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
log.Debug("region failed",
zap.Uint64("regionID", errorInfo.verID.GetID()),
zap.Error(errorInfo.err),
zap.String("changefeed", s.client.changefeed))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
Expand Down Expand Up @@ -720,7 +740,9 @@ func (s *eventFeedSession) requestRegionToStore(
pendingRegions, ok = storePendingRegions[rpcCtx.Addr]
if !ok {
// Should never happen
log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr))
log.Panic("pending regions is not found for store",
zap.String("changefeed", s.client.changefeed),
zap.String("store", rpcCtx.Addr))
}
} else {
// when a new stream is established, always create a new pending
Expand All @@ -730,6 +752,7 @@ func (s *eventFeedSession) requestRegionToStore(
storePendingRegions[rpcCtx.Addr] = pendingRegions
storeID := rpcCtx.Peer.GetStoreId()
log.Info("creating new stream to store to send request",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
Expand All @@ -740,6 +763,7 @@ func (s *eventFeedSession) requestRegionToStore(
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
Expand Down Expand Up @@ -773,22 +797,26 @@ func (s *eventFeedSession) requestRegionToStore(
if s.isPullerInit.IsInitialized() {
logReq = log.Info
}
logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))
logReq("start new request",
zap.String("changefeed", s.client.changefeed),
zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))

err = stream.client.Send(req)

// If Send error, the receiver should have received error too or will receive error soon. So we doesn't need
// to do extra work here.
if err != nil {
log.Warn("send request to stream failed",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", rpcCtx.Addr),
zap.Uint64("storeID", getStoreID(rpcCtx)),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Error(err))
err1 := stream.client.CloseSend()
if err1 != nil {
log.Warn("failed to close stream", zap.Error(err1))
log.Warn("failed to close stream",
zap.Error(err1), zap.String("changefeed", s.client.changefeed))
}
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
Expand Down Expand Up @@ -837,7 +865,9 @@ func (s *eventFeedSession) dispatchRequest(
s.regionChSizeGauge.Dec()
}

log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID()))
log.Debug("dispatching region",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()))

// Send a resolved ts to event channel first, for two reasons:
// 1. Since we have locked the region range, and have maintained correct
Expand Down Expand Up @@ -869,6 +899,7 @@ func (s *eventFeedSession) dispatchRequest(
if rpcCtx == nil {
// The region info is invalid. Retry the span.
log.Info("cannot get rpcCtx, retry span",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
Expand Down Expand Up @@ -908,17 +939,27 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
for _, region := range regions {
if region.GetMeta() == nil {
err = cerror.ErrMetaNotInRegion.GenWithStackByArgs()
log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err))
log.Warn("batch load region",
zap.Stringer("span", nextSpan), zap.Error(err),
zap.String("changefeed", s.client.changefeed),
)
return err
}
metas = append(metas, region.GetMeta())
}
if !regionspan.CheckRegionsLeftCover(metas, nextSpan) {
err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas)
log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err))
log.Warn("ScanRegions",
zap.Stringer("span", nextSpan),
zap.Reflect("regions", metas), zap.Error(err),
zap.String("changefeed", s.client.changefeed),
)
return err
}
log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
log.Debug("ScanRegions",
zap.Stringer("span", nextSpan),
zap.Reflect("regions", metas),
zap.String("changefeed", s.client.changefeed))
return nil
}, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
if retryErr != nil {
Expand All @@ -931,13 +972,19 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
if err != nil {
return errors.Trace(err)
}
log.Debug("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id))
log.Debug("get partialSpan",
zap.Stringer("span", partialSpan),
zap.Uint64("regionID", region.Id),
zap.String("changefeed", s.client.changefeed))

nextSpan.Start = region.EndKey

sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil)
s.scheduleRegionRequest(ctx, sri)
log.Debug("partialSpan scheduled", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id))
log.Debug("partialSpan scheduled",
zap.Stringer("span", partialSpan),
zap.Uint64("regionID", region.Id),
zap.String("changefeed", s.client.changefeed))

// return if no more regions
if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 {
Expand Down Expand Up @@ -1023,17 +1070,21 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
return errUnreachable
} else if compatibility := innerErr.GetCompatibility(); compatibility != nil {
log.Error("tikv reported compatibility error, which is not expected",
zap.String("changefeed", s.client.changefeed),
zap.String("rpcCtx", errInfo.rpcCtx.String()),
zap.Stringer("error", compatibility))
return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility)
} else if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil {
log.Error("tikv reported the request cluster ID mismatch error, which is not expected",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("tikvCurrentClusterID", mismatch.Current),
zap.Uint64("requestClusterID", mismatch.Request))
return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request)
} else {
metricFeedUnknownErrorCounter.Inc()
log.Warn("receive empty or unknown error msg", zap.Stringer("error", innerErr))
log.Warn("receive empty or unknown error msg",
zap.String("changefeed", s.client.changefeed),
zap.Stringer("error", innerErr))
}
case *rpcCtxUnavailableErr:
metricFeedRPCCtxUnavailable.Inc()
Expand Down Expand Up @@ -1084,7 +1135,9 @@ func (s *eventFeedSession) receiveFromStream(
// Cancel the pending regions if the stream failed. Otherwise it will remain unhandled in the pendingRegions list
// however not registered in the new reconnected stream.
defer func() {
log.Info("stream to store closed", zap.String("addr", addr), zap.Uint64("storeID", storeID))
log.Info("stream to store closed",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr), zap.Uint64("storeID", storeID))

failpoint.Inject("kvClientStreamCloseDelay", nil)

Expand Down Expand Up @@ -1129,12 +1182,14 @@ func (s *eventFeedSession) receiveFromStream(
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
"receive from stream canceled",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
)
} else {
log.Warn(
"failed to receive from stream",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
zap.Error(err),
Expand Down Expand Up @@ -1174,6 +1229,7 @@ func (s *eventFeedSession) receiveFromStream(
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.String("changefeed", s.client.changefeed),
zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}
Expand Down Expand Up @@ -1209,13 +1265,15 @@ func (s *eventFeedSession) sendRegionChangeEvent(
if ok {
if state.requestID < event.RequestId {
log.Debug("region state entry will be replaced because received message of newer requestID",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("oldRequestID", state.requestID),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
isNewSubscription = true
} else if state.requestID > event.RequestId {
log.Warn("drop event due to event belongs to a stale request",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.Uint64("currRequestID", state.requestID),
Expand All @@ -1232,6 +1290,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
state, ok = pendingRegions.take(event.RequestId)
if !ok {
log.Warn("drop event due to region feed is removed",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
Expand All @@ -1242,6 +1301,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
worker.setRegionState(event.RegionId, state)
} else if state.isStopped() {
log.Warn("drop event due to region feed stopped",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
Expand Down Expand Up @@ -1271,6 +1331,7 @@ func (s *eventFeedSession) sendResolvedTs(
if ok {
if state.isStopped() {
log.Debug("drop resolved ts due to region feed stopped",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.String("addr", addr))
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test())
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -282,7 +282,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test())
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down
Loading

0 comments on commit 0538d37

Please sign in to comment.