From 4bd0a00a2d3e8cd7a819b05a8d6c3b4c69d79a5d Mon Sep 17 00:00:00 2001
From: amyangfei
Date: Thu, 28 Jan 2021 15:44:31 +0800
Subject: [PATCH 1/2] cherry pick #1336 to release-4.0

Signed-off-by: ti-srebot
---
 cdc/kv/client.go      |  44 ++++++++++-
 cdc/kv/client_test.go | 167 ++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 201 insertions(+), 10 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index ebbefd7b6f4..c4df1d466e1 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -466,6 +466,9 @@ type eventFeedSession struct {
 	regionChSizeGauge prometheus.Gauge
 	errChSizeGauge    prometheus.Gauge
 	rangeChSizeGauge  prometheus.Gauge
+
+	streams     map[string]cdcpb.ChangeData_EventFeedClient
+	streamsLock sync.RWMutex
 }
 
 type rangeRequestTask struct {
@@ -502,6 +505,7 @@ func newEventFeedSession(
 		regionChSizeGauge: clientChannelSize.WithLabelValues(id, "region"),
 		errChSizeGauge:    clientChannelSize.WithLabelValues(id, "err"),
 		rangeChSizeGauge:  clientChannelSize.WithLabelValues(id, "range"),
+		streams:           make(map[string]cdcpb.ChangeData_EventFeedClient),
 	}
 }
 
@@ -633,7 +637,6 @@ func (s *eventFeedSession) dispatchRequest(
 	ctx context.Context,
 	g *errgroup.Group,
 ) error {
-	streams := make(map[string]cdcpb.ChangeData_EventFeedClient)
 	// Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map,
 	// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
 	// to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV.
@@ -716,7 +719,7 @@ MainLoop:
 		state := newRegionFeedState(sri, requestID)
 		pendingRegions.insert(requestID, state)
 
-		stream, ok := streams[rpcCtx.Addr]
+		stream, ok := s.getStream(rpcCtx.Addr)
 		// Establish the stream if it has not been connected yet.
 		if !ok {
 			storeID := rpcCtx.Peer.GetStoreId()
@@ -747,7 +750,7 @@ MainLoop:
 				pendingRegions.take(requestID)
 				continue
 			}
-			streams[rpcCtx.Addr] = stream
+			s.addStream(rpcCtx.Addr, stream)
 
 			limiter := s.client.getRegionLimiter(regionID)
 			g.Go(func() error {
@@ -779,7 +782,7 @@ MainLoop:
 			}
 			// Delete the stream from the map so that the next time the store is accessed, the stream will be
 			// re-established.
-			delete(streams, rpcCtx.Addr)
+			s.deleteStream(rpcCtx.Addr)
 			// Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is
 			// requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all
 			// pending regions, the new pending regions that are requested after reconnecting won't be stopped
@@ -1072,6 +1075,9 @@ func (s *eventFeedSession) receiveFromStream(
 
 	for {
 		cevent, err := stream.Recv()
+		failpoint.Inject("kvClientStreamRecvError", func() {
+			err = errors.New("injected stream recv error")
+		})
 		// TODO: Should we have better way to handle the errors?
 		if err == io.EOF {
 			for _, state := range regionStates {
@@ -1095,6 +1101,17 @@
 				)
 			}
 
+			// Use the same delay mechanism as `stream.Send` error handling, since
+			// these two errors often mean the upstream store suffers an accident, which
+			// needs time to recover, so the kv client doesn't need to retry frequently.
+			// TODO: add a better retry backoff or rate limiter
+			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+
+			// TODO: better to close the send direction of the stream to notify
+			// the other side, but it is not safe to call CloseSend concurrently
+			// with SendMsg; in a future refactor we should refine the recv loop
+			s.deleteStream(addr)
+
 			for _, state := range regionStates {
 				select {
 				case state.regionEventCh <- nil:
@@ -1470,6 +1487,25 @@ func (s *eventFeedSession) singleEventFeed(
 	}
 }
 
+func (s *eventFeedSession) addStream(storeAddr string, stream cdcpb.ChangeData_EventFeedClient) {
+	s.streamsLock.Lock()
+	defer s.streamsLock.Unlock()
+	s.streams[storeAddr] = stream
+}
+
+func (s *eventFeedSession) deleteStream(storeAddr string) {
+	s.streamsLock.Lock()
+	defer s.streamsLock.Unlock()
+	delete(s.streams, storeAddr)
+}
+
+func (s *eventFeedSession) getStream(storeAddr string) (stream cdcpb.ChangeData_EventFeedClient, ok bool) {
+	s.streamsLock.RLock()
+	defer s.streamsLock.RUnlock()
+	stream, ok = s.streams[storeAddr]
+	return
+}
+
 func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (*model.RegionFeedEvent, error) {
 	var opType model.OpType
 	switch entry.GetOpType() {
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index b265286bde6..18cfc273dc4 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -204,9 +204,11 @@ func mockInitializedEvent(regionID, requestID uint64) *cdcpb.ChangeDataEvent {
 }
 
 type mockChangeDataService struct {
-	c        *check.C
-	ch       chan *cdcpb.ChangeDataEvent
-	recvLoop func(server cdcpb.ChangeData_EventFeedServer)
+	c           *check.C
+	ch          chan *cdcpb.ChangeDataEvent
+	recvLoop    func(server cdcpb.ChangeData_EventFeedServer)
+	exitNotify  sync.Map
+	eventFeedID uint64
 }
 
 func newMockChangeDataService(c *check.C, ch chan *cdcpb.ChangeDataEvent) *mockChangeDataService {
@@ -217,16 +219,50 @@ func newMockChangeDataService(c *check.C, ch chan *cdcpb.ChangeDataEvent) *mockC
 	return s
 }
 
+type notifyCh struct {
+	notify   chan struct{}
+	callback chan struct{}
+}
+
+func (s *mockChangeDataService) registerExitNotify(id uint64, ch *notifyCh) {
+	s.exitNotify.Store(id, ch)
+}
+
+func (s *mockChangeDataService) notifyExit(id uint64) chan struct{} {
+	if ch, ok := s.exitNotify.Load(id); ok {
+		nch := ch.(*notifyCh)
+		nch.notify <- struct{}{}
+		return nch.callback
+	}
+	return nil
+}
+
 func (s *mockChangeDataService) EventFeed(server cdcpb.ChangeData_EventFeedServer) error {
 	if s.recvLoop != nil {
 		go func() {
 			s.recvLoop(server)
 		}()
 	}
 
-	for e := range s.ch {
-		err := server.Send(e)
-		s.c.Assert(err, check.IsNil)
+	notify := &notifyCh{
+		notify:   make(chan struct{}),
+		callback: make(chan struct{}, 1), // callback is not always retrieved
 	}
+	s.registerExitNotify(atomic.LoadUint64(&s.eventFeedID), notify)
+	atomic.AddUint64(&s.eventFeedID, 1)
+loop:
+	for {
+		select {
+		case e := <-s.ch:
+			if e == nil {
+				break loop
+			}
+			err := server.Send(e)
+			s.c.Assert(err, check.IsNil)
+		case <-notify.notify:
+			break loop
+		}
+	}
+	notify.callback <- struct{}{}
 	return nil
 }
 
@@ -1194,6 +1230,125 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	cancel()
 }
 
+// TestStreamRecvWithError mainly tests the scenario that the `Recv` call of a gRPC
+// stream in kv client meets an error, and the kv client logs the error and re-establishes
+// a new request
+func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
+	srv1 := newMockChangeDataService(c, ch1)
+	server1, addr1 := newMockService(ctx, c, srv1, wg)
+
+	defer func() {
+		close(ch1)
+		server1.Stop()
+		wg.Wait()
+	}()
+
+	rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
+	c.Assert(err, check.IsNil)
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
+	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
+	c.Assert(err, check.IsNil)
+	defer kvStorage.Close() //nolint:errcheck
+
+	regionID := uint64(3)
+	cluster.AddStore(1, addr1)
+	cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)
+
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
+	}()
+	baseAllocatedID := currentRequestID()
+	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
+	eventCh := make(chan *model.RegionFeedEvent, 10)
+	wg.Add(1)
+	go func() {
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+		cdcClient.Close() //nolint:errcheck
+		wg.Done()
+	}()
+
+	// wait for the request id to be allocated with: new session, new request
+	waitRequestID(c, baseAllocatedID+1)
+	initialized1 := mockInitializedEvent(regionID, currentRequestID())
+	ch1 <- initialized1
+	err = retry.Run(time.Millisecond*200, 10, func() error {
+		if len(ch1) == 0 {
+			return nil
+		}
+		return errors.New("message is not sent")
+	})
+	c.Assert(err, check.IsNil)
+
+	// another stream will be established, so we notify and wait for the first
+	// EventFeed loop to exit.
+	callback := srv1.notifyExit(0)
+	select {
+	case <-callback:
+	case <-time.After(time.Second * 3):
+		c.Error("event feed loop can't exit")
+	}
+
+	// wait for the request id to be allocated with: new session, new request*2
+	waitRequestID(c, baseAllocatedID+2)
+	initialized2 := mockInitializedEvent(regionID, currentRequestID())
+	ch1 <- initialized2
+
+	resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
+		{
+			RegionId:  regionID,
+			RequestId: currentRequestID(),
+			Event:     &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
+		},
+	}}
+	ch1 <- resolved
+	ch1 <- resolved
+
+	expected := []*model.RegionFeedEvent{
+		{
+			Resolved: &model.ResolvedSpan{
+				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
+				ResolvedTs: 100,
+			},
+			RegionID: regionID,
+		},
+		{
+			Resolved: &model.ResolvedSpan{
+				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
+				ResolvedTs: 120,
+			},
+			RegionID: regionID,
+		},
+		{
+			Resolved: &model.ResolvedSpan{
+				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
+				ResolvedTs: 120,
+			},
+			RegionID: regionID,
+		},
+	}
+
+	for _, expectedEv := range expected {
+		select {
+		case event := <-eventCh:
+			c.Assert(event, check.DeepEquals, expectedEv)
+		case <-time.After(time.Second):
+			c.Errorf("expected event %v not received", expectedEv)
+		}
+	}
+	cancel()
+}
+
 // TestIncompatibleTiKV tests TiCDC new request to TiKV meets `ErrVersionIncompatible`
 // error (in fact this error is raised before EventFeed API is really called),
 // TiCDC will wait 20s and then retry. This is a common scenario when rolling

From 0a4b6e63edb8e07512885c68920e94cc9345eb72 Mon Sep 17 00:00:00 2001
From: amyangfei
Date: Thu, 28 Jan 2021 15:59:46 +0800
Subject: [PATCH 2/2] fix 4.0 api

---
 cdc/kv/client_test.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 18cfc273dc4..9f8630a167d 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -1249,7 +1249,9 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
 		wg.Wait()
 	}()
 
-	rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
+	cluster := mocktikv.NewCluster()
+	mvccStore := mocktikv.MustNewMVCCStore()
+	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
 	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
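
Reviewer note: the sketch below is illustrative only and is not part of the patch. It is a minimal, self-contained Go program that demonstrates the locking pattern the patch introduces: the per-store stream map moves from a local variable in dispatchRequest into eventFeedSession, guarded by streamsLock, so that receiveFromStream can drop a broken stream after a Recv error while the dispatch loop concurrently looks streams up and re-creates them. The stream type and address here are stand-ins, not the real cdcpb.ChangeData_EventFeedClient.

package main

import (
	"fmt"
	"sync"
)

// stream is a stand-in for cdcpb.ChangeData_EventFeedClient.
type stream struct{ addr string }

// session mirrors the relevant part of eventFeedSession.
type session struct {
	streams     map[string]*stream
	streamsLock sync.RWMutex
}

func newSession() *session {
	return &session{streams: make(map[string]*stream)}
}

func (s *session) addStream(storeAddr string, st *stream) {
	s.streamsLock.Lock()
	defer s.streamsLock.Unlock()
	s.streams[storeAddr] = st
}

func (s *session) deleteStream(storeAddr string) {
	s.streamsLock.Lock()
	defer s.streamsLock.Unlock()
	delete(s.streams, storeAddr)
}

func (s *session) getStream(storeAddr string) (st *stream, ok bool) {
	s.streamsLock.RLock()
	defer s.streamsLock.RUnlock()
	st, ok = s.streams[storeAddr]
	return
}

func main() {
	s := newSession()
	const addr = "127.0.0.1:20160" // illustrative store address

	var wg sync.WaitGroup
	wg.Add(2)

	// Dispatch side: look the stream up and (re-)establish it when missing,
	// as dispatchRequest does via getStream/addStream.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			if _, ok := s.getStream(addr); !ok {
				s.addStream(addr, &stream{addr: addr})
			}
		}
	}()

	// Receive side: simulate recv errors by dropping the stream so the next
	// dispatch iteration re-establishes it, as receiveFromStream now does.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			s.deleteStream(addr)
		}
	}()

	wg.Wait()
	_, ok := s.getStream(addr)
	fmt.Println("stream registered after the race settled:", ok)
}

Run under `go run -race` the sketch stays data-race free because every map access goes through streamsLock; without the lock the same interleaving would be flagged, which is why the patch cannot keep the map as a local variable once deleteStream is called from the receive goroutine.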