From 3bb1955d8e13c3430b5cc910668ce54f09c968e5 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 29 Nov 2023 21:14:48 +0800 Subject: [PATCH] cdc: fixes minor bugs #10168 and #10169 (#10170) Signed-off-by: qupeng --- cdc/kv/client.go | 4 ++++ cdc/processor/sinkmanager/tasks.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 5b78e727006..cdf29fb06f0 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -73,6 +73,8 @@ const ( regionScheduleReload = false resolveLockMinInterval = 10 * time.Second + + scanRegionsConcurrency = 1024 ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -432,6 +434,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error { g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(scanRegionsConcurrency) for { select { case <-ctx.Done(): diff --git a/cdc/processor/sinkmanager/tasks.go b/cdc/processor/sinkmanager/tasks.go index c6b6ea8757b..0ba96d761fe 100644 --- a/cdc/processor/sinkmanager/tasks.go +++ b/cdc/processor/sinkmanager/tasks.go @@ -33,8 +33,8 @@ var ( maxUpdateIntervalSize = defaultMaxUpdateIntervalSize // Sink manager schedules table tasks based on lag. Limit the max task range - // can be helpful to reduce changefeed latency. - maxTaskRange = 5 * time.Second + // can be helpful to reduce changefeed latency for large initial data. + maxTaskTimeRange = 30 * time.Minute ) // Used to record the progress of the table.