Skip to content

Commit

Permalink
p2p(ticdc): use a larger max receive message size (#8005) (#8014)
Browse files Browse the repository at this point in the history
close #8004
  • Loading branch information
ti-chi-bot authored Jan 6, 2023
1 parent b99b726 commit ca4a0bf
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func New(pdEndpoints []string) (*server, error) {

s := &server{
pdEndpoints: pdEndpoints,
grpcService: p2p.NewServerWrapper(),
grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()),
tcpServer: tcpServer,

useEventSortEngine: useEventSortEngine,
Expand Down Expand Up @@ -377,7 +377,7 @@ func (s *server) run(ctx context.Context) (err error) {
}

if conf.Debug.EnableNewScheduler {
grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...)
p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService)

wg.Go(func() error {
Expand Down
5 changes: 3 additions & 2 deletions engine/pkg/p2p/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ var defaultClientConfig = &p2p.MessageClientConfig{
BatchSendInterval: 100 * time.Millisecond, // essentially disables flushing
MaxBatchBytes: 8 * 1024 * 1024, // 8MB
MaxBatchCount: 4096,
RetryRateLimitPerSecond: 1.0, // once per second
ClientVersion: "v5.4.0", // a fake version
RetryRateLimitPerSecond: 1.0, // once per second
ClientVersion: "v5.4.0", // a fake version
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

// NewMessageRouter creates a new MessageRouter instance via tiflow p2p API
Expand Down
1 change: 1 addition & 0 deletions engine/pkg/p2p/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var clientConfigForUnitTesting = &p2pImpl.MessageClientConfig{
RetryRateLimitPerSecond: 999.0,
ClientVersion: "v5.4.0", // a fake version
AdvertisedAddr: "fake-addr:8300",
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func TestMessageRPCServiceBasics(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func TestParseCfg(t *testing.T) {
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
Scheduler: &config.SchedulerConfig{
HeartbeatTick: 2,
Expand Down Expand Up @@ -277,6 +278,7 @@ client-retry-rate-limit = 100.0
server-max-pending-message-count = 1024
server-ack-interval = "1s"
server-worker-pool-size = 16
max-recv-msg-size = 4
[debug.scheduler]
heartbeat-tick = 3
max-task-concurrency = 11
Expand Down Expand Up @@ -358,6 +360,7 @@ check-balance-interval = "10s"
ServerMaxPendingMessageCount: 1024,
ServerAckInterval: config.TomlDuration(1 * time.Second),
ServerWorkerPoolSize: 16,
MaxRecvMsgSize: 4,
},
Scheduler: &config.SchedulerConfig{
HeartbeatTick: 3,
Expand Down Expand Up @@ -505,6 +508,7 @@ cert-allowed-cn = ["dd","ee"]
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
Scheduler: &config.SchedulerConfig{
HeartbeatTick: 2,
Expand Down Expand Up @@ -569,6 +573,7 @@ unknown3 = 3
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
Scheduler: &config.SchedulerConfig{
HeartbeatTick: 2,
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ const (
"client-retry-rate-limit": 1,
"server-max-pending-message-count": 102400,
"server-ack-interval": 100000000,
"server-worker-pool-size": 4
"server-worker-pool-size": 4,
"max-recv-msg-size": 268435456
},
"scheduler": {
"heartbeat-tick": 2,
Expand Down
17 changes: 17 additions & 0 deletions pkg/config/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
)

const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB

// MessagesConfig configs MessageServer and MessageClient.
type MessagesConfig struct {
ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"`
Expand All @@ -30,6 +32,9 @@ type MessagesConfig struct {
ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"`
ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"`
ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"`

// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"`
}

// read only
Expand All @@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: defaultMaxRecvMsgSize,
}

const (
Expand Down Expand Up @@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32")
}

if c.MaxRecvMsgSize == 0 {
c.MaxRecvMsgSize = defaultMaxRecvMsgSize
}
if c.MaxRecvMsgSize < 0 {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs(
"max-recv-msg-size must be larger than 0")
}

return nil
}

Expand All @@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig {
ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount,
ServerAckInterval: c.ServerAckInterval,
ServerWorkerPoolSize: c.ServerWorkerPoolSize,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

Expand All @@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig {
MaxBatchCount: c.ClientMaxBatchCount,
RetryRateLimitPerSecond: c.ClientRetryRateLimit,
DialTimeout: clientDialTimeout,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

Expand All @@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig {
MaxPeerCount: maxPeerCount,
WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout,
SendRateLimitPerStream: serverSendRateLimit,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}
7 changes: 7 additions & 0 deletions pkg/config/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) {
require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1)
require.Greater(t, serverConfig.MaxPeerCount, 0)
require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0))
require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestDefaultMessageClientConfig(t *testing.T) {
Expand All @@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) {
require.Greater(t, clientConfig.MaxBatchCount, 0)
require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1)
require.Greater(t, clientConfig.DialTimeout, time.Duration(0))
require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestMessagesConfigClone(t *testing.T) {
Expand Down Expand Up @@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) {
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error())

illegalConfig = defaultMessageConfig.Clone()
illegalConfig.MaxRecvMsgSize = -1
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
}
11 changes: 7 additions & 4 deletions pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type MessageClientConfig struct {
// The version of the client for compatibility check.
// It should be in semver format. Empty string means no check.
ClientVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int
}

// MessageClient is a client used to send peer messages.
Expand Down Expand Up @@ -140,10 +142,11 @@ func (c *MessageClient) Run(
}

gRPCClient, release, err := c.connector.Connect(clientConnectOptions{
network: network,
addr: addr,
credential: credential,
timeout: c.config.DialTimeout,
network: network,
addr: addr,
credential: credential,
timeout: c.config.DialTimeout,
maxRecvMsgSize: c.config.MaxRecvMsgSize,
})
if err != nil {
log.Warn("peer-message client: failed to connect to server",
Expand Down
4 changes: 3 additions & 1 deletion pkg/p2p/client_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type clientConnectOptions struct {
// credential is used to setup the connection to the gRPC server.
credential *security.Credential
// timeout specifies the DialTimeout of the connection.
timeout time.Duration
timeout time.Duration
maxRecvMsgSize int
}

type cancelFn = func()
Expand Down Expand Up @@ -63,6 +64,7 @@ func (c *clientConnectorImpl) Connect(opts clientConnectOptions) (proto.CDCPeerT
conn, err := grpc.Dial(
opts.addr,
securityOption,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxRecvMsgSize)),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return net.DialTimeout(opts.network, s, opts.timeout)
}),
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var clientConfigForUnitTesting = &MessageClientConfig{
RetryRateLimitPerSecond: 999.0,
ClientVersion: "v5.4.0", // a fake version
AdvertisedAddr: "fake-addr:8300",
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func TestMessageClientBasics(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/message_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var clientConfig4TestingMessageRouter = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func newMessageRouterTestSuite() *messageRouterTestSuite {
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var clientConfig4MockCluster = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func newMockNode(t *testing.T, id NodeID) *MockNode {
Expand Down
2 changes: 2 additions & 0 deletions pkg/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type MessageServerConfig struct {
MaxPeerCount int
// Semver of the server. Empty string means no version check.
ServerVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int

// The maximum time duration to wait before forcefully removing a handler.
//
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/server_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var clientConfig4Testing = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

type serverConfigOpt = func(config *MessageServerConfig)
Expand Down
10 changes: 9 additions & 1 deletion pkg/p2p/server_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/proto/p2p"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
gRPCPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -52,18 +53,25 @@ func (w *streamWrapper) Context() context.Context {
type ServerWrapper struct {
rwMu sync.RWMutex
innerServer p2p.CDCPeerToPeerServer
cfg *MessageServerConfig

wrappedStreamsMu sync.Mutex
wrappedStreams map[*streamWrapper]struct{}
}

// NewServerWrapper creates a new ServerWrapper
func NewServerWrapper() *ServerWrapper {
func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper {
return &ServerWrapper{
wrappedStreams: map[*streamWrapper]struct{}{},
cfg: cfg,
}
}

// ServerOptions returns server option for creating grpc servers.
func (s *ServerWrapper) ServerOptions() []grpc.ServerOption {
return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)}
}

// SendMessage implements p2p.CDCPeerToPeerServer
func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error {
s.rwMu.RLock()
Expand Down
5 changes: 4 additions & 1 deletion pkg/p2p/server_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)

server = NewServerWrapper()
cfg := &MessageServerConfig{
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}
server = NewServerWrapper(cfg)
p2p.RegisterCDCPeerToPeerServer(grpcServer, server)

var wg sync.WaitGroup
Expand Down

0 comments on commit ca4a0bf

Please sign in to comment.