
Commit

This is an automated cherry-pick of pingcap#8005
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
overvenus authored and ti-chi-bot committed Jan 3, 2023
1 parent 38f1d35 commit b949269
Showing 16 changed files with 279 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cdc/server.go
@@ -101,7 +101,7 @@ func NewServer(pdEndpoints []string) (*Server, error) {

s := &Server{
pdEndpoints: pdEndpoints,
grpcService: p2p.NewServerWrapper(),
grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()),
tcpServer: tcpServer,
}

@@ -273,7 +273,7 @@ func (s *Server) run(ctx context.Context) (err error) {

conf := config.GetGlobalServerConfig()
if conf.Debug.EnableNewScheduler {
grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...)
p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService)

wg.Go(func() error {
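The two hunks above thread the messages configuration into TiCDC's peer-to-peer gRPC server: the server wrapper is now built from `debugConfig.Messages.ToMessageServerConfig()`, and its `ServerOptions()` are spread into `grpc.NewServer`. The sketch below shows how such an option list is typically assembled; the actual contents of `ServerOptions()` are not part of this diff, so treating it as a `grpc.MaxRecvMsgSize` option is an assumption.

```go
package main

import (
	"google.golang.org/grpc"
)

// buildServerOptions is a hypothetical helper, not TiCDC code: it shows how a
// configured receive limit is normally turned into gRPC server options.
func buildServerOptions(maxRecvMsgSize int) []grpc.ServerOption {
	return []grpc.ServerOption{
		// Raise the per-message receive limit from gRPC's 4 MiB default
		// to the configured value (256 MiB by default in this change).
		grpc.MaxRecvMsgSize(maxRecvMsgSize),
	}
}

func main() {
	// Equivalent in spirit to grpc.NewServer(s.grpcService.ServerOptions()...).
	srv := grpc.NewServer(buildServerOptions(256 * 1024 * 1024)...)
	_ = srv
}
```

Without such an option, gRPC rejects any inbound message larger than its 4 MiB default, which is exactly the limit this change raises.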
99 changes: 99 additions & 0 deletions engine/pkg/p2p/message_sender.go
@@ -0,0 +1,99 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import (
"context"
"time"

"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/security"
)

// MessageSender is used to send a message of a given topic to a given node.
type MessageSender interface {
// TODO investigate whether we need to implement a barrier mechanism

// SendToNode sends a message to a given node. It returns whether the send succeeded and a possible error.
// A `would-block` condition is not reported as an error; (false, nil) is returned instead.
SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error)

// SendToNodeB sends a message to a given node in a blocking way
SendToNodeB(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) error
}

type messageSenderImpl struct {
router MessageRouter
}

// NewMessageSender returns a new message sender.
func NewMessageSender(router MessageRouter) MessageSender {
return &messageSenderImpl{router: router}
}

// SendToNodeB implements MessageSender.SendToNodeB.
// Note that the blocking send may have performance issues; be cautious when using this function.
func (m *messageSenderImpl) SendToNodeB(
ctx context.Context, targetNodeID NodeID, topic Topic, message interface{},
) error {
client := m.router.GetClient(targetNodeID)
if client == nil {
return errors.ErrExecutorNotFoundForMessage.GenWithStackByArgs()
}

// TODO: the blocking send in the p2p library may have performance issues.
_, err := client.SendMessage(ctx, topic, message)
return err
}

func (m *messageSenderImpl) SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error) {
client := m.router.GetClient(targetNodeID)
if client == nil {
return false, nil
}

_, err := client.TrySendMessage(ctx, topic, message)
if err != nil {
if errors.Is(err, errors.ErrPeerMessageSendTryAgain) {
return false, nil
}
return false, errors.Trace(err)
}
return true, nil
}

// MessageRouter is an alias for p2p.MessageRouter.
type MessageRouter = p2p.MessageRouter

var defaultClientConfig = &p2p.MessageClientConfig{
SendChannelSize: 128,
BatchSendInterval: 100 * time.Millisecond, // essentially disables flushing
MaxBatchBytes: 8 * 1024 * 1024, // 8MB
MaxBatchCount: 4096,
RetryRateLimitPerSecond: 1.0, // once per second
ClientVersion: "v5.4.0", // a fake version
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

// NewMessageRouter creates a new MessageRouter instance via the tiflow p2p API.
func NewMessageRouter(nodeID NodeID, advertisedAddr string) MessageRouter {
config := *defaultClientConfig // copy
config.AdvertisedAddr = advertisedAddr
return p2p.NewMessageRouter(
nodeID,
&security.Credential{ /* TLS not supported for now */ },
&config,
)
}
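A minimal usage sketch of the sender added above. The node IDs, advertised address, topic, and message type are invented for illustration, and it assumes the target peer has already been registered with the underlying router (registration is not shown in this file).

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/pingcap/tiflow/engine/pkg/p2p"
)

// exampleMsg is a hypothetical message type used only for this sketch.
type exampleMsg struct {
	Payload string `json:"payload"`
}

func main() {
	// Build a router for this node and a sender on top of it.
	router := p2p.NewMessageRouter("node-a", "127.0.0.1:10240")
	sender := p2p.NewMessageSender(router)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Non-blocking send: ok == false with a nil error means the send would
	// have blocked (e.g. the target is unknown or the send channel is full),
	// so the caller may retry later.
	ok, err := sender.SendToNode(ctx, "node-b", "example-topic", &exampleMsg{Payload: "hello"})
	if err != nil {
		log.Fatal(err)
	}
	if !ok {
		log.Println("send would block; retry later")
	}
}
```

`SendToNodeB` differs only in that it blocks until the message is accepted (or the context is cancelled) instead of reporting a would-block condition.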
97 changes: 97 additions & 0 deletions engine/pkg/p2p/server_integration_test.go
@@ -0,0 +1,97 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import (
"context"
"fmt"
"math"
"net"
"sync"
"testing"
"time"

"github.com/phayes/freeport"
p2pImpl "github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/security"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func makeListenerForServerTests(t *testing.T) (l net.Listener, addr string) {
port := freeport.GetPort()
addr = fmt.Sprintf("127.0.0.1:%d", port)
l, err := net.Listen("tcp", addr)
require.NoError(t, err)
return
}

// read only
var clientConfigForUnitTesting = &p2pImpl.MessageClientConfig{
SendChannelSize: 1,
BatchSendInterval: time.Second,
MaxBatchBytes: math.MaxInt64,
MaxBatchCount: math.MaxInt64,
RetryRateLimitPerSecond: 999.0,
ClientVersion: "v5.4.0", // a fake version
AdvertisedAddr: "fake-addr:8300",
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func TestMessageRPCServiceBasics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

l, addr := makeListenerForServerTests(t)
messageSrvc, err := NewMessageRPCService("test-node-1", &security.Credential{} /* no TLS */)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := messageSrvc.Serve(ctx, l)
require.Error(t, err)
require.Regexp(t, ".*canceled.*", err.Error())
}()

var called atomic.Bool
handlerManager := messageSrvc.MakeHandlerManager()
ok, err := handlerManager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(sender NodeID, value MessageValue) error {
require.Equal(t, "test-client-1", sender)
require.IsType(t, &msgContent{}, value)
require.False(t, called.Swap(true))
return nil
})
require.NoError(t, err)
require.True(t, ok)

client := p2pImpl.NewMessageClient("test-client-1", clientConfigForUnitTesting)
wg.Add(1)
go func() {
defer wg.Done()
err := client.Run(ctx, "tcp", addr, "test-node-1", &security.Credential{} /* no TLS */)
require.Error(t, err)
require.Regexp(t, ".*canceled.*", err.Error())
}()

_, err = client.SendMessage(ctx, "test-topic-1", &msgContent{})
require.NoError(t, err)
require.Eventually(t, func() bool {
return called.Load()
}, 5*time.Second, 10*time.Millisecond)

cancel()
wg.Wait()
}
12 changes: 12 additions & 0 deletions pkg/cmd/server/server_test.go
@@ -208,6 +208,7 @@ func TestParseCfg(t *testing.T) {
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
},
}, o.serverConfig)
@@ -273,6 +274,14 @@ client-retry-rate-limit = 100.0
server-max-pending-message-count = 1024
server-ack-interval = "1s"
server-worker-pool-size = 16
max-recv-msg-size = 4
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)
@@ -354,6 +363,7 @@ server-worker-pool-size = 16
ServerMaxPendingMessageCount: 1024,
ServerAckInterval: config.TomlDuration(1 * time.Second),
ServerWorkerPoolSize: 16,
MaxRecvMsgSize: 4,
},
},
}, o.serverConfig)
@@ -497,6 +507,7 @@ cert-allowed-cn = ["dd","ee"]
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
},
}, o.serverConfig)
@@ -557,6 +568,7 @@ unknown3 = 3
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
}, o.serverConfig.Debug)
}
16 changes: 16 additions & 0 deletions pkg/config/config_test_data.go
@@ -136,9 +136,25 @@ const (
"client-retry-rate-limit": 1,
"server-max-pending-message-count": 102400,
"server-ack-interval": 100000000,
"server-worker-pool-size": 4,
"max-recv-msg-size": 268435456
}
}
}`

testCfgTestReplicaConfigMarshal1 = `{
17 changes: 17 additions & 0 deletions pkg/config/messages.go
@@ -20,6 +20,8 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
)

const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB

// MessagesConfig configs MessageServer and MessageClient.
type MessagesConfig struct {
ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"`
@@ -30,6 +32,9 @@ type MessagesConfig struct {
ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"`
ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"`
ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"`

// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"`
}

// read only
@@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: defaultMaxRecvMsgSize,
}

const (
@@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32")
}

if c.MaxRecvMsgSize == 0 {
c.MaxRecvMsgSize = defaultMaxRecvMsgSize
}
if c.MaxRecvMsgSize < 0 {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs(
"max-recv-msg-size must be larger than 0")
}

return nil
}

@@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig {
ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount,
ServerAckInterval: c.ServerAckInterval,
ServerWorkerPoolSize: c.ServerWorkerPoolSize,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

@@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig {
MaxBatchCount: c.ClientMaxBatchCount,
RetryRateLimitPerSecond: c.ClientRetryRateLimit,
DialTimeout: clientDialTimeout,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

@@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig {
MaxPeerCount: maxPeerCount,
WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout,
SendRateLimitPerStream: serverSendRateLimit,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}
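Taken together, the hunks above give `max-recv-msg-size` the usual config life cycle: an unset value is defaulted to 256 MiB, a negative value is rejected by `ValidateAndAdjust`, and the adjusted value is copied into both the p2p client and server configs. A small sketch of that behavior, assuming `config.GetDefaultServerConfig()` as the entry point to a default `MessagesConfig`:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Start from the default debug messages configuration.
	msgs := config.GetDefaultServerConfig().Debug.Messages.Clone()

	// An unset value (0) is adjusted to the 256 MiB default.
	msgs.MaxRecvMsgSize = 0
	if err := msgs.ValidateAndAdjust(); err != nil {
		panic(err)
	}
	fmt.Println(msgs.MaxRecvMsgSize) // 268435456

	// A negative value is rejected with ErrInvalidServerOption.
	msgs.MaxRecvMsgSize = -1
	fmt.Println(msgs.ValidateAndAdjust() != nil) // true

	// The limit is propagated to both p2p endpoints.
	msgs.MaxRecvMsgSize = 8 * 1024 * 1024
	fmt.Println(msgs.ToMessageServerConfig().MaxRecvMsgSize) // 8388608
	fmt.Println(msgs.ToMessageClientConfig().MaxRecvMsgSize) // 8388608
}
```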
7 changes: 7 additions & 0 deletions pkg/config/messages_test.go
@@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) {
require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1)
require.Greater(t, serverConfig.MaxPeerCount, 0)
require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0))
require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestDefaultMessageClientConfig(t *testing.T) {
@@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) {
require.Greater(t, clientConfig.MaxBatchCount, 0)
require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1)
require.Greater(t, clientConfig.DialTimeout, time.Duration(0))
require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestMessagesConfigClone(t *testing.T) {
@@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) {
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error())

illegalConfig = defaultMessageConfig.Clone()
illegalConfig.MaxRecvMsgSize = -1
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
}