Skip to content

Commit

Permalink
cherry pick pingcap#1641 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
amyangfei authored and ti-srebot committed Apr 13, 2021
1 parent a366f30 commit a8f14a0
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 33 deletions.
22 changes: 8 additions & 14 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,16 +1105,14 @@ func (s *eventFeedSession) receiveFromStream(
for {
cevent, err := stream.Recv()

failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
})
// TODO: Should we have better way to handle the errors?
if err == io.EOF {
for _, state := range regionStates {
close(state.regionEventCh)
failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
errStr := msg.(string)
if errStr == io.EOF.Error() {
err = io.EOF
} else {
err = errors.New(errStr)
}
return nil
}
})
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
Expand Down Expand Up @@ -1402,12 +1400,8 @@ func (s *eventFeedSession) singleEventFeed(
continue
case event, ok = <-receiverCh:
}
if !ok {
log.Debug("singleEventFeed receiver closed")
return lastResolvedTs, nil
}

if event == nil {
if !ok || event == nil {
log.Debug("singleEventFeed closed by error")
return lastResolvedTs, cerror.ErrEventFeedAborted.GenWithStackByArgs()
}
Expand Down
125 changes: 117 additions & 8 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,11 +1193,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
cancel()
}

// TestStreamRecvWithError mainly tests the scenario that the `Recv` call of a gRPC
// stream in kv client meets an error, and kv client logs the error and re-establishes
// a new request
func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
defer testleak.AfterTest(c)()
func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -1226,7 +1222,7 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", failpointStr)
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
Expand Down Expand Up @@ -1316,6 +1312,42 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
cancel()
}

// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
// of a gRPC stream in kv client meets a **logical related** error, and kv client
// logs the error and re-establishes a new request.
func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
defer testleak.AfterTest(c)()

// test client v2 (assumes enableKVClientV2 defaults to true here — confirm)
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")

// test client v1: force the v1 code path, restoring the flag on exit
clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
}

// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
// of a gRPC stream in kv client meets error io.EOF, and kv client logs the error
// and re-establishes a new request.
func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
defer testleak.AfterTest(c)()

// test client v2 (assumes enableKVClientV2 defaults to true here — confirm)
// "EOF" matches io.EOF.Error(), so the failpoint injects io.EOF itself
s.testStreamRecvWithError(c, "1*return(\"EOF\")")

// test client v1: force the v1 code path, restoring the flag on exit
clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()
s.testStreamRecvWithError(c, "1*return(\"EOF\")")
}

// TestIncompatibleTiKV tests the scenario that a new TiCDC request to TiKV meets `ErrVersionIncompatible`
// error (in fact this error is raised before EventFeed API is really called),
// TiCDC will wait 20s and then retry. This is a common scenario when rolling
Expand Down Expand Up @@ -2351,8 +2383,8 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay", "sleep(500)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvErrorDelay")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay")
}()
baseAllocatedID := currentRequestID()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
Expand Down Expand Up @@ -2387,3 +2419,80 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
time.Sleep(time.Second)
cancel()
}
<<<<<<< HEAD
=======

// TestClientV1UnlockRangeReentrant tests clientV1 can handle region reconnection
// with unstable TiKV store correctly. The test workflow is as follows:
// 1. kv client establishes two regions request, naming region-1, region-2, they
// belong to the same TiKV store.
// 2. The region-1 is firstly established, yet region-2 has some delay after its
// region state is inserted into `pendingRegions`
// 3. At this time the TiKV store crashes and `stream.Recv` returns error. In the
// defer function of `receiveFromStream`, all pending regions will be cleaned
// up, which means the region lock will be unlocked once for these regions.
// 4. In step-2, the region-2 continues to run, it can't get store stream which
// has been deleted in step-3, so it will create new stream but fails because
// of unstable TiKV store, at this point, the kv client should handle with the
// pending region correctly.
func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

// force the v1 client code path, restoring the flag on exit
clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

// single mock TiKV service backing both regions (same store, see step-1)
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

// two regions on one store: region-3 covers [a, b), region-4 covers [b, c)
regionID3 := uint64(3)
regionID4 := uint64(4)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

// inject a one-shot Recv error to simulate the store crash of step-3
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
c.Assert(err, check.IsNil)
// first region proceeds immediately, second region is delayed 2s (step-2)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
// span [a, c) covers both regions; the feed only ends via ctx cancel
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait the second region is scheduled
time.Sleep(time.Millisecond * 500)
// crash the store: triggers the injected Recv error and pending-region cleanup
close(ch1)
server1.Stop()
// wait the kvClientPendingRegionDelay ends, and the second region is processed
time.Sleep(time.Second * 2)
cancel()
wg.Wait()
}
>>>>>>> 000eb0f... kv/client: fix the error handling for io.EOF may miss region reconnect (#1641)
13 changes: 7 additions & 6 deletions cdc/kv/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ func (s *eventFeedSession) receiveFromStreamV2(
worker.inputCh <- nil
}
})
failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
errStr := msg.(string)
if errStr == io.EOF.Error() {
err = io.EOF
} else {
err = errors.New(errStr)
}
})
if err == io.EOF {
close(worker.inputCh)
return nil
}
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
Expand Down
6 changes: 1 addition & 5 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case event, ok := <-w.inputCh:
if !ok {
log.Debug("region worker receiver closed")
return nil
}
// event == nil means the region worker should exit and re-establish
// all existing regions.
if event == nil {
if !ok || event == nil {
log.Info("region worker closed by error")
return w.evictAllRegions(ctx)
}
Expand Down

0 comments on commit a8f14a0

Please sign in to comment.