Skip to content

Commit

Permalink
puller: fix retry logic when check store version failed (#11903) (#11932
Browse files Browse the repository at this point in the history
)

close #11766
  • Loading branch information
ti-chi-bot authored Dec 24, 2024
1 parent 201a10d commit 3b5f615
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 16 deletions.
12 changes: 12 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var (
metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown")
metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy")
metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested")
Expand All @@ -108,6 +109,10 @@ func (e *rpcCtxUnavailableErr) Error() string {
e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
}

type getStoreErr struct{}

func (e *getStoreErr) Error() string { return "get store error" }

type sendRequestToStoreErr struct{}

func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
Expand Down Expand Up @@ -739,6 +744,13 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
metricFeedRPCCtxUnavailable.Inc()
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
return nil
case *getStoreErr:
metricGetStoreErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
// cannot get the store the region belongs to, so we need to reload the region.
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
return nil
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
Expand Down
100 changes: 100 additions & 0 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
Expand Down Expand Up @@ -261,6 +262,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
}
}

func TestGetStoreFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

events1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataServer(events1)
server1, addr1 := newMockService(ctx, t, srv1, wg)

rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())

pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

regionCache := tikv.NewRegionCache(pdClient)

pdClock := pdutil.NewClock4Test()

kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
require.Nil(t, err)
lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{})

invalidStore1 := "localhost:1"
invalidStore2 := "localhost:2"
cluster.AddStore(1, addr1)
cluster.AddStore(2, invalidStore1)
cluster.AddStore(3, invalidStore2)
cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4)

client := NewSharedClient(
model.ChangeFeedID{ID: "test"},
&config.ServerConfig{
KVClient: &config.KVClientConfig{
WorkerConcurrent: 1,
GrpcStreamConcurrent: 1,
AdvanceIntervalInMs: 10,
},
Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}},
},
false, pdClient, grpcPool, regionCache, pdClock, lockResolver,
)

defer func() {
cancel()
client.Close()
_ = kvStorage.Close()
regionCache.Close()
pdClient.Close()
srv1.wg.Wait()
server1.Stop()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := client.Run(ctx)
require.Equal(t, context.Canceled, errors.Cause(err))
}()

failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`)
subID := client.AllocSubscriptionID()
span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}
eventCh := make(chan MultiplexingEvent, 50)
client.Subscribe(subID, span, 1, eventCh)

makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent {
return &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: requestID,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts},
},
},
}
}

checkTsEvent := func(event model.RegionFeedEvent, ts uint64) {
require.Equal(t, ts, event.Resolved.ResolvedTs)
}

events1 <- mockInitializedEvent(11, uint64(subID))
ts := oracle.GoTimeToTS(pdClock.CurrentTime())
events1 <- makeTsEvent(11, ts, uint64(subID))
select {
case <-eventCh:
require.True(t, false, "should not get event when get store failed")
case <-time.After(5 * time.Second):
}
failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed")
select {
case event := <-eventCh:
checkTsEvent(event.RegionFeedEvent, ts)
case <-time.After(5 * time.Second):
require.True(t, false, "reconnection not succeed in 5 second")
}
}

type mockChangeDataServer struct {
ch chan *cdcpb.ChangeDataEvent
wg sync.WaitGroup
Expand Down
43 changes: 27 additions & 16 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
"github.com/pingcap/tiflow/pkg/chann"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,12 +91,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
if err := waitForPreFetching(); err != nil {
return err
}
if canceled := stream.run(ctx, c, r); canceled {
return nil
var regionErr error
if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil {
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Uint64("storeID", r.storeID),
zap.String("addr", r.storeAddr),
zap.Error(err))
if errors.Cause(err) == context.Canceled {
return nil
} else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) {
regionErr = &getStoreErr{}
} else {
regionErr = &sendRequestToStoreErr{}
}
} else {
if canceled := stream.run(ctx, c, r); canceled {
return nil
}
regionErr = &sendRequestToStoreErr{}
}
for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
state.markStopped(regionErr)
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.region.verID.GetID(), len(c.workers))
_ = c.workers[slot].sendEvent(ctx, sfEvent)
Expand All @@ -108,7 +128,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
// It means it's a special task for stopping the table.
continue
}
c.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
c.onRegionFail(newRegionErrorInfo(region, regionErr))
}
if err := util.Hang(ctx, time.Second); err != nil {
return err
Expand All @@ -135,17 +155,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
}
}

if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil {
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return isCanceled()
}

log.Info("event feed going to create grpc stream",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
Expand Down Expand Up @@ -310,7 +319,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
Request: &cdcpb.ChangeDataRequest_Deregister_{
Deregister: &cdcpb.ChangeDataRequest_Deregister{},
},
}
if err = s.multiplexing.Client().Send(req); err != nil {
log.Warn("event feed send deregister request to grpc stream failed",
Expand Down
4 changes: 4 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/util/engine"
Expand Down Expand Up @@ -199,6 +200,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
// If storeID is 0, it checks all TiKV.
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
failpoint.Inject("GetStoreFailed", func() {
failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
})
var stores []*metapb.Store
var err error
if storeID == 0 {
Expand Down

0 comments on commit 3b5f615

Please sign in to comment.