diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 5440b7b0625..b8da2f88198 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -1109,16 +1109,14 @@ func (s *eventFeedSession) receiveFromStream(
 	for {
 		cevent, err := stream.Recv()
 
-		failpoint.Inject("kvClientStreamRecvError", func() {
-			err = errors.New("injected stream recv error")
-		})
-		// TODO: Should we have better way to handle the errors?
-		if err == io.EOF {
-			for _, state := range regionStates {
-				close(state.regionEventCh)
+		failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
+			errStr := msg.(string)
+			if errStr == io.EOF.Error() {
+				err = io.EOF
+			} else {
+				err = errors.New(errStr)
 			}
-			return nil
-		}
+		})
 		if err != nil {
 			if status.Code(errors.Cause(err)) == codes.Canceled {
 				log.Debug(
@@ -1406,12 +1404,8 @@ func (s *eventFeedSession) singleEventFeed(
 			continue
 		case event, ok = <-receiverCh:
 		}
-		if !ok {
-			log.Debug("singleEventFeed receiver closed")
-			return lastResolvedTs, nil
-		}
 
-		if event == nil {
+		if !ok || event == nil {
 			log.Debug("singleEventFeed closed by error")
 			return lastResolvedTs, cerror.ErrEventFeedAborted.GenWithStackByArgs()
 		}
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index c2dd0510f61..073844cb589 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -1193,11 +1193,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	cancel()
 }
 
-// TestStreamSendWithError mainly tests the scenario that the `Recv` call of a gPRC
-// stream in kv client meets error, and kv client logs the error and re-establish
-// new request
-func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
-	defer testleak.AfterTest(c)()
+func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
 	defer s.TearDownTest(c)
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := &sync.WaitGroup{}
@@ -1226,7 +1222,7 @@
 	cluster.AddStore(1, addr1)
 	cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)
-	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", failpointStr)
 	c.Assert(err, check.IsNil)
 	defer func() {
 		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
 	}()
@@ -1316,6 +1312,42 @@
 	cancel()
 }
 
+// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
+// of a gRPC stream in kv client meets a logic-related error, and the kv client
+// logs the error and re-establishes a new request.
+func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	// test client v2
+	s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
+
+	// test client v1
+	clientv2 := enableKVClientV2
+	enableKVClientV2 = false
+	defer func() {
+		enableKVClientV2 = clientv2
+	}()
+	s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
+}
+
+// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
+// of a gRPC stream in kv client meets error io.EOF, and the kv client logs the
+// error and re-establishes a new request.
+func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	// test client v2
+	s.testStreamRecvWithError(c, "1*return(\"EOF\")")
+
+	// test client v1
+	clientv2 := enableKVClientV2
+	enableKVClientV2 = false
+	defer func() {
+		enableKVClientV2 = clientv2
+	}()
+	s.testStreamRecvWithError(c, "1*return(\"EOF\")")
+}
+
 // TestIncompatibleTiKV tests TiCDC new request to TiKV meets `ErrVersionIncompatible`
 // error (in fact this error is raised before EventFeed API is really called),
 // TiCDC will wait 20s and then retry. This is a common scenario when rolling
@@ -2351,8 +2383,8 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
 	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay", "sleep(500)")
 	c.Assert(err, check.IsNil)
 	defer func() {
-		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
-		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvErrorDelay")
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantError")
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay")
 	}()
 	baseAllocatedID := currentRequestID()
 	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
@@ -2434,7 +2466,7 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
 	cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
 	cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)
 
-	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
 	c.Assert(err, check.IsNil)
 	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
 	c.Assert(err, check.IsNil)
diff --git a/cdc/kv/client_v2.go b/cdc/kv/client_v2.go
index 625ff1ce8db..0e46abc057f 100644
--- a/cdc/kv/client_v2.go
+++ b/cdc/kv/client_v2.go
@@ -210,13 +210,14 @@ func (s *eventFeedSession) receiveFromStreamV2(
 				worker.inputCh <- nil
 			}
 		})
-		failpoint.Inject("kvClientStreamRecvError", func() {
-			err = errors.New("injected stream recv error")
+		failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
+			errStr := msg.(string)
+			if errStr == io.EOF.Error() {
+				err = io.EOF
+			} else {
+				err = errors.New(errStr)
+			}
 		})
-		if err == io.EOF {
-			close(worker.inputCh)
-			return nil
-		}
 		if err != nil {
 			if status.Code(errors.Cause(err)) == codes.Canceled {
 				log.Debug(
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 76176862dd6..6309ee376a2 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -267,13 +267,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case event, ok := <-w.inputCh:
-			if !ok {
worker receiver closed") - return nil - } // event == nil means the region worker should exit and re-establish // all existing regions. - if event == nil { + if !ok || event == nil { log.Info("region worker closed by error") return w.evictAllRegions(ctx) }