From b9492697b0fa711db9facf1e59435d88424d9380 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 3 Jan 2023 17:12:19 +0800 Subject: [PATCH] This is an automated cherry-pick of #8005 Signed-off-by: ti-chi-bot --- cdc/server.go | 4 +- engine/pkg/p2p/message_sender.go | 99 +++++++++++++++++++++++ engine/pkg/p2p/server_integration_test.go | 97 ++++++++++++++++++++++ pkg/cmd/server/server_test.go | 12 +++ pkg/config/config_test_data.go | 16 ++++ pkg/config/messages.go | 17 ++++ pkg/config/messages_test.go | 7 ++ pkg/p2p/client.go | 11 ++- pkg/p2p/client_connector.go | 4 +- pkg/p2p/client_test.go | 1 + pkg/p2p/message_router_test.go | 1 + pkg/p2p/mock_cluster.go | 1 + pkg/p2p/server.go | 2 + pkg/p2p/server_client_integration_test.go | 1 + pkg/p2p/server_wrapper.go | 10 ++- pkg/p2p/server_wrapper_test.go | 5 +- 16 files changed, 279 insertions(+), 9 deletions(-) create mode 100644 engine/pkg/p2p/message_sender.go create mode 100644 engine/pkg/p2p/server_integration_test.go diff --git a/cdc/server.go b/cdc/server.go index 54010a54df6..3c05cd4771a 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -101,7 +101,7 @@ func NewServer(pdEndpoints []string) (*Server, error) { s := &Server{ pdEndpoints: pdEndpoints, - grpcService: p2p.NewServerWrapper(), + grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), tcpServer: tcpServer, } @@ -273,7 +273,7 @@ func (s *Server) run(ctx context.Context) (err error) { conf := config.GetGlobalServerConfig() if conf.Debug.EnableNewScheduler { - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) wg.Go(func() error { diff --git a/engine/pkg/p2p/message_sender.go b/engine/pkg/p2p/message_sender.go new file mode 100644 index 00000000000..1d9eebbd4a7 --- /dev/null +++ b/engine/pkg/p2p/message_sender.go @@ -0,0 +1,99 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "context" + "time" + + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/security" +) + +// MessageSender is used to send a message of a given topic to a given node. +type MessageSender interface { + // TODO investigate whether we need to implement a barrier mechanism + + // SendToNode sends a message to a given node. Returns whether it is successful and a possible error. + // A `would-block` error will not be returned. (false, nil) would be returned instead. + SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error) + + // SendToNodeB sends a message to a given node in a blocking way + SendToNodeB(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) error +} + +type messageSenderImpl struct { + router MessageRouter +} + +// NewMessageSender returns a new message sender. +func NewMessageSender(router MessageRouter) MessageSender { + return &messageSenderImpl{router: router} +} + +// SendToNodeB implements MessageSender.SendToNodeB +// Note the blocking send may have performance issue, BE CAUTION when using this function. +func (m *messageSenderImpl) SendToNodeB( + ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}, +) error { + client := m.router.GetClient(targetNodeID) + if client == nil { + return errors.ErrExecutorNotFoundForMessage.GenWithStackByArgs() + } + + // TODO: blocking send in p2p library may have performance issue + _, err := client.SendMessage(ctx, topic, message) + return err +} + +func (m *messageSenderImpl) SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error) { + client := m.router.GetClient(targetNodeID) + if client == nil { + return false, nil + } + + _, err := client.TrySendMessage(ctx, topic, message) + if err != nil { + if errors.Is(err, errors.ErrPeerMessageSendTryAgain) { + return false, nil + } + return false, errors.Trace(err) + } + return true, nil +} + +// MessageRouter alias to p2p.MessageRouter +type MessageRouter = p2p.MessageRouter + +var defaultClientConfig = &p2p.MessageClientConfig{ + SendChannelSize: 128, + BatchSendInterval: 100 * time.Millisecond, // essentially disables flushing + MaxBatchBytes: 8 * 1024 * 1024, // 8MB + MaxBatchCount: 4096, + RetryRateLimitPerSecond: 1.0, // once per second + ClientVersion: "v5.4.0", // a fake version + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB +} + +// NewMessageRouter creates a new MessageRouter instance via tiflow p2p API +func NewMessageRouter(nodeID NodeID, advertisedAddr string) MessageRouter { + config := *defaultClientConfig // copy + config.AdvertisedAddr = advertisedAddr + return p2p.NewMessageRouter( + nodeID, + &security.Credential{ /* TLS not supported for now */ }, + &config, + ) +} diff --git a/engine/pkg/p2p/server_integration_test.go b/engine/pkg/p2p/server_integration_test.go new file mode 100644 index 00000000000..98897f4ee8a --- /dev/null +++ b/engine/pkg/p2p/server_integration_test.go @@ -0,0 +1,97 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "context" + "fmt" + "math" + "net" + "sync" + "testing" + "time" + + "github.com/phayes/freeport" + p2pImpl "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/security" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func makeListenerForServerTests(t *testing.T) (l net.Listener, addr string) { + port := freeport.GetPort() + addr = fmt.Sprintf("127.0.0.1:%d", port) + l, err := net.Listen("tcp", addr) + require.NoError(t, err) + return +} + +// read only +var clientConfigForUnitTesting = &p2pImpl.MessageClientConfig{ + SendChannelSize: 1, + BatchSendInterval: time.Second, + MaxBatchBytes: math.MaxInt64, + MaxBatchCount: math.MaxInt64, + RetryRateLimitPerSecond: 999.0, + ClientVersion: "v5.4.0", // a fake version + AdvertisedAddr: "fake-addr:8300", + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB +} + +func TestMessageRPCServiceBasics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + l, addr := makeListenerForServerTests(t) + messageSrvc, err := NewMessageRPCService("test-node-1", &security.Credential{} /* no TLS */) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := messageSrvc.Serve(ctx, l) + require.Error(t, err) + require.Regexp(t, ".*canceled.*", err.Error()) + }() + + var called atomic.Bool + handlerManager := messageSrvc.MakeHandlerManager() + ok, err := handlerManager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(sender NodeID, value MessageValue) error { + require.Equal(t, "test-client-1", sender) + require.IsType(t, &msgContent{}, value) + require.False(t, called.Swap(true)) + return nil + }) + require.NoError(t, err) + require.True(t, ok) + + client := p2pImpl.NewMessageClient("test-client-1", clientConfigForUnitTesting) + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx, "tcp", addr, "test-node-1", &security.Credential{} /* no TLS */) + require.Error(t, err) + require.Regexp(t, ".*canceled.*", err.Error()) + }() + + _, err = client.SendMessage(ctx, "test-topic-1", &msgContent{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + return called.Load() + }, 5*time.Second, 10*time.Millisecond) + + cancel() + wg.Wait() +} diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 5012be7c59d..1df3ff321ad 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -208,6 +208,7 @@ func TestParseCfg(t *testing.T) { ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, }, o.serverConfig) @@ -273,6 +274,14 @@ client-retry-rate-limit = 100.0 server-max-pending-message-count = 1024 server-ack-interval = "1s" server-worker-pool-size = 16 +<<<<<<< HEAD +======= +max-recv-msg-size = 4 +[debug.scheduler] +heartbeat-tick = 3 +max-task-concurrency = 11 +check-balance-interval = "10s" +>>>>>>> 5da8a95464 (p2p(ticdc): use a larger max receive message size (#8005)) `, dataDir) err := os.WriteFile(configPath, []byte(configContent), 0o644) require.Nil(t, err) @@ -354,6 +363,7 @@ server-worker-pool-size = 16 ServerMaxPendingMessageCount: 1024, ServerAckInterval: config.TomlDuration(1 * time.Second), ServerWorkerPoolSize: 16, + MaxRecvMsgSize: 4, }, }, }, o.serverConfig) @@ -497,6 +507,7 @@ cert-allowed-cn = ["dd","ee"] ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, }, o.serverConfig) @@ -557,6 +568,7 @@ unknown3 = 3 ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 46f2fca3cbb..d09573fc0e2 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -136,9 +136,25 @@ const ( "client-retry-rate-limit": 1, "server-max-pending-message-count": 102400, "server-ack-interval": 100000000, +<<<<<<< HEAD "server-worker-pool-size": 4 } } +======= + "server-worker-pool-size": 4, + "max-recv-msg-size": 268435456 + }, + "scheduler": { + "heartbeat-tick": 2, + "max-task-concurrency": 10, + "check-balance-interval": 60000000000, + "add-table-batch-size": 50, + "region-per-span": 0 + }, + "enable-new-sink": true + }, + "cluster-id": "default" +>>>>>>> 5da8a95464 (p2p(ticdc): use a larger max receive message size (#8005)) }` testCfgTestReplicaConfigMarshal1 = `{ diff --git a/pkg/config/messages.go b/pkg/config/messages.go index af14190e257..d4057f72ce9 100644 --- a/pkg/config/messages.go +++ b/pkg/config/messages.go @@ -20,6 +20,8 @@ import ( "github.com/pingcap/tiflow/pkg/p2p" ) +const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB + // MessagesConfig configs MessageServer and MessageClient. type MessagesConfig struct { ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"` @@ -30,6 +32,9 @@ type MessagesConfig struct { ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"` ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"` ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"` + + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"` } // read only @@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{ ServerMaxPendingMessageCount: 102400, ServerAckInterval: TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: defaultMaxRecvMsgSize, } const ( @@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error { return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32") } + if c.MaxRecvMsgSize == 0 { + c.MaxRecvMsgSize = defaultMaxRecvMsgSize + } + if c.MaxRecvMsgSize < 0 { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "max-recv-msg-size must be larger than 0") + } + return nil } @@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig { ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount, ServerAckInterval: c.ServerAckInterval, ServerWorkerPoolSize: c.ServerWorkerPoolSize, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig { MaxBatchCount: c.ClientMaxBatchCount, RetryRateLimitPerSecond: c.ClientRetryRateLimit, DialTimeout: clientDialTimeout, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig { MaxPeerCount: maxPeerCount, WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout, SendRateLimitPerStream: serverSendRateLimit, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } diff --git a/pkg/config/messages_test.go b/pkg/config/messages_test.go index b2fa1f9188e..53d3f068d04 100644 --- a/pkg/config/messages_test.go +++ b/pkg/config/messages_test.go @@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) { require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1) require.Greater(t, serverConfig.MaxPeerCount, 0) require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0)) + require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestDefaultMessageClientConfig(t *testing.T) { @@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) { require.Greater(t, clientConfig.MaxBatchCount, 0) require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1) require.Greater(t, clientConfig.DialTimeout, time.Duration(0)) + require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestMessagesConfigClone(t *testing.T) { @@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) { err = illegalConfig.ValidateAndAdjust() require.Error(t, err) require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error()) + + illegalConfig = defaultMessageConfig.Clone() + illegalConfig.MaxRecvMsgSize = -1 + err = illegalConfig.ValidateAndAdjust() + require.Error(t, err) } diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index da0586706fa..9f5833537a8 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -54,6 +54,8 @@ type MessageClientConfig struct { // The version of the client for compatibility check. // It should be in semver format. Empty string means no check. ClientVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int } // MessageClient is a client used to send peer messages. @@ -140,10 +142,11 @@ func (c *MessageClient) Run( } gRPCClient, release, err := c.connector.Connect(clientConnectOptions{ - network: network, - addr: addr, - credential: credential, - timeout: c.config.DialTimeout, + network: network, + addr: addr, + credential: credential, + timeout: c.config.DialTimeout, + maxRecvMsgSize: c.config.MaxRecvMsgSize, }) if err != nil { log.Warn("peer-message client: failed to connect to server", diff --git a/pkg/p2p/client_connector.go b/pkg/p2p/client_connector.go index 8cc1a15b397..a840d49df8a 100644 --- a/pkg/p2p/client_connector.go +++ b/pkg/p2p/client_connector.go @@ -35,7 +35,8 @@ type clientConnectOptions struct { // credential is used to setup the connection to the gRPC server. credential *security.Credential // timeout specifies the DialTimeout of the connection. - timeout time.Duration + timeout time.Duration + maxRecvMsgSize int } type cancelFn = func() @@ -63,6 +64,7 @@ func (c *clientConnectorImpl) Connect(opts clientConnectOptions) (proto.CDCPeerT conn, err := grpc.Dial( opts.addr, securityOption, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxRecvMsgSize)), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return net.DialTimeout(opts.network, s, opts.timeout) }), diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index fab827582fd..e4228cdeae3 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -67,6 +67,7 @@ var clientConfigForUnitTesting = &MessageClientConfig{ RetryRateLimitPerSecond: 999.0, ClientVersion: "v5.4.0", // a fake version AdvertisedAddr: "fake-addr:8300", + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func TestMessageClientBasics(t *testing.T) { diff --git a/pkg/p2p/message_router_test.go b/pkg/p2p/message_router_test.go index ed843042dbf..ac1196f2393 100644 --- a/pkg/p2p/message_router_test.go +++ b/pkg/p2p/message_router_test.go @@ -45,6 +45,7 @@ var clientConfig4TestingMessageRouter = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMessageRouterTestSuite() *messageRouterTestSuite { diff --git a/pkg/p2p/mock_cluster.go b/pkg/p2p/mock_cluster.go index 2315ef7994b..a679f270e25 100644 --- a/pkg/p2p/mock_cluster.go +++ b/pkg/p2p/mock_cluster.go @@ -64,6 +64,7 @@ var clientConfig4MockCluster = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMockNode(t *testing.T, id NodeID) *MockNode { diff --git a/pkg/p2p/server.go b/pkg/p2p/server.go index e80ecb47dff..b40da50c8fc 100755 --- a/pkg/p2p/server.go +++ b/pkg/p2p/server.go @@ -58,6 +58,8 @@ type MessageServerConfig struct { MaxPeerCount int // Semver of the server. Empty string means no version check. ServerVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int // The maximum time duration to wait before forcefully removing a handler. // diff --git a/pkg/p2p/server_client_integration_test.go b/pkg/p2p/server_client_integration_test.go index b5f300414d8..99cfc912f88 100644 --- a/pkg/p2p/server_client_integration_test.go +++ b/pkg/p2p/server_client_integration_test.go @@ -41,6 +41,7 @@ var clientConfig4Testing = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } type serverConfigOpt = func(config *MessageServerConfig) diff --git a/pkg/p2p/server_wrapper.go b/pkg/p2p/server_wrapper.go index bf9b5b803a1..596c824d29f 100644 --- a/pkg/p2p/server_wrapper.go +++ b/pkg/p2p/server_wrapper.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/proto/p2p" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" gRPCPeer "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -52,18 +53,25 @@ func (w *streamWrapper) Context() context.Context { type ServerWrapper struct { rwMu sync.RWMutex innerServer p2p.CDCPeerToPeerServer + cfg *MessageServerConfig wrappedStreamsMu sync.Mutex wrappedStreams map[*streamWrapper]struct{} } // NewServerWrapper creates a new ServerWrapper -func NewServerWrapper() *ServerWrapper { +func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper { return &ServerWrapper{ wrappedStreams: map[*streamWrapper]struct{}{}, + cfg: cfg, } } +// ServerOptions returns server option for creating grpc servers. +func (s *ServerWrapper) ServerOptions() []grpc.ServerOption { + return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)} +} + // SendMessage implements p2p.CDCPeerToPeerServer func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error { s.rwMu.RLock() diff --git a/pkg/p2p/server_wrapper_test.go b/pkg/p2p/server_wrapper_test.go index d83976a6c6d..af517ab24f0 100644 --- a/pkg/p2p/server_wrapper_test.go +++ b/pkg/p2p/server_wrapper_test.go @@ -68,7 +68,10 @@ func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) - server = NewServerWrapper() + cfg := &MessageServerConfig{ + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB + } + server = NewServerWrapper(cfg) p2p.RegisterCDCPeerToPeerServer(grpcServer, server) var wg sync.WaitGroup