owner, processor(cdc): fix reporting ErrPeerMessageClientClosed to user erroneously (#3980)

* owner, processor(cdc): fix reporting ErrPeerMessageClientClosed to user erroneously

* p2p/client: fix unstable test

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
liuzix and ti-chi-bot authored Dec 21, 2021
1 parent 9ef831f commit 68db903
Showing 6 changed files with 109 additions and 24 deletions.
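
Editor's note on the core of the fix: when the underlying p2p message client for a capture is closed (typically because it is being torn down and re-established), TrySendMessage returns ErrPeerMessageClientClosed, and before this commit the owner scheduler and the processor agent could report that transient condition to the user. Both call sites now treat it like the existing ErrPeerMessageSendTryAgain case: log a warning and report "not done yet" so the next tick retries. The sketch below condenses that contract; it is an illustrative rewrite, not code from the commit, and the function name trySendTolerantly is invented (import paths mirror how the identifiers are used in the diff).

```go
package sketch

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	cerror "github.com/pingcap/tiflow/pkg/errors"
	"github.com/pingcap/tiflow/pkg/p2p"
)

// trySendTolerantly condenses the error-handling contract adopted in both
// cdc/owner/scheduler.go and cdc/processor/agent.go by this commit.
func trySendTolerantly(
	ctx context.Context, client *p2p.MessageClient, topic p2p.Topic, value interface{},
) (done bool, err error) {
	_, err = client.TrySendMessage(ctx, topic, value)
	switch {
	case err == nil:
		return true, nil
	case cerror.ErrPeerMessageSendTryAgain.Equal(err):
		// The client's send buffer is congested: not an error, just "not yet".
		return false, nil
	case cerror.ErrPeerMessageClientClosed.Equal(err):
		// The client is closed (e.g. being re-established after a network
		// hiccup). Warn and retry later instead of reporting it to the user.
		log.Warn("peer messaging client is closed while trying to send a message through it")
		return false, nil
	default:
		return false, errors.Trace(err)
	}
}
```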
62 changes: 41 additions & 21 deletions cdc/owner/scheduler.go
@@ -127,26 +127,20 @@ func (s *schedulerV2) DispatchTable(
     captureID model.CaptureID,
     isDelete bool,
 ) (done bool, err error) {
-    client, ok := s.GetClient(ctx, captureID)
-    if !ok {
-        return false, nil
-    }
-
     topic := model.DispatchTableTopic(changeFeedID)
     message := &model.DispatchTableMessage{
         OwnerRev: ctx.GlobalVars().OwnerRevision,
         ID: tableID,
         IsDelete: isDelete,
     }
 
-    _, err = client.TrySendMessage(ctx, topic, message)
+    ok, err := s.trySendMessage(ctx, captureID, topic, message)
     if err != nil {
-        if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
-            log.Warn("scheduler: send message failed, retry later", zap.Error(err))
-            return false, nil
-        }
         return false, errors.Trace(err)
     }
+    if !ok {
+        return false, nil
+    }
 
     s.stats.RecordDispatch()
     log.Debug("send message successfully",
@@ -161,25 +155,19 @@ func (s *schedulerV2) Announce(
     changeFeedID model.ChangeFeedID,
     captureID model.CaptureID,
 ) (bool, error) {
-    client, ok := s.GetClient(ctx, captureID)
-    if !ok {
-        return false, nil
-    }
-
     topic := model.AnnounceTopic(changeFeedID)
     message := &model.AnnounceMessage{
         OwnerRev: ctx.GlobalVars().OwnerRevision,
         OwnerVersion: version.ReleaseSemver(),
     }
 
-    _, err := client.TrySendMessage(ctx, topic, message)
+    ok, err := s.trySendMessage(ctx, captureID, topic, message)
     if err != nil {
-        if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
-            log.Warn("scheduler: send message failed, retry later", zap.Error(err))
-            return false, nil
-        }
         return false, errors.Trace(err)
     }
+    if !ok {
+        return false, nil
+    }
 
     s.stats.RecordAnnounce()
     log.Debug("send message successfully",
@@ -189,7 +177,7 @@ func (s *schedulerV2) Announce(
     return true, nil
 }
 
-func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p2p.MessageClient, bool) {
+func (s *schedulerV2) getClient(target model.CaptureID) (*p2p.MessageClient, bool) {
     client := s.messageRouter.GetClient(target)
     if client == nil {
         log.Warn("scheduler: no message client found, retry later",
@@ -199,6 +187,38 @@ func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p2p.MessageClient, bool) {
     return client, true
 }
 
+func (s *schedulerV2) trySendMessage(
+    ctx context.Context,
+    target model.CaptureID,
+    topic p2p.Topic,
+    value interface{},
+) (bool, error) {
+    // TODO (zixiong): abstract this function out together with the similar method in cdc/processor/agent.go
+    // We probably need more advanced logic to handle and mitigate complex failure situations.
+
+    client, ok := s.getClient(target)
+    if !ok {
+        return false, nil
+    }
+
+    _, err := client.TrySendMessage(ctx, topic, value)
+    if err != nil {
+        if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
+            return false, nil
+        }
+        if cerror.ErrPeerMessageClientClosed.Equal(err) {
+            log.Warn("peer messaging client is closed while trying to send a message through it. "+
+                "Report a bug if this warning repeats",
+                zap.String("changefeed-id", s.changeFeedID),
+                zap.String("target", target))
+            return false, nil
+        }
+        return false, errors.Trace(err)
+    }
+
+    return true, nil
+}
+
 func (s *schedulerV2) Close(ctx context.Context) {
     log.Debug("scheduler closed", zap.String("changefeed-id", s.changeFeedID))
     s.deregisterPeerMessageHandlers(ctx)
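
A caller-facing consequence of this refactor, noted here by the editor: DispatchTable and Announce now return (false, nil) for every transient messaging problem, including a closed client, and reserve a non-nil error for genuine failures. A minimal sketch of the resulting retry contract follows, assuming a hypothetical wrapper; the real scheduler is driven by the owner's tick loop, and the full Announce parameter list is abridged in the hunk above.

```go
package sketch

import (
	"time"

	"github.com/pingcap/errors"
)

// retryUntilDone is a hypothetical wrapper, not code from this commit. It
// shows how a caller treats done == false as "try again on the next tick",
// e.g. send = func() (bool, error) { return s.Announce(ctx, changeFeedID, captureID) }.
func retryUntilDone(send func() (done bool, err error)) error {
	for {
		done, err := send()
		if err != nil {
			// Only genuine failures reach here; congestion and a closed
			// peer-message client are reported as done == false instead.
			return errors.Trace(err)
		}
		if done {
			return nil
		}
		// Not sent yet: back off briefly, then retry.
		time.Sleep(100 * time.Millisecond)
	}
}
```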
5 changes: 5 additions & 0 deletions cdc/owner/scheduler_test.go
@@ -40,6 +40,11 @@ func TestSchedulerBasics(t *testing.T) {
         _ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectSendMessageTryAgain")
     }()
 
+    _ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
+    defer func() {
+        _ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
+    }()
+
     stdCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
     defer cancel()
 
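
A note on the failpoint term used above, for readers unfamiliar with github.com/pingcap/failpoint: "5*return(true)" arms the failpoint for its first five evaluations only, so the first few sends hit the injected ErrPeerMessageClientClosed and the scheduler has to keep treating them as "retry later" before sends go through normally again. Below is a toy illustration of the mechanism with an invented failpoint name (not code from the commit); like the real call sites, the injected body only takes effect when the code is built with failpoints enabled.

```go
package sketch

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// demoSend shows the shape of a failpoint site. The injected body runs only
// while the failpoint is armed; with the term "5*return(true)" it fires on
// the first five evaluations and is skipped afterwards.
func demoSend() error {
	failpoint.Inject("DemoInjectClosed", func() {
		failpoint.Return(fmt.Errorf("injected: client closed"))
	})
	return nil
}

// A test would arm it the same way the diff above arms ClientInjectClosed:
//
//	_ = failpoint.Enable("<package path>/DemoInjectClosed", "5*return(true)")
//	defer func() { _ = failpoint.Disable("<package path>/DemoInjectClosed") }()
```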
10 changes: 10 additions & 0 deletions cdc/processor/agent.go
@@ -288,6 +288,9 @@ func (a *agentImpl) trySendMessage(
     topic p2p.Topic,
     value interface{},
 ) (bool, error) {
+    // TODO (zixiong): abstract this function out together with the similar method in cdc/owner/scheduler.go
+    // We probably need more advanced logic to handle and mitigate complex failure situations.
+
     client := a.messageRouter.GetClient(target)
     if client == nil {
         a.printNoClientWarning(target)
@@ -299,6 +302,13 @@ func (a *agentImpl) trySendMessage(
         if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
             return false, nil
         }
+        if cerror.ErrPeerMessageClientClosed.Equal(err) {
+            log.Warn("peer messaging client is closed while trying to send a message through it. "+
+                "Report a bug if this warning repeats",
+                zap.String("changefeed-id", a.changeFeed),
+                zap.String("target", target))
+            return false, nil
+        }
         return false, errors.Trace(err)
     }
 
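
Both new call sites carry the same TODO about factoring this logic out. Purely as a hedged sketch of what that follow-up could look like, and not as part of this commit or of the p2p package's actual API: a shared helper might take the message router and target directly. The helper name, the clientGetter interface, and the exact GetClient signature are all assumptions; the error handling mirrors the two functions in the diff.

```go
package sketch

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	cerror "github.com/pingcap/tiflow/pkg/errors"
	"github.com/pingcap/tiflow/pkg/p2p"
	"go.uber.org/zap"
)

// clientGetter approximates the message router used in both call sites; the
// real MessageRouter.GetClient signature is not shown in this diff.
type clientGetter interface {
	GetClient(target string) *p2p.MessageClient
}

// trySendToCapture is a hypothetical shared helper for the TODOs above.
func trySendToCapture(
	ctx context.Context, router clientGetter, target string, topic p2p.Topic, value interface{},
) (bool, error) {
	client := router.GetClient(target)
	if client == nil {
		// No client yet for this capture; the caller retries on its next tick.
		return false, nil
	}
	if _, err := client.TrySendMessage(ctx, topic, value); err != nil {
		if cerror.ErrPeerMessageSendTryAgain.Equal(err) ||
			cerror.ErrPeerMessageClientClosed.Equal(err) {
			log.Warn("message not sent, will retry", zap.String("target", target), zap.Error(err))
			return false, nil
		}
		return false, errors.Trace(err)
	}
	return true, nil
}
```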
43 changes: 43 additions & 0 deletions cdc/processor/agent_test.go
@@ -18,6 +18,7 @@ import (
     "testing"
     "time"
 
+    "github.com/pingcap/failpoint"
     "github.com/pingcap/tiflow/cdc/model"
     pscheduler "github.com/pingcap/tiflow/cdc/scheduler"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -334,3 +335,45 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
     err = agent.Close()
     require.NoError(t, err)
 }
+
+func TestAgentTolerateClientClosed(t *testing.T) {
+    suite := newAgentTestSuite(t)
+    defer suite.Close()
+
+    suite.etcdKVClient.On("Get", mock.Anything, etcd.CaptureOwnerKey, mock.Anything).Return(&clientv3.GetResponse{
+        Kvs: []*mvccpb.KeyValue{
+            {
+                Key: []byte(etcd.CaptureOwnerKey),
+                Value: []byte(ownerCaptureID),
+                ModRevision: 1,
+            },
+        },
+    }, nil).Once()
+
+    // Test Point 1: Create an agent.
+    agent, err := suite.CreateAgent(t)
+    require.NoError(t, err)
+
+    _ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
+    defer func() {
+        _ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
+    }()
+
+    // Test Point 2: We should tolerate the error ErrPeerMessageClientClosed
+    for i := 0; i < 6; i++ {
+        err = agent.Tick(suite.cdcCtx)
+        require.NoError(t, err)
+    }
+
+    select {
+    case <-suite.ctx.Done():
+        require.Fail(t, "context should not be canceled")
+    case syncMsg := <-suite.syncCh:
+        require.Equal(t, &model.SyncMessage{
+            ProcessorVersion: version.ReleaseSemver(),
+            Running: nil,
+            Adding: nil,
+            Removing: nil,
+        }, syncMsg)
+    }
+}
5 changes: 5 additions & 0 deletions pkg/p2p/client.go
@@ -394,6 +394,11 @@ func (c *MessageClient) TrySendMessage(ctx context.Context, topic Topic, value i
         failpoint.Return(0, cerrors.ErrPeerMessageSendTryAgain.GenWithStackByArgs())
     })
 
+    // FIXME (zixiong): This is a temporary way for testing whether the caller can handler this error.
+    failpoint.Inject("ClientInjectClosed", func() {
+        failpoint.Return(0, cerrors.ErrPeerMessageClientClosed.GenWithStackByArgs())
+    })
+
     return c.sendMessage(ctx, topic, value, true)
 }
 
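
The FIXME above flags this failpoint as a testing hook rather than a long-term design: it sits at the top of MessageClient.TrySendMessage, ahead of the real send path, so tests can force the client-closed error without tearing down an actual gRPC stream, and it stays inert unless a test arms it (and the code is built with failpoints enabled). Below is a hedged fragment showing what an armed call looks like from a test's point of view; the client, ctx, and testMessage setup is assumed to follow the existing tests in this package, and this is not code from the commit.

```go
// Fragment only: assumes a *p2p.MessageClient wired up as in the existing
// tests (e.g. TestClientSendAnomalies) and the usual testify/require imports.
_ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "1*return(true)")
defer func() {
	_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
}()

_, err := client.TrySendMessage(ctx, "test-topic", &testMessage{Value: 1})
require.Error(t, err)
// The caller observes ErrPeerMessageClientClosed even though no stream was closed.
require.True(t, cerrors.ErrPeerMessageClientClosed.Equal(err))
```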
8 changes: 5 additions & 3 deletions pkg/p2p/client_test.go
@@ -300,7 +300,7 @@ func TestClientSendAnomalies(t *testing.T) {
     client.connector = connector
     connector.On("Connect", mock.Anything).Return(grpcClient, func() {}, nil)
 
-    grpcStream := newMockSendMessageClient(ctx)
+    grpcStream := newMockSendMessageClient(runCtx)
     grpcClient.On("SendMessage", mock.Anything, []grpc.CallOption(nil)).Return(
         grpcStream,
         nil,
@@ -315,7 +315,6 @@ func TestClientSendAnomalies(t *testing.T) {
             ClientVersion: "v5.4.0",
             SenderAdvertisedAddr: "fake-addr:8300",
         }, packet.Meta)
-        closeClient()
     })
 
     grpcStream.On("Recv").Return(nil, nil)
@@ -335,11 +334,14 @@ func TestClientSendAnomalies(t *testing.T) {
     require.Regexp(t, ".*ErrPeerMessageSendTryAgain.*", err.Error())
 
     // Test point 2: close the client while SendMessage is blocking.
+    go func() {
+        time.Sleep(100 * time.Millisecond)
+        closeClient()
+    }()
     _, err = client.SendMessage(ctx, "test-topic", &testMessage{Value: 1})
     require.Error(t, err)
     require.Regexp(t, ".*ErrPeerMessageClientClosed.*", err.Error())
 
-    closeClient()
     wg.Wait()
 
     // Test point 3: call SendMessage after the client is closed.
