kv,puller(ticdc): add changefeed ID to kv client (#4373) #4392

Merged
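This change threads a changefeed ID string through the kv client so that every log line the client emits can be attributed to a specific changefeed. As a minimal sketch (not part of this PR), a caller such as the puller might construct the client as below; import paths assume the pingcap/tiflow module layout and tikv/client-go v2 at the time of this change, and any name that does not appear in the diff is an illustrative guess:

```go
package example

import (
	"context"

	"github.com/pingcap/tiflow/cdc/kv"
	"github.com/pingcap/tiflow/pkg/security"
	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

// newClientForChangefeed wires a CDC kv client to one changefeed ID so that
// every log line the client emits carries a "changefeed" field.
func newClientForChangefeed(
	ctx context.Context,
	pdCli pd.Client,
	storage tikv.Storage,
	changefeedID string,
) kv.CDCKVClient {
	// The gRPC pool is shared infrastructure; callers own its lifecycle (Close).
	grpcPool := kv.NewGrpcPoolImpl(ctx, &security.Credential{})
	// The trailing string is the parameter added by this PR; the benchmarks in
	// this diff pass "" because they have no real changefeed to name.
	return kv.NewCDCClient(ctx, pdCli, storage, grpcPool, changefeedID)
}
```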
106 changes: 86 additions & 20 deletions cdc/kv/client.go
@@ -307,6 +307,7 @@ var NewCDCKVClient func(
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
changefeed string,
) CDCKVClient = NewCDCClient

// CDCClient to get events from TiKV
@@ -319,12 +320,19 @@ type CDCClient struct {

regionCache *tikv.RegionCache
kvStorage TiKVStorage
changefeed string

regionLimiters *regionEventFeedLimiters
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
func NewCDCClient(
ctx context.Context,
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
changefeed string,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
@@ -343,6 +351,7 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp
kvStorage: store,
grpcPool: grpcPool,
regionCache: tikv.NewRegionCache(pd),
changefeed: changefeed,
regionLimiters: defaultRegionEventFeedLimiters,
}
return
@@ -369,27 +378,35 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}()
conn, err = c.grpcPool.GetConn(addr)
if err != nil {
log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err))
log.Info("get connection to store failed, retry later",
zap.String("addr", addr), zap.Error(err),
zap.String("changefeed", c.changefeed))
return
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
log.Error("check tikv version failed",
zap.Error(err), zap.Uint64("storeID", storeID),
zap.String("changefeed", c.changefeed))
return
}
client := cdcpb.NewChangeDataClient(conn.ClientConn)
var streamClient cdcpb.ChangeData_EventFeedClient
streamClient, err = client.EventFeed(ctx)
if err != nil {
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
log.Info("establish stream to store failed, retry later",
zap.String("addr", addr), zap.Error(err),
zap.String("changefeed", c.changefeed))
return
}
stream = &eventFeedStream{
client: streamClient,
conn: conn,
}
log.Debug("created stream to store", zap.String("addr", addr))
log.Debug("created stream to store",
zap.String("addr", addr),
zap.String("changefeed", c.changefeed))
return nil
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
@@ -487,6 +504,8 @@ func newEventFeedSession(
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs, client.changefeed)
return &eventFeedSession{
client: client,
regionCache: regionCache,
@@ -498,7 +517,7 @@ func newEventFeedSession(
errCh: make(chan regionErrorInfo, defaultRegionChanSize),
requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize),
rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs),
rangeLock: rangeLock,
enableOldValue: enableOldValue,
lockResolver: lockResolver,
isPullerInit: isPullerInit,
@@ -515,7 +534,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
eventFeedGauge.Inc()
defer eventFeedGauge.Dec()

log.Debug("event feed started", zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts))
log.Debug("event feed started",
zap.Stringer("span", s.totalSpan), zap.Uint64("ts", ts),
zap.String("changefeed", s.client.changefeed))

g, ctx := errgroup.WithContext(ctx)

@@ -574,6 +595,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
@@ -627,6 +649,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
}
case regionspan.LockRangeStatusStale:
log.Info("request expired",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Reflect("retrySpans", res.RetryRanges))
@@ -674,7 +697,10 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// error handling. This function is non-blocking even if the error channel is full.
// CAUTION: Note that this should only be called in a context where the region has locked its range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
log.Debug("region failed",
zap.Uint64("regionID", errorInfo.verID.GetID()),
zap.Error(errorInfo.err),
zap.String("changefeed", s.client.changefeed))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
@@ -739,7 +765,9 @@ func (s *eventFeedSession) requestRegionToStore(
pendingRegions, ok = storePendingRegions[rpcCtx.Addr]
if !ok {
// Should never happen
log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr))
log.Panic("pending regions is not found for store",
zap.String("changefeed", s.client.changefeed),
zap.String("store", rpcCtx.Addr))
}
} else {
// when a new stream is established, always create a new pending
@@ -749,6 +777,7 @@ func (s *eventFeedSession) requestRegionToStore(
storePendingRegions[rpcCtx.Addr] = pendingRegions
storeID := rpcCtx.Peer.GetStoreId()
log.Info("creating new stream to store to send request",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
@@ -759,6 +788,7 @@ func (s *eventFeedSession) requestRegionToStore(
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
@@ -792,22 +822,26 @@ func (s *eventFeedSession) requestRegionToStore(
if s.isPullerInit.IsInitialized() {
logReq = log.Info
}
logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))
logReq("start new request",
zap.String("changefeed", s.client.changefeed),
zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))

err = stream.client.Send(req)

// If Send returns an error, the receiver should have received an error too or will receive one soon,
// so we don't need to do extra work here.
if err != nil {
log.Warn("send request to stream failed",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", rpcCtx.Addr),
zap.Uint64("storeID", getStoreID(rpcCtx)),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Error(err))
err1 := stream.client.CloseSend()
if err1 != nil {
log.Warn("failed to close stream", zap.Error(err1))
log.Warn("failed to close stream",
zap.Error(err1), zap.String("changefeed", s.client.changefeed))
}
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
@@ -856,7 +890,9 @@ func (s *eventFeedSession) dispatchRequest(
s.regionChSizeGauge.Dec()
}

log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID()))
log.Debug("dispatching region",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()))

// Send a resolved ts to event channel first, for two reasons:
// 1. Since we have locked the region range, and have maintained correct
@@ -888,6 +924,7 @@ func (s *eventFeedSession) dispatchRequest(
if rpcCtx == nil {
// The region info is invalid. Retry the span.
log.Info("cannot get rpcCtx, retry span",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
@@ -927,17 +964,27 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
for _, region := range regions {
if region.GetMeta() == nil {
err = cerror.ErrMetaNotInRegion.GenWithStackByArgs()
log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err))
log.Warn("batch load region",
zap.Stringer("span", nextSpan), zap.Error(err),
zap.String("changefeed", s.client.changefeed),
)
return err
}
metas = append(metas, region.GetMeta())
}
if !regionspan.CheckRegionsLeftCover(metas, nextSpan) {
err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas)
log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err))
log.Warn("ScanRegions",
zap.Stringer("span", nextSpan),
zap.Reflect("regions", metas), zap.Error(err),
zap.String("changefeed", s.client.changefeed),
)
return err
}
log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
log.Debug("ScanRegions",
zap.Stringer("span", nextSpan),
zap.Reflect("regions", metas),
zap.String("changefeed", s.client.changefeed))
return nil
}, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
if retryErr != nil {
@@ -950,13 +997,19 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
if err != nil {
return errors.Trace(err)
}
log.Debug("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id))
log.Debug("get partialSpan",
zap.Stringer("span", partialSpan),
zap.Uint64("regionID", region.Id),
zap.String("changefeed", s.client.changefeed))

nextSpan.Start = region.EndKey

sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil)
s.scheduleRegionRequest(ctx, sri)
log.Debug("partialSpan scheduled", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id))
log.Debug("partialSpan scheduled",
zap.Stringer("span", partialSpan),
zap.Uint64("regionID", region.Id),
zap.String("changefeed", s.client.changefeed))

// return if no more regions
if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 {
@@ -1042,12 +1095,15 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
return errUnreachable
} else if compatibility := innerErr.GetCompatibility(); compatibility != nil {
log.Error("tikv reported compatibility error, which is not expected",
zap.String("changefeed", s.client.changefeed),
zap.String("rpcCtx", errInfo.rpcCtx.String()),
zap.Stringer("error", compatibility))
return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility)
} else {
metricFeedUnknownErrorCounter.Inc()
log.Warn("receive empty or unknown error msg", zap.Stringer("error", innerErr))
log.Warn("receive empty or unknown error msg",
zap.String("changefeed", s.client.changefeed),
zap.Stringer("error", innerErr))
}
case *rpcCtxUnavailableErr:
metricFeedRPCCtxUnavailable.Inc()
@@ -1098,7 +1154,9 @@ func (s *eventFeedSession) receiveFromStream(
// Cancel the pending regions if the stream failed. Otherwise it will remain unhandled in the pendingRegions list
// however not registered in the new reconnected stream.
defer func() {
log.Info("stream to store closed", zap.String("addr", addr), zap.Uint64("storeID", storeID))
log.Info("stream to store closed",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr), zap.Uint64("storeID", storeID))

failpoint.Inject("kvClientStreamCloseDelay", nil)

@@ -1143,12 +1201,14 @@ func (s *eventFeedSession) receiveFromStream(
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
"receive from stream canceled",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
)
} else {
log.Warn(
"failed to receive from stream",
zap.String("changefeed", s.client.changefeed),
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
zap.Error(err),
@@ -1188,7 +1248,8 @@ func (s *eventFeedSession) receiveFromStream(
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.String("changefeed", s.client.changefeed),
zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

@@ -1223,13 +1284,15 @@ func (s *eventFeedSession) sendRegionChangeEvent(
if ok {
if state.requestID < event.RequestId {
log.Debug("region state entry will be replaced because received message of newer requestID",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("oldRequestID", state.requestID),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
isNewSubscription = true
} else if state.requestID > event.RequestId {
log.Warn("drop event due to event belongs to a stale request",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.Uint64("currRequestID", state.requestID),
@@ -1246,6 +1309,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
state, ok = pendingRegions.take(event.RequestId)
if !ok {
log.Warn("drop event due to region feed is removed",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
@@ -1256,6 +1320,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
worker.setRegionState(event.RegionId, state)
} else if state.isStopped() {
log.Warn("drop event due to region feed stopped",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
@@ -1285,6 +1350,7 @@ func (s *eventFeedSession) sendResolvedTs(
if ok {
if state.isStopped() {
log.Debug("drop resolved ts due to region feed stopped",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.String("addr", addr))
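The newEventFeedSession hunk above also switches the regionspan.NewRegionRangeLock call site from three arguments to four, which implies the range-lock constructor in pkg/regionspan picked up the changefeed ID as well; that file is outside this excerpt. A rough sketch of the assumed shape, with field and parameter names guessed purely for illustration:

```go
// Sketch of the assumed regionspan.NewRegionRangeLock signature after this PR.
// The real implementation is not shown in this diff and may differ.
package regionspan

type RegionRangeLock struct {
	startKey, endKey []byte
	startTs          uint64
	changefeed       string // carried solely so lock-related logs can name the changefeed
	// ... plus the existing lock bookkeeping fields
}

func NewRegionRangeLock(startKey, endKey []byte, startTs uint64, changefeed string) *RegionRangeLock {
	return &RegionRangeLock{
		startKey:   startKey,
		endKey:     endKey,
		startTs:    startTs,
		changefeed: changefeed,
	}
}
```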
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -190,7 +190,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
@@ -280,7 +280,7 @@ func prepareBench(b *testing.B, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
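Both benchmark call sites above pass an empty changefeed ID because they exercise the client without a real changefeed. In production, the new field makes the logs of a process hosting many changefeeds filterable per changefeed. The following is a tiny standalone illustration of the zap idiom used throughout the diff; it uses plain zap rather than TiCDC's log wrapper, and all values are made up:

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	changefeed := "cf-example" // hypothetical changefeed ID
	// Same idiom as the kv client: attach the changefeed as a structured
	// field so operators can filter or grep log output per changefeed.
	logger.Info("creating new stream to store to send request",
		zap.String("changefeed", changefeed),
		zap.Uint64("regionID", 42),
		zap.String("addr", "127.0.0.1:20160"))
}
```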