From 87b86e4301ac30ec850da92221a5ef5d84a29655 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 29 Jan 2024 12:00:24 +0800 Subject: [PATCH 01/14] fix kvClient reconnect loop --- cdc/kv/client.go | 174 ++++++++++++++++++++--------------- cdc/kv/region_worker.go | 60 ++++++------ cdc/kv/region_worker_test.go | 6 +- 3 files changed, 134 insertions(+), 106 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 3d3abf1dc2a..5cec1ec6687 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -136,6 +136,14 @@ func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo { type eventFeedStream struct { client cdcpb.ChangeData_EventFeedClient conn *sharedConn + // addr is the address of the TiKV store + addr string + // storeID is the ID of the TiKV store + storeID uint64 + // id is the stream ID, which is used to identify the stream. + id uint64 + // cancel is used to cancel the gRPC stream + cancel context.CancelFunc } // CDCKVClient is an interface to receives kv changed logs from TiKV @@ -232,7 +240,12 @@ func NewCDCClient( return c } -func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { +func (c *CDCClient) newStream( + ctx context.Context, + cancel context.CancelFunc, + addr string, + storeID uint64, +) (stream *eventFeedStream, newStreamErr error) { streamFunc := func() (err error) { var conn *sharedConn defer func() { @@ -255,8 +268,12 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) return cerror.WrapError(cerror.ErrTiKVEventFeed, err) } stream = &eventFeedStream{ - client: streamClient, - conn: conn, + client: streamClient, + conn: conn, + addr: addr, + storeID: storeID, + id: allocID(), + cancel: cancel, } log.Info("created stream to store", zap.String("namespace", c.changefeed.Namespace), @@ -371,9 +388,11 @@ type eventFeedSession struct { errChSizeGauge prometheus.Gauge rangeChSizeGauge prometheus.Gauge - streams map[string]*eventFeedStream - streamsLock sync.RWMutex - streamsCanceller map[string]context.CancelFunc + // storeStreamsCache is used to cache the established gRPC streams to TiKV stores. + storeStreamsCache map[string]*eventFeedStream + // storeStreamsCacheLock is used to protect storeStreamsCache, since the cache + // is shared by multiple goroutines. + storeStreamsCacheLock sync.RWMutex // use sync.Pool to store resolved ts event only, because resolved ts event // has the same size and generate cycle. @@ -414,8 +433,7 @@ func newEventFeedSession( client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), + storeStreamsCache: make(map[string]*eventFeedStream), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -605,7 +623,7 @@ func (s *eventFeedSession) requestRegionToStore( // Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map, // and it will be loaded by the receiver thread when it receives the first response from that region. We need this // to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV. 
- storePendingRegions := make(map[string]*syncRegionFeedStateMap) + storePendingRegions := make(map[uint64]*syncRegionFeedStateMap) header := &cdcpb.Header{ ClusterId: s.client.clusterID, @@ -653,10 +671,9 @@ func (s *eventFeedSession) requestRegionToStore( // regions map, the old map will be used in old `receiveFromStream` // and won't be deleted until that goroutine exits. pendingRegions := newSyncRegionFeedStateMap() - storePendingRegions[storeAddr] = pendingRegions + storePendingRegions[stream.id] = pendingRegions streamCtx, streamCancel := context.WithCancel(ctx) - _ = streamCancel // to avoid possible context leak warning from govet - stream, err = s.client.newStream(streamCtx, storeAddr, storeID) + stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID) if err != nil { // get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", @@ -681,7 +698,7 @@ func (s *eventFeedSession) requestRegionToStore( s.onRegionFail(ctx, errInfo) continue } - s.addStream(storeAddr, stream, streamCancel) + s.addStream(stream) log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -690,15 +707,15 @@ func (s *eventFeedSession) requestRegionToStore( zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), - zap.String("store", storeAddr)) + zap.String("store", storeAddr), + zap.Uint64("streamID", stream.id)) g.Go(func() error { - defer s.deleteStream(storeAddr) - return s.receiveFromStream(ctx, storeAddr, storeID, stream.client, pendingRegions) + return s.receiveFromStream(ctx, stream, pendingRegions) }) } - pendingRegions, ok := storePendingRegions[storeAddr] + pendingRegions, ok := storePendingRegions[stream.id] if !ok { // Should never happen log.Error("pending regions is not found for store", @@ -728,9 +745,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.Stringer("span", &sri.span)) err = stream.client.Send(req) - - // If Send error, the receiver should have received error too or will receive error soon. So we don't need - // to do extra work here. + // If Send returns an error, the stream.client.Recv (In s.receiveFromStream) + // would also receive an error. if err != nil { log.Warn("send request to stream failed", zap.String("namespace", s.changefeed.Namespace), @@ -752,14 +768,15 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("store", storeAddr), zap.Error(err)) } - // Delete the stream from the map so that the next time the store is accessed, the stream will be - // re-established. - s.deleteStream(storeAddr) + + // Delete the stream from the cache so that when next time the store is accessed, + // the stream can be re-established. + s.deleteStream(stream) // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all // pending regions, the new pending regions that are requested after reconnecting won't be stopped // incorrectly. - delete(storePendingRegions, storeAddr) + delete(storePendingRegions, stream.id) // Remove the region from pendingRegions. If it's already removed, it should be already retried by // `receiveFromStream`, so no need to retry here. 
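
Note: the hunk above rekeys the per-stream pending-region map by a unique stream ID instead of the store address, and deleteStream (further down) refuses to evict a cache entry whose ID does not match the stream being deleted. That ID check is the heart of the reconnect-loop fix: a late cleanup issued for an old stream can no longer remove a newer stream to the same store. A minimal, self-contained sketch of the pattern, with illustrative names rather than the exact tiflow types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stream stands in for an established gRPC stream to one store.
type stream struct {
	id   uint64 // unique per stream, assigned at creation time
	addr string // store address; several streams to one addr may exist over time
}

var nextID uint64

func newStream(addr string) *stream {
	return &stream{id: atomic.AddUint64(&nextID, 1), addr: addr}
}

// cache keeps at most one live stream per store address.
type cache struct {
	sync.RWMutex
	m map[string]*stream
}

func (c *cache) add(s *stream) {
	c.Lock()
	defer c.Unlock()
	c.m[s.addr] = s
}

// delete removes s only if it is still the stream cached for its address.
// A stale delete, issued for an older stream after a newer one was added,
// is ignored, which is what breaks the reconnect loop.
func (c *cache) delete(s *stream) {
	c.Lock()
	defer c.Unlock()
	if cur, ok := c.m[s.addr]; ok && cur.id == s.id {
		delete(c.m, s.addr)
	}
}

func main() {
	c := &cache{m: make(map[string]*stream)}
	old := newStream("tikv-1:20160")
	c.add(old)
	fresh := newStream("tikv-1:20160")
	c.add(fresh)                              // reconnect replaced the cached stream
	c.delete(old)                             // late cleanup of the old stream: no-op
	fmt.Println(c.m["tikv-1:20160"] == fresh) // true
}

Here the stale delete(old) is a no-op because the cache already holds fresh, which is exactly what the id-mismatch branch of deleteStream achieves in the real code.
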
@@ -1024,14 +1041,12 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R // routine exits to establish these regions. func (s *eventFeedSession) receiveFromStream( parentCtx context.Context, - addr string, - storeID uint64, - stream cdcpb.ChangeData_EventFeedClient, + stream *eventFeedStream, pendingRegions *syncRegionFeedStateMap, ) error { var tsStat *tableStoreStat s.client.tableStoreStats.Lock() - key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, storeID) + key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, stream.storeID) if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil { tsStat = new(tableStoreStat) s.client.tableStoreStats.v[key] = tsStat @@ -1047,8 +1062,9 @@ func (s *eventFeedSession) receiveFromStream( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("storeID", storeID), - zap.String("store", addr)) + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id)) failpoint.Inject("kvClientStreamCloseDelay", nil) @@ -1069,13 +1085,19 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver") metricProcessBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor") + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor") // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions) + worker := newRegionWorker( + parentCtx, + stream.cancel, + s.changefeed, + s, + stream.addr, + pendingRegions) defer worker.evictAllRegions() ctx, cancel := context.WithCancel(parentCtx) @@ -1098,8 +1120,9 @@ func (s *eventFeedSession) receiveFromStream( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Any("store", addr), - zap.Any("storeID", storeID), + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id), zap.Error(err)) } return err @@ -1113,7 +1136,7 @@ func (s *eventFeedSession) receiveFromStream( maxCommitTs := model.Ts(0) for { startToReceive := time.Now() - cevent, err := stream.Recv() + cevent, err := stream.client.Recv() if s.enableTableMonitor { receiveTime += time.Since(startToReceive) @@ -1151,17 +1174,18 @@ func (s *eventFeedSession) receiveFromStream( zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName), - zap.Uint64("storeID", storeID), - zap.String("store", addr)) + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id)) } else { log.Warn("failed to receive from stream", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("storeID", storeID), - zap.String("store", addr), 
+ zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id), zap.Error(err)) // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new @@ -1173,13 +1197,9 @@ func (s *eventFeedSession) receiveFromStream( // needs time to recover, kv client doesn't need to retry frequently. // TODO: add a better retry backoff or rate limitter time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - - // TODO: better to closes the send direction of the stream to notify - // the other side, but it is not safe to call CloseSend concurrently - // with SendMsg, in future refactor we should refine the recv loop - s.deleteStream(addr) - // send nil regionStatefulEvent to signal worker exit + // worker.sendEvents will return error if ctx is canceled + // In this case, we should return the error to the caller to cancel the whole job. err = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) if err != nil { return err @@ -1386,37 +1406,47 @@ func (s *eventFeedSession) sendResolvedTs( return nil } -func (s *eventFeedSession) addStream(storeAddr string, stream *eventFeedStream, cancel context.CancelFunc) { - s.streamsLock.Lock() - defer s.streamsLock.Unlock() - s.streams[storeAddr] = stream - s.streamsCanceller[storeAddr] = cancel +func (s *eventFeedSession) addStream(stream *eventFeedStream) { + s.storeStreamsCacheLock.Lock() + defer s.storeStreamsCacheLock.Unlock() + s.storeStreamsCache[stream.addr] = stream } -func (s *eventFeedSession) deleteStream(storeAddr string) { - s.streamsLock.Lock() - defer s.streamsLock.Unlock() - if stream, ok := s.streams[storeAddr]; ok { - s.client.grpcPool.ReleaseConn(stream.conn, storeAddr) - delete(s.streams, storeAddr) - } - if cancel, ok := s.streamsCanceller[storeAddr]; ok { - cancel() - delete(s.streamsCanceller, storeAddr) +// deleteStream deletes a stream from the session.streams. +// If the stream is not found, it will be ignored. 
+func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { + s.storeStreamsCacheLock.Lock() + defer s.storeStreamsCacheLock.Unlock() + + if streamInMap, ok := s.storeStreamsCache[streamToDelete.addr]; ok { + if streamInMap.id != streamToDelete.id { + log.Info("delete stream failed, stream id mismatch, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return + } + s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) + delete(s.storeStreamsCache, streamToDelete.addr) } + streamToDelete.cancel() + log.Info("A stream to store has been removed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", streamToDelete.addr), + zap.Uint64("storeID", streamToDelete.storeID), + zap.Uint64("streamID", streamToDelete.id)) } func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) { - s.streamsLock.RLock() - defer s.streamsLock.RUnlock() - stream, ok = s.streams[storeAddr] - return -} - -func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.CancelFunc, ok bool) { - s.streamsLock.RLock() - defer s.streamsLock.RUnlock() - cancel, ok = s.streamsCanceller[storeAddr] + s.storeStreamsCacheLock.RLock() + defer s.storeStreamsCacheLock.RUnlock() + stream, ok = s.storeStreamsCache[storeAddr] return } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 7b7a6571be4..18b5c9ca2ef 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -114,8 +114,8 @@ type regionWorker struct { metrics *regionWorkerMetrics - storeAddr string - + storeAddr string + streamCancel func() // how many pending input events inputPending int32 @@ -157,23 +157,27 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, sto } func newRegionWorker( - ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, + ctx context.Context, + streamCancel context.CancelFunc, + changefeedID model.ChangeFeedID, + s *eventFeedSession, + addr string, pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ - parentCtx: ctx, - session: s, - inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), - outputCh: s.eventCh, - errorCh: make(chan error, 1), - statesManager: newRegionStateManager(-1), - rtsManager: newRegionTsManager(), - rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), - storeAddr: addr, - concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), - inputPending: 0, - + parentCtx: ctx, + session: s, + inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), + outputCh: s.eventCh, + errorCh: make(chan error, 1), + statesManager: newRegionStateManager(-1), + rtsManager: newRegionTsManager(), + rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), + storeAddr: addr, + streamCancel: streamCancel, + concurrency: int(s.client.config.KVClient.WorkerConcurrent), + metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), + inputPending: 0, pendingRegions: pendingRegions, } } @@ -595,21 +599,15 @@ func (w *regionWorker) checkErrorReconnect(err error) error { } func (w 
*regionWorker) cancelStream(delay time.Duration) { - cancel, ok := w.session.getStreamCancel(w.storeAddr) - if ok { - // cancel the stream to trigger strem.Recv with context cancel error - // Note use context cancel is the only way to terminate a gRPC stream - cancel() - // Failover in stream.Recv has 0-100ms delay, the onRegionFail - // should be called after stream has been deleted. Add a delay here - // to avoid too frequent region rebuilt. - time.Sleep(delay) - } else { - log.Warn("gRPC stream cancel func not found", - zap.String("addr", w.storeAddr), - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) - } + // cancel the stream to make strem.Recv returns a context cancel error + // This will make the receiveFromStream goroutine exit and the stream can + // be re-established by the caller. + // Note: use context cancel is the only way to terminate a gRPC stream. + w.streamCancel() + // Failover in stream.Recv has 0-100ms delay, the onRegionFail + // should be called after stream has been deleted. Add a delay here + // to avoid too frequent region rebuilt. + time.Sleep(delay) } func (w *regionWorker) run(enableTableMonitor bool) error { diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index c8041246a96..20f2881c53a 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { &tikv.RPCContext{}), 0) state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + worker := newRegionWorker(ctx, cancel, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -311,7 +311,7 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { } func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) s := createFakeEventFeedSession() s.eventCh = make(chan model.RegionFeedEvent, 2) s1 := newRegionFeedState(newSingleRegionInfo( @@ -322,7 +322,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1.sri.lockedRange = ®ionlock.LockedRange{} s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + w := newRegionWorker(ctx, cancel, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, From 58087886f5ed352bffee1d0138111c5aa7bf4e8d Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 29 Jan 2024 12:07:30 +0800 Subject: [PATCH 02/14] add sleep to help debug --- cdc/kv/client.go | 5 +++-- cdc/kv/region_worker.go | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 5cec1ec6687..c6b3e560230 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -768,7 +768,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("store", storeAddr), zap.Error(err)) } - + // TODO(dongmen): remove this line after testing. + time.Sleep(time.Second * 5) // Delete the stream from the cache so that when next time the store is accessed, // the stream can be re-established. 
s.deleteStream(stream) @@ -1420,7 +1421,7 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { if streamInMap, ok := s.storeStreamsCache[streamToDelete.addr]; ok { if streamInMap.id != streamToDelete.id { - log.Info("delete stream failed, stream id mismatch, ignore it", + log.Warn("delete stream failed, stream id mismatch, ignore it", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 18b5c9ca2ef..0c3f79cfde0 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -114,7 +114,8 @@ type regionWorker struct { metrics *regionWorkerMetrics - storeAddr string + storeAddr string + // streamCancel is used to cancel the gRPC stream of this region worker's stream. streamCancel func() // how many pending input events inputPending int32 @@ -603,6 +604,9 @@ func (w *regionWorker) cancelStream(delay time.Duration) { // This will make the receiveFromStream goroutine exit and the stream can // be re-established by the caller. // Note: use context cancel is the only way to terminate a gRPC stream. + + // TODO(dongmen): remove this line after testing. + time.Sleep(time.Second * 5) w.streamCancel() // Failover in stream.Recv has 0-100ms delay, the onRegionFail // should be called after stream has been deleted. Add a delay here From dc035b2a38fae5826b60c8dfe99a253d23fbeabb Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 29 Jan 2024 12:26:45 +0800 Subject: [PATCH 03/14] fix panic --- cdc/kv/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index c6b3e560230..0d7876138ec 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -671,9 +671,9 @@ func (s *eventFeedSession) requestRegionToStore( // regions map, the old map will be used in old `receiveFromStream` // and won't be deleted until that goroutine exits. 
pendingRegions := newSyncRegionFeedStateMap() - storePendingRegions[stream.id] = pendingRegions streamCtx, streamCancel := context.WithCancel(ctx) stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID) + storePendingRegions[stream.id] = pendingRegions if err != nil { // get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", From f15445c246338741afc6c60951159cfa56139da3 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 29 Jan 2024 14:20:45 +0800 Subject: [PATCH 04/14] fix ut error --- cdc/kv/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0d7876138ec..cfa7d84ec4a 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -673,7 +673,6 @@ func (s *eventFeedSession) requestRegionToStore( pendingRegions := newSyncRegionFeedStateMap() streamCtx, streamCancel := context.WithCancel(ctx) stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID) - storePendingRegions[stream.id] = pendingRegions if err != nil { // get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", @@ -698,6 +697,8 @@ func (s *eventFeedSession) requestRegionToStore( s.onRegionFail(ctx, errInfo) continue } + + storePendingRegions[stream.id] = pendingRegions s.addStream(stream) log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), From 11b8f3f1557b51fa3a2f177cbedab1eb2aeb4877 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 29 Jan 2024 15:34:22 +0800 Subject: [PATCH 05/14] remove debug hard code --- cdc/kv/client.go | 2 -- cdc/kv/region_worker.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index cfa7d84ec4a..779d36de3b4 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -769,8 +769,6 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("store", storeAddr), zap.Error(err)) } - // TODO(dongmen): remove this line after testing. - time.Sleep(time.Second * 5) // Delete the stream from the cache so that when next time the store is accessed, // the stream can be re-established. s.deleteStream(stream) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0c3f79cfde0..d6cb2e2e90b 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -605,8 +605,6 @@ func (w *regionWorker) cancelStream(delay time.Duration) { // be re-established by the caller. // Note: use context cancel is the only way to terminate a gRPC stream. - // TODO(dongmen): remove this line after testing. - time.Sleep(time.Second * 5) w.streamCancel() // Failover in stream.Recv has 0-100ms delay, the onRegionFail // should be called after stream has been deleted. 
Add a delay here From 85a6d11571e5bd9375a4e12fa7c0eb860d51d7c3 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 10:03:32 +0800 Subject: [PATCH 06/14] resolve comments --- cdc/entry/schema_test_helper.go | 4 - cdc/kv/client.go | 57 +++++----- cdc/kv/client_test.go | 192 ++++++++++++++++---------------- cdc/kv/region_worker.go | 21 +++- 4 files changed, 146 insertions(+), 128 deletions(-) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index cc7857aa7cf..06bac8f4002 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -15,13 +15,11 @@ package entry import ( "context" - "encoding/hex" "encoding/json" "strings" "testing" "time" - "github.com/pingcap/log" ticonfig "github.com/pingcap/tidb/pkg/config" tiddl "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" @@ -39,7 +37,6 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" - "go.uber.org/zap" ) // SchemaTestHelper is a test helper for schema which creates an internal tidb instance to generate DDL jobs with meta information @@ -197,7 +194,6 @@ func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.Ro polymorphicEvent := model.NewPolymorphicEvent(rawKV) err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent) require.NoError(s.t, err) - log.Info("fizz dml event", zap.String("key", hex.EncodeToString(polymorphicEvent.RawKV.Key))) return polymorphicEvent.Row } diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 779d36de3b4..3084ffa1211 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -272,7 +272,7 @@ func (c *CDCClient) newStream( conn: conn, addr: addr, storeID: storeID, - id: allocID(), + id: allocateStreamID(), cancel: cancel, } log.Info("created stream to store", @@ -347,15 +347,21 @@ func (c *CDCClient) CommitTs() model.Ts { return ingressCommitTs } -var currentID uint64 = 0 +var currentRequestID uint64 = 0 -func allocID() uint64 { - return atomic.AddUint64(¤tID, 1) +func allocateRequestID() uint64 { + return atomic.AddUint64(¤tRequestID, 1) } // used in test only -func currentRequestID() uint64 { - return atomic.LoadUint64(¤tID) +func getCurrentRequestID() uint64 { + return atomic.LoadUint64(¤tRequestID) +} + +var currentStreamID uint64 = 0 + +func allocateStreamID() uint64 { + return atomic.AddUint64(¤tStreamID, 1) } type eventFeedSession struct { @@ -389,10 +395,10 @@ type eventFeedSession struct { rangeChSizeGauge prometheus.Gauge // storeStreamsCache is used to cache the established gRPC streams to TiKV stores. - storeStreamsCache map[string]*eventFeedStream - // storeStreamsCacheLock is used to protect storeStreamsCache, since the cache - // is shared by multiple goroutines. - storeStreamsCacheLock sync.RWMutex + storeStreamsCache struct { + sync.RWMutex + m map[string]*eventFeedStream + } // use sync.Pool to store resolved ts event only, because resolved ts event // has the same size and generate cycle. 
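
Note: the resolvedTsPool kept in this struct is a standard sync.Pool use. Resolved-ts events are produced and recycled at a steady rate and always have the same shape, so pooling them avoids allocator churn. A hedged, standalone sketch of the same pattern (the event fields are illustrative, not tiflow's):

package main

import (
	"fmt"
	"sync"
)

// resolvedEvent mimics a fixed-size, frequently recycled event.
type resolvedEvent struct {
	regionID   uint64
	resolvedTs uint64
}

var pool = sync.Pool{
	// New is only called when the pool has no spare object to hand out.
	New: func() any { return &resolvedEvent{} },
}

func emit(regionID, ts uint64) *resolvedEvent {
	ev := pool.Get().(*resolvedEvent)
	ev.regionID, ev.resolvedTs = regionID, ts
	return ev
}

func recycle(ev *resolvedEvent) {
	*ev = resolvedEvent{} // reset before returning to the pool
	pool.Put(ev)
}

func main() {
	ev := emit(3, 120)
	fmt.Println(ev.regionID, ev.resolvedTs)
	recycle(ev)
}
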
@@ -411,11 +417,11 @@ func newEventFeedSession( eventCh chan<- model.RegionFeedEvent, enableTableMonitor bool, ) *eventFeedSession { - id := allocID() + id := allocateRequestID() rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) - return &eventFeedSession{ + res := &eventFeedSession{ client: client, startTs: startTs, changefeed: client.changefeed, @@ -433,7 +439,6 @@ func newEventFeedSession( client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), - storeStreamsCache: make(map[string]*eventFeedStream), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -442,6 +447,8 @@ func newEventFeedSession( }, }, } + res.storeStreamsCache.m = make(map[string]*eventFeedStream) + return res } func (s *eventFeedSession) eventFeed(ctx context.Context) error { @@ -639,7 +646,7 @@ func (s *eventFeedSession) requestRegionToStore( return errors.Trace(ctx.Err()) case sri = <-s.regionRouter.Out(): } - requestID := allocID() + requestID := allocateRequestID() rpcCtx := sri.rpcCtx regionID := rpcCtx.Meta.GetId() @@ -733,7 +740,6 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) - s.client.logRegionDetails("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -784,7 +790,6 @@ func (s *eventFeedSession) requestRegionToStore( if !ok { continue } - s.client.logRegionDetails("region send to store failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1407,18 +1412,18 @@ func (s *eventFeedSession) sendResolvedTs( } func (s *eventFeedSession) addStream(stream *eventFeedStream) { - s.storeStreamsCacheLock.Lock() - defer s.storeStreamsCacheLock.Unlock() - s.storeStreamsCache[stream.addr] = stream + s.storeStreamsCache.Lock() + defer s.storeStreamsCache.Unlock() + s.storeStreamsCache.m[stream.addr] = stream } // deleteStream deletes a stream from the session.streams. // If the stream is not found, it will be ignored. 
func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { - s.storeStreamsCacheLock.Lock() - defer s.storeStreamsCacheLock.Unlock() + s.storeStreamsCache.Lock() + defer s.storeStreamsCache.Unlock() - if streamInMap, ok := s.storeStreamsCache[streamToDelete.addr]; ok { + if streamInMap, ok := s.storeStreamsCache.m[streamToDelete.addr]; ok { if streamInMap.id != streamToDelete.id { log.Warn("delete stream failed, stream id mismatch, ignore it", zap.String("namespace", s.changefeed.Namespace), @@ -1430,7 +1435,7 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { return } s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) - delete(s.storeStreamsCache, streamToDelete.addr) + delete(s.storeStreamsCache.m, streamToDelete.addr) } streamToDelete.cancel() log.Info("A stream to store has been removed", @@ -1444,9 +1449,9 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { } func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) { - s.storeStreamsCacheLock.RLock() - defer s.storeStreamsCacheLock.RUnlock() - stream, ok = s.storeStreamsCache[storeAddr] + s.storeStreamsCache.RLock() + defer s.storeStreamsCache.RUnlock() + stream, ok = s.storeStreamsCache.m[storeAddr] return } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f2dde8e7f1f..5651104e16f 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -282,10 +282,10 @@ func newMockServiceSpecificAddr( // waitRequestID waits request ID larger than the given allocated ID func waitRequestID(t *testing.T, allocatedID uint64) { err := retry.Do(context.Background(), func() error { - if currentRequestID() > allocatedID { + if getCurrentRequestID() > allocatedID { return nil } - return errors.Errorf("request id %d is not larger than %d", currentRequestID(), allocatedID) + return errors.Errorf("request id %d is not larger than %d", getCurrentRequestID(), allocatedID) }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) require.Nil(t, err) @@ -317,7 +317,7 @@ func TestConnectOfflineTiKV(t *testing.T) { // {1,2} is the storeID, {4,5} is the peerID, means peer4 is in the store1 cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -347,7 +347,7 @@ func TestConnectOfflineTiKV(t *testing.T) { Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ ResolvedTs: ts, }, @@ -360,7 +360,7 @@ func TestConnectOfflineTiKV(t *testing.T) { require.Equal(t, ts, event.Resolved.ResolvedTs) } - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized cluster.ChangeLeader(3, 5) @@ -385,13 +385,11 @@ func TestConnectOfflineTiKV(t *testing.T) { require.FailNow(t, "reconnection not succeed in 1 second") } checkEvent(event, ver.Ver) - // check gRPC connection active counter is updated correctly bucket, ok := grpcPool.bucketConns[invalidStore] require.True(t, ok) empty := bucket.recycle() require.True(t, empty) - cancel() } @@ -419,7 +417,7 @@ func TestRecvLargeMessageSize(t *testing.T) { cluster.AddStore(2, addr) cluster.Bootstrap(3, []uint64{2}, []uint64{4}, 4) - 
baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -442,7 +440,7 @@ func TestRecvLargeMessageSize(t *testing.T) { // new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized var event model.RegionFeedEvent @@ -457,7 +455,7 @@ func TestRecvLargeMessageSize(t *testing.T) { largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -519,7 +517,7 @@ func TestHandleError(t *testing.T) { cluster.SplitRaw(region3, region4, []byte("b"), []uint64{6, 7}, 6) cluster.SplitRaw(region4, region5, []byte("c"), []uint64{8, 9}, 9) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -546,7 +544,7 @@ func TestHandleError(t *testing.T) { notLeader := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ NotLeader: &errorpb.NotLeader{ @@ -568,7 +566,7 @@ func TestHandleError(t *testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -582,7 +580,7 @@ func TestHandleError(t *testing.T) { regionNotFound := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ RegionNotFound: &errorpb.RegionNotFound{}, @@ -596,7 +594,7 @@ func TestHandleError(t *testing.T) { unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{}, }, @@ -621,7 +619,7 @@ consumePreResolvedTs: // new session, no leader request, epoch not match request, // region not found request, unknown error request, normal request waitRequestID(t, baseAllocatedID+5) - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized makeEvent := func(ts uint64) *cdcpb.ChangeDataEvent { @@ -629,7 +627,7 @@ consumePreResolvedTs: Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ ResolvedTs: ts, }, @@ -678,7 +676,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -704,7 +702,7 @@ func 
TestCompatibilityWithSameConn(t *testing.T) { incompatibility := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ Compatibility: &cdcpb.Compatibility{ @@ -745,7 +743,7 @@ func TestClusterIDMismatch(t *testing.T) { cluster.AddStore(1, addr) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -772,7 +770,7 @@ func TestClusterIDMismatch(t *testing.T) { clusterIDMismatchEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ ClusterIdMismatch: &cdcpb.ClusterIDMismatch{ @@ -814,7 +812,7 @@ func testHandleFeedEvent(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -842,7 +840,7 @@ func testHandleFeedEvent(t *testing.T) { // simulate commit comes before prewrite { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -857,7 +855,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -874,7 +872,7 @@ func testHandleFeedEvent(t *testing.T) { // prewrite and commit in the normal sequence { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -889,7 +887,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -906,7 +904,7 @@ func testHandleFeedEvent(t *testing.T) { // commit event before initializtion without prewrite matched will be ignored { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -921,7 +919,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -937,7 +935,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -952,11 +950,11 @@ func testHandleFeedEvent(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(3 /*regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /*regionID */, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: 
currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -970,7 +968,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -985,7 +983,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -999,7 +997,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1014,7 +1012,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1030,7 +1028,7 @@ func testHandleFeedEvent(t *testing.T) { // simulate TiKV sends txn heartbeat, which is a prewrite event with empty value { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1044,7 +1042,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1061,7 +1059,7 @@ func testHandleFeedEvent(t *testing.T) { eventResolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135}, }, }} @@ -1387,7 +1385,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1409,7 +1407,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized1 := mockInitializedEvent(regionID, currentRequestID()) + initialized1 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { @@ -1431,13 +1429,13 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { // wait request id allocated with: new session, new request*2 waitRequestID(t, baseAllocatedID+2) - initialized2 := mockInitializedEvent(regionID, currentRequestID()) + initialized2 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized2 resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, }} @@ -1520,7 +1518,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := 
currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1544,15 +1542,15 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) err = retry.Do(context.Background(), func() error { - if atomic.LoadUint64(&requestID) == currentRequestID() { + if atomic.LoadUint64(&requestID) == getCurrentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", - atomic.LoadUint64(&requestID), currentRequestID()) + atomic.LoadUint64(&requestID), getCurrentRequestID()) }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) require.Nil(t, err) - initialized1 := mockInitializedEvent(regionID, currentRequestID()) + initialized1 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { @@ -1566,7 +1564,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, }} @@ -1598,15 +1596,15 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { // wait request id allocated with: new session, new request*2 waitRequestID(t, baseAllocatedID+2) err = retry.Do(context.Background(), func() error { - if atomic.LoadUint64(&requestID) == currentRequestID() { + if atomic.LoadUint64(&requestID) == getCurrentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", - atomic.LoadUint64(&requestID), currentRequestID()) + atomic.LoadUint64(&requestID), getCurrentRequestID()) }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) require.Nil(t, err) - initialized2 := mockInitializedEvent(regionID, currentRequestID()) + initialized2 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized2 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { @@ -1620,7 +1618,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { resolved = &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130}, }, }} @@ -1806,7 +1804,7 @@ func TestNoPendingRegionError(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1832,13 +1830,13 @@ func TestNoPendingRegionError(t *testing.T) { noPendingRegionEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID() + 1, // an invalid request id + RequestId: getCurrentRequestID() + 1, // an invalid request id Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 100}, }, }} ch1 <- noPendingRegionEvent - initialized := mockInitializedEvent(3, currentRequestID()) + initialized := mockInitializedEvent(3, getCurrentRequestID()) ch1 <- initialized ev := <-eventCh require.NotNil(t, ev.Resolved) @@ -1847,7 +1845,7 @@ func 
TestNoPendingRegionError(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 200}, }, }} @@ -1885,7 +1883,7 @@ func TestDropStaleRequest(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1908,22 +1906,22 @@ func TestDropStaleRequest(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, // This event will be dropped { RegionId: regionID, - RequestId: currentRequestID() - 1, + RequestId: getCurrentRequestID() - 1, Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 125}, }, { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130}, }, }} @@ -1999,7 +1997,7 @@ func TestResolveLock(t *testing.T) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientResolveLockInterval") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2021,7 +2019,7 @@ func TestResolveLock(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized physical, logical, err := pdClient.GetTS(ctx) require.Nil(t, err) @@ -2029,7 +2027,7 @@ func TestResolveLock(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: tso}, }, }} @@ -2104,7 +2102,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientErrUnreachable") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2129,7 +2127,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { waitRequestID(t, baseAllocatedID+1) for _, event := range events { for _, ev := range event.Events { - ev.RequestId = currentRequestID() + ev.RequestId = getCurrentRequestID() } ch1 <- event } @@ -2143,7 +2141,7 @@ func TestCommittedFallback(t *testing.T) { {Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ 
Entries: []*cdcpb.Event_Row{{ @@ -2168,7 +2166,7 @@ func TestDuplicateRequest(t *testing.T) { {Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ DuplicateRequest: &cdcpb.DuplicateRequest{RegionId: 3}, @@ -2232,7 +2230,7 @@ func testEventAfterFeedStop(t *testing.T) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientSingleFeedProcessDelay") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2258,7 +2256,7 @@ func testEventAfterFeedStop(t *testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -2273,7 +2271,7 @@ func testEventAfterFeedStop(t *testing.T) { committed := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2288,7 +2286,7 @@ func testEventAfterFeedStop(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) resolved := &cdcpb.ChangeDataEvent{ ResolvedTs: &cdcpb.ResolvedTs{ Regions: []uint64{3}, @@ -2338,8 +2336,8 @@ func testEventAfterFeedStop(t *testing.T) { require.Nil(t, err) // wait request id allocated with: new session, 2 * new request - committedClone.Events[0].RequestId = currentRequestID() - initializedClone.Events[0].RequestId = currentRequestID() + committedClone.Events[0].RequestId = getCurrentRequestID() + initializedClone.Events[0].RequestId = getCurrentRequestID() ch2 <- committedClone ch2 <- initializedClone ch2 <- resolvedClone @@ -2419,7 +2417,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2446,7 +2444,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2462,7 +2460,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2477,12 +2475,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(3 /*regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /*regionID */, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2498,7 +2496,7 
@@ func TestOutOfRegionRangeEvent(t *testing.T) { // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2513,7 +2511,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2528,7 +2526,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2637,7 +2635,7 @@ func TestResolveLockNoCandidate(t *testing.T) { cluster.AddStore(storeID, addr1) cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2659,7 +2657,7 @@ func TestResolveLockNoCandidate(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized var wg2 sync.WaitGroup @@ -2673,7 +2671,7 @@ func TestResolveLockNoCandidate(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: tso}, }, }} @@ -2733,7 +2731,7 @@ func TestFailRegionReentrant(t *testing.T) { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantError") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantErrorDelay") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2758,7 +2756,7 @@ func TestFailRegionReentrant(t *testing.T) { unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{}, }, @@ -2766,7 +2764,7 @@ func TestFailRegionReentrant(t *testing.T) { }} ch1 <- unknownErr // use a fake event to trigger one more stream.Recv - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized // since re-establish new region request is delayed by `kvClientRegionReentrantErrorDelay` // there will be reentrant region failover, the kv client should not panic. 
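
Note: most of the test churn in this patch is mechanical. Patch 06 splits the single allocID counter into separate request-ID and stream-ID counters, so the tests now poll getCurrentRequestID() instead of currentRequestID(). The mechanism is just two independent atomic counters, as sketched below (mirroring the allocators shown earlier in the diff; the main function is only for illustration):

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	currentRequestID uint64 // one per region request sent to TiKV
	currentStreamID  uint64 // one per gRPC stream established to a store
)

func allocateRequestID() uint64 { return atomic.AddUint64(&currentRequestID, 1) }
func allocateStreamID() uint64  { return atomic.AddUint64(&currentStreamID, 1) }

// getCurrentRequestID reports the last allocated request ID; tests poll it to
// wait until the client has issued a request before injecting mock events.
func getCurrentRequestID() uint64 { return atomic.LoadUint64(&currentRequestID) }

func main() {
	base := getCurrentRequestID()
	_ = allocateStreamID() // stream IDs advance independently of request IDs
	id := allocateRequestID()
	fmt.Println(id > base, getCurrentRequestID() == id) // true true
}
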
@@ -2904,14 +2902,14 @@ func testClientErrNoPendingRegion(t *testing.T) { require.Equal(t, context.Canceled, errors.Cause(err)) }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() // wait the second region is scheduled time.Sleep(time.Millisecond * 500) waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID3, currentRequestID()) + initialized := mockInitializedEvent(regionID3, getCurrentRequestID()) ch1 <- initialized waitRequestID(t, baseAllocatedID+2) - initialized = mockInitializedEvent(regionID4, currentRequestID()) + initialized = mockInitializedEvent(regionID4, getCurrentRequestID()) ch1 <- initialized // wait the kvClientPendingRegionDelay ends, and the second region is processed time.Sleep(time.Second * 2) @@ -2982,9 +2980,9 @@ func testKVClientForceReconnect(t *testing.T) { require.Equal(t, context.Canceled, errors.Cause(err)) }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID3, currentRequestID()) + initialized := mockInitializedEvent(regionID3, getCurrentRequestID()) ch1 <- initialized // Connection close for timeout @@ -3230,7 +3228,7 @@ func TestEvTimeUpdate(t *testing.T) { reconnectInterval = originalReconnectInterval }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -3258,7 +3256,7 @@ func TestEvTimeUpdate(t *testing.T) { events := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -3356,7 +3354,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -3382,7 +3380,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -3459,7 +3457,7 @@ func TestPrewriteNotMatchError(t *testing.T) { ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() wg.Add(1) go func() { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index d6cb2e2e90b..e1e41c86cf8 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -216,6 +216,13 @@ func (w *regionWorker) checkShouldExit() error { // If there is no region maintained by this region worker, exit it and // cancel the gRPC stream. 
if empty && w.pendingRegions.len() == 0 { + log.Info("A single region error happens before, "+ + "and there is no region maintained by this region worker, "+ + "exit it and cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("storeAddr", w.storeAddr)) + w.cancelStream(time.Duration(0)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } @@ -259,6 +266,15 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState // `ErrPrewriteNotMatch` would cause duplicated request to the same region, // so cancel the original gRPC stream before restarts a new stream. if cerror.ErrPrewriteNotMatch.Equal(err) { + log.Info("meet ErrPrewriteNotMatch error, cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("storeAddr", w.storeAddr), + zap.Uint64("regionID", regionID), + zap.Uint64("requestID", state.requestID), + zap.Stringer("span", &state.sri.span), + zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Error(err)) w.cancelStream(time.Second) } @@ -592,6 +608,10 @@ func (w *regionWorker) collectWorkpoolError(ctx context.Context) error { func (w *regionWorker) checkErrorReconnect(err error) error { if errors.Cause(err) == errReconnect { + log.Info("kv client reconnect triggered, cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("addr", w.storeAddr)) w.cancelStream(time.Second) // if stream is already deleted, just ignore errReconnect return nil @@ -604,7 +624,6 @@ func (w *regionWorker) cancelStream(delay time.Duration) { // This will make the receiveFromStream goroutine exit and the stream can // be re-established by the caller. // Note: use context cancel is the only way to terminate a gRPC stream. - w.streamCancel() // Failover in stream.Recv has 0-100ms delay, the onRegionFail // should be called after stream has been deleted. 
Add a delay here From 671a5536b238ea47ea6f7feca88a95e576402216 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 11:29:07 +0800 Subject: [PATCH 07/14] refine region worker --- cdc/kv/client.go | 11 ++++++---- cdc/kv/region_worker.go | 40 +++++++++++++++++++++--------------- cdc/kv/region_worker_test.go | 14 ++++++++++--- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 3084ffa1211..5a4033ddd0c 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -280,7 +280,9 @@ func (c *CDCClient) newStream( zap.String("changefeed", c.changefeed.ID), zap.Int64("tableID", c.tableID), zap.String("tableName", c.tableName), - zap.String("store", addr)) + zap.String("store", addr), + zap.Uint64("storeID", storeID), + zap.Uint64("streamID", stream.id)) return nil } if c.config.Debug.EnableKVConnectBackOff { @@ -762,6 +764,7 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("tableName", s.tableName), zap.Uint64("storeID", storeID), zap.String("store", storeAddr), + zap.Uint64("streamID", stream.id), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Error(err)) @@ -771,6 +774,7 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), + zap.Uint64("streamID", stream.id), zap.Uint64("storeID", storeID), zap.String("store", storeAddr), zap.Error(err)) @@ -1098,10 +1102,8 @@ func (s *eventFeedSession) receiveFromStream( // to call exactly once from outer code logic worker := newRegionWorker( parentCtx, - stream.cancel, - s.changefeed, + stream, s, - stream.addr, pendingRegions) defer worker.evictAllRegions() @@ -1179,6 +1181,7 @@ func (s *eventFeedSession) receiveFromStream( zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.String("store", stream.addr), zap.Uint64("storeID", stream.storeID), zap.Uint64("streamID", stream.id)) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index e1e41c86cf8..0f106ce9b46 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -98,6 +98,7 @@ for event processing to increase throughput. type regionWorker struct { parentCtx context.Context session *eventFeedSession + stream *eventFeedStream inputCh chan []*regionStatefulEvent outputCh chan<- model.RegionFeedEvent @@ -114,12 +115,8 @@ type regionWorker struct { metrics *regionWorkerMetrics - storeAddr string - // streamCancel is used to cancel the gRPC stream of this region worker's stream. 
- streamCancel func() // how many pending input events - inputPending int32 - + inputPending int32 pendingRegions *syncRegionFeedStateMap } @@ -159,10 +156,8 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, sto func newRegionWorker( ctx context.Context, - streamCancel context.CancelFunc, - changefeedID model.ChangeFeedID, + stream *eventFeedStream, s *eventFeedSession, - addr string, pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ @@ -174,10 +169,8 @@ func newRegionWorker( statesManager: newRegionStateManager(-1), rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), - storeAddr: addr, - streamCancel: streamCancel, concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), + metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), inputPending: 0, pendingRegions: pendingRegions, } @@ -221,8 +214,10 @@ func (w *regionWorker) checkShouldExit() error { "exit it and cancel the gRPC stream", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("storeAddr", w.storeAddr)) - + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) w.cancelStream(time.Duration(0)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } @@ -269,7 +264,10 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState log.Info("meet ErrPrewriteNotMatch error, cancel the gRPC stream", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("storeAddr", w.storeAddr), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), @@ -370,7 +368,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("addr", w.storeAddr), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), zap.Uint64("regionID", rts.regionID), zap.Stringer("span", &state.sri.span), zap.Duration("duration", sinceLastResolvedTs), @@ -611,7 +612,10 @@ func (w *regionWorker) checkErrorReconnect(err error) error { log.Info("kv client reconnect triggered, cancel the gRPC stream", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("addr", w.storeAddr)) + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) w.cancelStream(time.Second) // if stream is already deleted, just ignore errReconnect return nil @@ -619,12 +623,14 @@ func (w *regionWorker) checkErrorReconnect(err error) error { return err } +// Note(dongmen): Please log the reason of calling this 
function in the caller. +// This will be helpful for troubleshooting. func (w *regionWorker) cancelStream(delay time.Duration) { // cancel the stream to make strem.Recv returns a context cancel error // This will make the receiveFromStream goroutine exit and the stream can // be re-established by the caller. // Note: use context cancel is the only way to terminate a gRPC stream. - w.streamCancel() + w.stream.cancel() // Failover in stream.Recv has 0-100ms delay, the onRegionFail // should be called after stream has been deleted. Add a delay here // to avoid too frequent region rebuilt. diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 20f2881c53a..370b28f5380 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -159,7 +159,11 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { &tikv.RPCContext{}), 0) state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(ctx, cancel, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + stream := &eventFeedStream{ + storeID: 1, + id: 2, + } + worker := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -311,7 +315,7 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { } func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() s := createFakeEventFeedSession() s.eventCh = make(chan model.RegionFeedEvent, 2) s1 := newRegionFeedState(newSingleRegionInfo( @@ -322,7 +326,11 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1.sri.lockedRange = ®ionlock.LockedRange{} s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(ctx, cancel, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + stream := &eventFeedStream{ + storeID: 1, + id: 2, + } + w := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, From ddcc3215d41b7bad0a803a117e3339b985c48ec1 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 11:51:02 +0800 Subject: [PATCH 08/14] add a time sleep to prevent stream delete frequency --- cdc/kv/client.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 5a4033ddd0c..1454a9de6d9 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -77,6 +77,8 @@ const ( scanRegionsConcurrency = 1024 tableMonitorInterval = 2 * time.Second + + streamDeleteInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -400,6 +402,10 @@ type eventFeedSession struct { storeStreamsCache struct { sync.RWMutex m map[string]*eventFeedStream + // lastDeleteTime is used to record last time a stream is deleted from the cache. + // It is used to avoid creation/deleting streams too frequently, which may cause + // huge overhead of incremental region scanning in TiKV. 
+ lastDeleteTime map[string]time.Time } // use sync.Pool to store resolved ts event only, because resolved ts event @@ -450,6 +456,7 @@ func newEventFeedSession( }, } res.storeStreamsCache.m = make(map[string]*eventFeedStream) + res.storeStreamsCache.lastDeleteTime = make(map[string]time.Time) return res } @@ -1437,8 +1444,21 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { zap.Uint64("streamIDInMap", streamInMap.id)) return } + if lastTime, ok := s.storeStreamsCache.lastDeleteTime[streamToDelete.addr]; ok { + if time.Since(lastTime) < streamDeleteInterval { + log.Warn("delete stream failed, delete too frequently, wait 1 second", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("duration", time.Since(lastTime))) + time.Sleep(streamDeleteInterval - time.Since(lastTime)) + } + } s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) delete(s.storeStreamsCache.m, streamToDelete.addr) + s.storeStreamsCache.lastDeleteTime[streamToDelete.addr] = time.Now() } streamToDelete.cancel() log.Info("A stream to store has been removed", From 12025cee728269fb7cd2ae622db82de54b89cf96 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 11:58:05 +0800 Subject: [PATCH 09/14] fix panic --- cdc/kv/region_worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0f106ce9b46..5eee15899ce 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -165,6 +165,7 @@ func newRegionWorker( session: s, inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), outputCh: s.eventCh, + stream: stream, errorCh: make(chan error, 1), statesManager: newRegionStateManager(-1), rtsManager: newRegionTsManager(), From 5fb44f9ce8a8ef816e09a43eda47441a8549d18f Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 12:13:14 +0800 Subject: [PATCH 10/14] refine log --- cdc/kv/client.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 1454a9de6d9..2ce3df6f70b 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1444,15 +1444,17 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { zap.Uint64("streamIDInMap", streamInMap.id)) return } + if lastTime, ok := s.storeStreamsCache.lastDeleteTime[streamToDelete.addr]; ok { if time.Since(lastTime) < streamDeleteInterval { - log.Warn("delete stream failed, delete too frequently, wait 1 second", + log.Warn( + "delete a stream of a same store too frequently, wait a while and try again", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Uint64("streamID", streamToDelete.id), - zap.Duration("duration", time.Since(lastTime))) + zap.Duration("sinceLastTime", time.Since(lastTime))) time.Sleep(streamDeleteInterval - time.Since(lastTime)) } } @@ -1460,6 +1462,7 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { delete(s.storeStreamsCache.m, streamToDelete.addr) s.storeStreamsCache.lastDeleteTime[streamToDelete.addr] = time.Now() } + streamToDelete.cancel() log.Info("A stream to store has been removed", zap.String("namespace", s.changefeed.Namespace), From e9a953d5abed5d1a82a5f899ef6bd038572d86ae Mon Sep 17 00:00:00 
2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 12:25:33 +0800 Subject: [PATCH 11/14] prevent stream creation too frequent --- cdc/kv/client.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 2ce3df6f70b..665d8caab6f 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -78,7 +78,7 @@ const ( tableMonitorInterval = 2 * time.Second - streamDeleteInterval = 2 * time.Second + streamAlterInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -402,10 +402,10 @@ type eventFeedSession struct { storeStreamsCache struct { sync.RWMutex m map[string]*eventFeedStream - // lastDeleteTime is used to record last time a stream is deleted from the cache. + // lastAlterTime is used to record last time a stream is created/deleted to/from the cache. // It is used to avoid creation/deleting streams too frequently, which may cause // huge overhead of incremental region scanning in TiKV. - lastDeleteTime map[string]time.Time + lastAlterTime map[string]time.Time } // use sync.Pool to store resolved ts event only, because resolved ts event @@ -456,7 +456,7 @@ func newEventFeedSession( }, } res.storeStreamsCache.m = make(map[string]*eventFeedStream) - res.storeStreamsCache.lastDeleteTime = make(map[string]time.Time) + res.storeStreamsCache.lastAlterTime = make(map[string]time.Time) return res } @@ -1424,6 +1424,19 @@ func (s *eventFeedSession) sendResolvedTs( func (s *eventFeedSession) addStream(stream *eventFeedStream) { s.storeStreamsCache.Lock() defer s.storeStreamsCache.Unlock() + if lastTime, ok := s.storeStreamsCache.lastAlterTime[stream.addr]; ok { + if time.Since(lastTime) < streamAlterInterval { + log.Warn( + "add a stream of a same store too frequently, wait a while and try again", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("storeID", stream.storeID), + zap.Duration("sinceLastTime", time.Since(lastTime))) + time.Sleep(streamAlterInterval - time.Since(lastTime)) + } + } s.storeStreamsCache.m[stream.addr] = stream } @@ -1445,8 +1458,8 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { return } - if lastTime, ok := s.storeStreamsCache.lastDeleteTime[streamToDelete.addr]; ok { - if time.Since(lastTime) < streamDeleteInterval { + if lastTime, ok := s.storeStreamsCache.lastAlterTime[streamToDelete.addr]; ok { + if time.Since(lastTime) < streamAlterInterval { log.Warn( "delete a stream of a same store too frequently, wait a while and try again", zap.String("namespace", s.changefeed.Namespace), @@ -1455,12 +1468,12 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { zap.String("tableName", s.tableName), zap.Uint64("streamID", streamToDelete.id), zap.Duration("sinceLastTime", time.Since(lastTime))) - time.Sleep(streamDeleteInterval - time.Since(lastTime)) + time.Sleep(streamAlterInterval - time.Since(lastTime)) } } s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) delete(s.storeStreamsCache.m, streamToDelete.addr) - s.storeStreamsCache.lastDeleteTime[streamToDelete.addr] = time.Now() + s.storeStreamsCache.lastAlterTime[streamToDelete.addr] = time.Now() } streamToDelete.cancel() From 255e7dfe4f2089c41b3ddb461ff061e4c6020420 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 30 Jan 2024 12:31:17 +0800 Subject: [PATCH 12/14] fix 
error --- cdc/kv/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 665d8caab6f..02e8ee89e44 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1438,6 +1438,7 @@ func (s *eventFeedSession) addStream(stream *eventFeedStream) { } } s.storeStreamsCache.m[stream.addr] = stream + s.storeStreamsCache.lastAlterTime[stream.addr] = time.Now() } // deleteStream deletes a stream from the session.streams. From 19612fbbb3c2ec5d62af6a54a9735a7a1a07b451 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 31 Jan 2024 14:02:59 +0800 Subject: [PATCH 13/14] fix ut error --- cdc/kv/client.go | 57 ++++++++++++++++++++++++++----------------- cdc/kv/client_test.go | 7 +++--- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 02e8ee89e44..0d7849c5564 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -78,7 +78,7 @@ const ( tableMonitorInterval = 2 * time.Second - streamAlterInterval = 2 * time.Second + streamAlterInterval = 1 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -1447,37 +1447,48 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { s.storeStreamsCache.Lock() defer s.storeStreamsCache.Unlock() - if streamInMap, ok := s.storeStreamsCache.m[streamToDelete.addr]; ok { - if streamInMap.id != streamToDelete.id { - log.Warn("delete stream failed, stream id mismatch, ignore it", + streamInMap, ok := s.storeStreamsCache.m[streamToDelete.addr] + if !ok { + log.Warn("delete stream failed, stream not found, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return + } + + if streamInMap.id != streamToDelete.id { + log.Warn("delete stream failed, stream id mismatch, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return + } + + if lastTime, ok := s.storeStreamsCache.lastAlterTime[streamToDelete.addr]; ok { + if time.Since(lastTime) < streamAlterInterval { + log.Warn( + "delete a stream of a same store too frequently, wait a while and try again", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Uint64("streamID", streamToDelete.id), - zap.Uint64("streamIDInMap", streamInMap.id)) - return - } - - if lastTime, ok := s.storeStreamsCache.lastAlterTime[streamToDelete.addr]; ok { - if time.Since(lastTime) < streamAlterInterval { - log.Warn( - "delete a stream of a same store too frequently, wait a while and try again", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName), - zap.Uint64("streamID", streamToDelete.id), - zap.Duration("sinceLastTime", time.Since(lastTime))) - time.Sleep(streamAlterInterval - time.Since(lastTime)) - } + zap.Duration("sinceLastTime", time.Since(lastTime))) + time.Sleep(streamAlterInterval - time.Since(lastTime)) } - s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) - 
delete(s.storeStreamsCache.m, streamToDelete.addr) - s.storeStreamsCache.lastAlterTime[streamToDelete.addr] = time.Now() } + s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) + delete(s.storeStreamsCache.m, streamToDelete.addr) streamToDelete.cancel() + s.storeStreamsCache.lastAlterTime[streamToDelete.addr] = time.Now() + log.Info("A stream to store has been removed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 5651104e16f..201afe2ec42 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -286,7 +286,7 @@ func waitRequestID(t *testing.T, allocatedID uint64) { return nil } return errors.Errorf("request id %d is not larger than %d", getCurrentRequestID(), allocatedID) - }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) + }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(100)) require.Nil(t, err) } @@ -1502,6 +1502,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait() @@ -1547,7 +1548,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { } return errors.Errorf("request is not received, requestID: %d, expected: %d", atomic.LoadUint64(&requestID), getCurrentRequestID()) - }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(30)) require.Nil(t, err) initialized1 := mockInitializedEvent(regionID, getCurrentRequestID()) @@ -1601,7 +1602,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { } return errors.Errorf("request is not received, requestID: %d, expected: %d", atomic.LoadUint64(&requestID), getCurrentRequestID()) - }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(30)) require.Nil(t, err) initialized2 := mockInitializedEvent(regionID, getCurrentRequestID()) From 6aad55e28dc7c9bd074bbaaaa4bb3f404499724a Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 31 Jan 2024 14:48:15 +0800 Subject: [PATCH 14/14] fix ut error 2 --- cdc/kv/client.go | 6 ++++-- cdc/kv/client_test.go | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0d7849c5564..f15ac1f2510 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -77,13 +77,15 @@ const ( scanRegionsConcurrency = 1024 tableMonitorInterval = 2 * time.Second - - streamAlterInterval = 1 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect var reconnectInterval = 60 * time.Minute +// streamAlterInterval is the interval to limit the frequency of creating/deleting streams. +// Make it a variable so that we can change it in unit test. 
+var streamAlterInterval = 1 * time.Second + type regionStatefulEvent struct { changeEvent *cdcpb.Event resolvedTsEvent *resolvedTsEvent diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 201afe2ec42..5f048e2621f 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -57,6 +57,7 @@ func Test(t *testing.T) { conf := config.GetDefaultServerConfig() config.StoreGlobalServerConfig(conf) InitWorkerPool() + streamAlterInterval = 1 * time.Microsecond go func() { RunWorkerPool(context.Background()) //nolint:errcheck }() @@ -1364,6 +1365,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait() @@ -1414,7 +1416,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { return nil } return errors.New("message is not sent") - }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(30)) require.Nil(t, err) @@ -2076,6 +2078,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait()
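Note on the stream throttling pattern built up in patches 08 through 14: the kv client remembers, per store address, when a stream was last added to or removed from storeStreamsCache, and sleeps away the remainder of streamAlterInterval before performing the next alteration, so that streams to the same TiKV store cannot be torn down and re-created back to back (which the patch comments note would cause heavy incremental region scanning in TiKV). The sketch below is a minimal, self-contained illustration of that pattern only, not the TiCDC implementation; streamThrottle and waitBeforeAlter are hypothetical names, and only streamAlterInterval mirrors an identifier from the patches.

// Package main sketches the per-store stream alteration throttle described above.
// streamThrottle and waitBeforeAlter are illustrative names, not TiCDC code.
package main

import (
	"fmt"
	"sync"
	"time"
)

// streamAlterInterval limits how often a stream to the same store address
// may be created or deleted.
var streamAlterInterval = 1 * time.Second

// streamThrottle records, per store address, when a stream was last
// added to or deleted from the cache.
type streamThrottle struct {
	mu            sync.Mutex
	lastAlterTime map[string]time.Time
}

func newStreamThrottle() *streamThrottle {
	return &streamThrottle{lastAlterTime: make(map[string]time.Time)}
}

// waitBeforeAlter blocks until at least streamAlterInterval has passed since
// the previous alteration for addr, then records the current time as the
// latest alteration.
func (t *streamThrottle) waitBeforeAlter(addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if last, ok := t.lastAlterTime[addr]; ok {
		if sinceLast := time.Since(last); sinceLast < streamAlterInterval {
			// Sleep away the remainder of the interval so that creating and
			// deleting a stream for the same store cannot happen back to back.
			time.Sleep(streamAlterInterval - sinceLast)
		}
	}
	t.lastAlterTime[addr] = time.Now()
}

func main() {
	th := newStreamThrottle()
	start := time.Now()
	th.waitBeforeAlter("tikv-1:20160") // first alteration, no wait
	th.waitBeforeAlter("tikv-1:20160") // second alteration, waits out the interval
	fmt.Printf("two alterations took %v\n", time.Since(start).Round(100*time.Millisecond))
}

As in the patches, the wait happens while the single cache lock is held, so alterations for other stores are also delayed during the sleep; the sketch keeps that behavior because it makes the bookkeeping race free, and the intent is only to cap the stream churn seen by any one TiKV store.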