diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index a58403d3067..6a4ace59ab4 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -1105,16 +1105,14 @@ func (s *eventFeedSession) receiveFromStream(
 	for {
 		cevent, err := stream.Recv()
 
-		failpoint.Inject("kvClientStreamRecvError", func() {
-			err = errors.New("injected stream recv error")
-		})
-		// TODO: Should we have better way to handle the errors?
-		if err == io.EOF {
-			for _, state := range regionStates {
-				close(state.regionEventCh)
+		failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
+			errStr := msg.(string)
+			if errStr == io.EOF.Error() {
+				err = io.EOF
+			} else {
+				err = errors.New(errStr)
 			}
-			return nil
-		}
+		})
 		if err != nil {
 			if status.Code(errors.Cause(err)) == codes.Canceled {
 				log.Debug(
@@ -1402,12 +1400,8 @@ func (s *eventFeedSession) singleEventFeed(
 			continue
 		case event, ok = <-receiverCh:
 		}
-		if !ok {
-			log.Debug("singleEventFeed receiver closed")
-			return lastResolvedTs, nil
-		}
 
-		if event == nil {
+		if !ok || event == nil {
 			log.Debug("singleEventFeed closed by error")
 			return lastResolvedTs, cerror.ErrEventFeedAborted.GenWithStackByArgs()
 		}
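As an aside on the hunk above: the reworked kvClientStreamRecvError failpoint now carries a string payload, and an injected "EOF" has to be mapped back to the io.EOF sentinel so that it flows through the same `if err != nil` handling, and therefore the same region-reconnect path, as a real stream EOF. The following standalone Go sketch, which is not part of the patch, shows just that mapping; the helper name mapInjectedRecvError is hypothetical, since the real code inlines this logic inside failpoint.Inject.

package main

import (
	"errors"
	"fmt"
	"io"
)

// mapInjectedRecvError turns an injected failpoint string payload into the error
// that stream.Recv is pretended to have returned (hypothetical helper).
func mapInjectedRecvError(msg string) error {
	if msg == io.EOF.Error() {
		// Keep the sentinel identity: errors.New("EOF") would not satisfy
		// err == io.EOF or errors.Is(err, io.EOF).
		return io.EOF
	}
	return errors.New(msg)
}

func main() {
	for _, payload := range []string{"EOF", "injected stream recv error"} {
		err := mapInjectedRecvError(payload)
		fmt.Printf("payload=%q -> errors.Is(err, io.EOF)=%v\n", payload, errors.Is(err, io.EOF))
	}
}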
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index e82d9581bbb..fd99c17e251 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -1193,11 +1193,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	cancel()
 }
 
-// TestStreamSendWithError mainly tests the scenario that the `Recv` call of a gPRC
-// stream in kv client meets error, and kv client logs the error and re-establish
-// new request
-func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
-	defer testleak.AfterTest(c)()
+func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
 	defer s.TearDownTest(c)
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := &sync.WaitGroup{}
@@ -1226,7 +1222,7 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
 	cluster.AddStore(1, addr1)
 	cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)
 
-	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", failpointStr)
 	c.Assert(err, check.IsNil)
 	defer func() {
 		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
@@ -1316,6 +1312,42 @@
 	cancel()
 }
 
+// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
+// of a gRPC stream in kv client meets a logic-related error, and the kv client
+// logs the error and re-establishes a new request.
+func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	// test client v2
+	s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
+
+	// test client v1
+	clientv2 := enableKVClientV2
+	enableKVClientV2 = false
+	defer func() {
+		enableKVClientV2 = clientv2
+	}()
+	s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
+}
+
+// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
+// of a gRPC stream in kv client meets an io.EOF error, and the kv client logs
+// the error and re-establishes a new request.
+func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	// test client v2
+	s.testStreamRecvWithError(c, "1*return(\"EOF\")")
+
+	// test client v1
+	clientv2 := enableKVClientV2
+	enableKVClientV2 = false
+	defer func() {
+		enableKVClientV2 = clientv2
+	}()
+	s.testStreamRecvWithError(c, "1*return(\"EOF\")")
+}
+
 // TestIncompatibleTiKV tests TiCDC new request to TiKV meets `ErrVersionIncompatible`
 // error (in fact this error is raised before EventFeed API is really called),
 // TiCDC will wait 20s and then retry. This is a common scenario when rolling
@@ -2351,8 +2383,8 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
 	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay", "sleep(500)")
 	c.Assert(err, check.IsNil)
 	defer func() {
-		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
-		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvErrorDelay")
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantError")
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay")
 	}()
 	baseAllocatedID := currentRequestID()
 	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
@@ -2387,3 +2419,80 @@
 	time.Sleep(time.Second)
 	cancel()
 }
+<<<<<<< HEAD
+=======
+
+// TestClientV1UnlockRangeReentrant tests that client v1 handles region reconnection
+// to an unstable TiKV store correctly. The test workflow is as follows:
+// 1. The kv client establishes requests for two regions, region-1 and region-2,
+//    which belong to the same TiKV store.
+// 2. Region-1 is established first, while region-2 is delayed for a while after
+//    its region state is inserted into `pendingRegions`.
+// 3. At this point the TiKV store crashes and `stream.Recv` returns an error. In
+//    the defer function of `receiveFromStream`, all pending regions are cleaned
+//    up, which means the region lock is unlocked once for each of these regions.
+// 4. Region-2 from step-2 then continues: it can't find the store stream, which
+//    was deleted in step-3, so it creates a new stream, which fails because the
+//    TiKV store is unstable. At this point the kv client should handle the
+//    pending region correctly.
+func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+
+	clientv2 := enableKVClientV2
+	enableKVClientV2 = false
+	defer func() {
+		enableKVClientV2 = clientv2
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
+	srv1 := newMockChangeDataService(c, ch1)
+	server1, addr1 := newMockService(ctx, c, srv1, wg)
+
+	rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
+	c.Assert(err, check.IsNil)
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
+	tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
+	c.Assert(err, check.IsNil)
+	kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
+	defer kvStorage.Close() //nolint:errcheck
+
+	regionID3 := uint64(3)
+	regionID4 := uint64(4)
+	cluster.AddStore(1, addr1)
+	cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
+	cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)
+
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
+	c.Assert(err, check.IsNil)
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay")
+	}()
+	lockresolver := txnutil.NewLockerResolver(kvStorage)
+	isPullInit := &mockPullerInit{}
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
+	eventCh := make(chan *model.RegionFeedEvent, 10)
+	wg.Add(1)
+	go func() {
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+		cdcClient.Close() //nolint:errcheck
+		wg.Done()
+	}()
+
+	// wait until the second region is scheduled
+	time.Sleep(time.Millisecond * 500)
+	close(ch1)
+	server1.Stop()
+	// wait until the kvClientPendingRegionDelay failpoint ends and the second region is processed
+	time.Sleep(time.Second * 2)
+	cancel()
+	wg.Wait()
+}
+>>>>>>> 000eb0f... kv/client: fix the error handling for io.EOF may miss region reconnect (#1641)
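For readers less familiar with the failpoint package, here is a minimal standalone sketch, not part of the patch, of the enable/run/disable pattern the refactored tests above rely on: the shared test body is run once per failpoint term, so both a generic injected error and an io.EOF are exercised. The wrapper name withRecvErrorFailpoint is hypothetical; the failpoint path and the term strings are the ones the tests actually pass to failpoint.Enable.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

const recvErrorFP = "github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError"

// withRecvErrorFailpoint enables the stream-recv failpoint with the given term,
// runs fn, and always disables the failpoint afterwards.
func withRecvErrorFailpoint(term string, fn func()) error {
	if err := failpoint.Enable(recvErrorFP, term); err != nil {
		return err
	}
	defer func() {
		_ = failpoint.Disable(recvErrorFP)
	}()
	fn()
	return nil
}

func main() {
	// "1*return(...)" makes the failpoint fire exactly once with the given payload.
	for _, term := range []string{
		`1*return("injected stream recv error")`, // generic error path
		`1*return("EOF")`,                        // io.EOF path
	} {
		_ = withRecvErrorFailpoint(term, func() {
			fmt.Println("run the shared test body with", term)
		})
	}
}

In the real tests the callback corresponds to s.testStreamRecvWithError, which is additionally run with enableKVClientV2 set to false so that client v1 is covered as well.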
diff --git a/cdc/kv/client_v2.go b/cdc/kv/client_v2.go
index 625ff1ce8db..0e46abc057f 100644
--- a/cdc/kv/client_v2.go
+++ b/cdc/kv/client_v2.go
@@ -210,13 +210,14 @@ func (s *eventFeedSession) receiveFromStreamV2(
 				worker.inputCh <- nil
 			}
 		})
-		failpoint.Inject("kvClientStreamRecvError", func() {
-			err = errors.New("injected stream recv error")
+		failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
+			errStr := msg.(string)
+			if errStr == io.EOF.Error() {
+				err = io.EOF
+			} else {
+				err = errors.New(errStr)
+			}
 		})
-		if err == io.EOF {
-			close(worker.inputCh)
-			return nil
-		}
 		if err != nil {
 			if status.Code(errors.Cause(err)) == codes.Canceled {
 				log.Debug(
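The hunk above stops special-casing io.EOF in receiveFromStreamV2 (previously it closed worker.inputCh and returned nil, so regions were never re-established), while the region_worker.go hunk below folds the closed-channel case into the same branch as a nil event. The short sketch that follows is not from the patch; it only demonstrates the Go channel semantics that make the consolidated `if !ok || event == nil` check safe: a receive from a closed channel yields the zero value, a nil pointer, with ok == false. The regionEvent type and drain function are stand-ins for the kv client's internals.

package main

import "fmt"

type regionEvent struct{ regionID uint64 }

// drain mimics the worker loop: a nil event or a closed input channel both mean
// "stop consuming, evict the regions and re-establish them".
func drain(name string, ch <-chan *regionEvent) {
	for {
		event, ok := <-ch
		if !ok || event == nil {
			fmt.Printf("%s: closed=%v nilEvent=%v -> evict and reconnect regions\n", name, !ok, event == nil)
			return
		}
		fmt.Printf("%s: handled event for region %d\n", name, event.regionID)
	}
}

func main() {
	// Case 1: the producer sends a nil event as an explicit abort signal.
	ch1 := make(chan *regionEvent, 2)
	ch1 <- &regionEvent{regionID: 1}
	ch1 <- nil
	drain("nil-event", ch1)

	// Case 2: the producer simply closes the channel.
	ch2 := make(chan *regionEvent, 1)
	ch2 <- &regionEvent{regionID: 2}
	close(ch2)
	drain("closed-channel", ch2)
}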
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 76176862dd6..6309ee376a2 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -267,13 +267,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case event, ok := <-w.inputCh:
-			if !ok {
-				log.Debug("region worker receiver closed")
-				return nil
-			}
 			// event == nil means the region worker should exit and re-establish
 			// all existing regions.
-			if event == nil {
+			if !ok || event == nil {
				log.Info("region worker closed by error")
 				return w.evictAllRegions(ctx)
 			}
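Finally, a small self-contained sketch, not part of the patch, of how the failpoint terms used in these tests behave at runtime: a `1*return(...)` term fires exactly once, and terms chained with `->` fire in sequence, which is how `1*sleep(0)->1*sleep(2000)` is meant to delay only the second pending region in TestClientV1UnlockRangeReentrant. Return actions are used below instead of sleep so the sequencing shows up in the output; failpoint.Eval is the runtime call that failpoint-ctl-rewritten failpoint.Inject markers invoke, and the failpoint path example.com/demo/recvError is made up.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	const fp = "example.com/demo/recvError" // hypothetical failpoint path
	// The first evaluation should yield "first", the second "second"; after both
	// counts are exhausted the failpoint no longer fires.
	if err := failpoint.Enable(fp, `1*return("first")->1*return("second")`); err != nil {
		panic(err)
	}
	defer func() {
		_ = failpoint.Disable(fp)
	}()

	for i := 0; i < 3; i++ {
		if val, err := failpoint.Eval(fp); err == nil {
			fmt.Printf("eval %d: fired with payload %v\n", i, val)
		} else {
			fmt.Printf("eval %d: not triggered (%v)\n", i, err)
		}
	}
}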