Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Jan 19, 2022
1 parent 490cb79 commit d11a733
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 393 deletions.
30 changes: 1 addition & 29 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ var NewCDCKVClient func(
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
changefeed string,
) CDCKVClient = NewCDCClient

// CDCClient to get events from TiKV
Expand All @@ -318,31 +319,20 @@ type CDCClient struct {
grpcPool GrpcPool

regionCache *tikv.RegionCache
<<<<<<< HEAD
kvStorage TiKVStorage
=======
kvStorage tikv.Storage
pdClock pdtime.Clock
changefeed string
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))

regionLimiters *regionEventFeedLimiters
}

// NewCDCClient creates a CDCClient instance
<<<<<<< HEAD
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
=======
func NewCDCClient(
ctx context.Context,
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdtime.Clock,
changefeed string,
) (c CDCKVClient) {
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
Expand All @@ -360,13 +350,8 @@ func NewCDCClient(
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
<<<<<<< HEAD
regionCache: tikv.NewRegionCache(pd),
=======
regionCache: regionCache,
pdClock: pdClock,
changefeed: changefeed,
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
regionLimiters: defaultRegionEventFeedLimiters,
}
return
Expand Down Expand Up @@ -1114,15 +1099,6 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
zap.String("rpcCtx", errInfo.rpcCtx.String()),
zap.Stringer("error", compatibility))
return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility)
<<<<<<< HEAD
=======
} else if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil {
log.Error("tikv reported the request cluster ID mismatch error, which is not expected",
zap.String("changefeed", s.client.changefeed),
zap.Uint64("tikvCurrentClusterID", mismatch.Current),
zap.Uint64("requestClusterID", mismatch.Request))
return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request)
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
} else {
metricFeedUnknownErrorCounter.Inc()
log.Warn("receive empty or unknown error msg",
Expand Down Expand Up @@ -1272,12 +1248,8 @@ func (s *eventFeedSession) receiveFromStream(
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
<<<<<<< HEAD
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
=======
zap.String("changefeed", s.client.changefeed),
zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)),
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
zap.Int("resolved region count", regionCount))
}

Expand Down
16 changes: 2 additions & 14 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
<<<<<<< HEAD
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
=======
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -286,13 +280,7 @@ func prepareBench(b *testing.B, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
<<<<<<< HEAD
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
=======
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
>>>>>>> 0538d371e (kv,puller(ticdc): add changefeed ID to kv client (#4373))
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, "")
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down
Loading

0 comments on commit d11a733

Please sign in to comment.