From f356737746c771814590cb4b1f5786b64aec7908 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:28:23 +0100 Subject: [PATCH] refactor(p2p): move chan descs to p2p.channel_params and tune priorities --- go.mod | 2 +- go.sum | 4 +- internal/consensus/reactor.go | 65 +++------------- internal/consensus/reactor_test.go | 16 ++-- internal/evidence/reactor.go | 2 +- internal/p2p/channel_params.go | 120 ++++++++++++++++++++++------- internal/statesync/reactor.go | 36 +-------- node/setup.go | 8 +- 8 files changed, 119 insertions(+), 134 deletions(-) diff --git a/go.mod b/go.mod index 3b24448362..82e74f9a47 100644 --- a/go.mod +++ b/go.mod @@ -115,7 +115,6 @@ require ( go.opentelemetry.io/otel v1.8.0 // indirect go.opentelemetry.io/otel/trace v1.8.0 // indirect go.tmz.dev/musttag v0.7.2 // indirect - golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect ) @@ -309,4 +308,5 @@ require ( github.com/tendermint/go-amino v0.16.0 github.com/tyler-smith/go-bip39 v1.1.0 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa + golang.org/x/time v0.5.0 ) diff --git a/go.sum b/go.sum index 0414e8cfb8..8f599f3700 100644 --- a/go.sum +++ b/go.sum @@ -1208,8 +1208,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index bd789bfeec..03acee4157 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -30,54 +30,7 @@ var ( _ p2p.Wrapper = (*tmcons.Message)(nil) ) -// GetChannelDescriptor produces an instance of a descriptor for this -// package's required channels. -func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { - return map[p2p.ChannelID]*p2p.ChannelDescriptor{ - StateChannel: { - ID: StateChannel, - Priority: 8, - SendQueueCapacity: 64, - RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 128, - Name: "state", - }, - DataChannel: { - // TODO: Consider a split between gossiping current block and catchup - // stuff. Once we gossip the whole block there is nothing left to send - // until next height or round. - ID: DataChannel, - Priority: 12, - SendQueueCapacity: 64, - RecvBufferCapacity: 512, - RecvMessageCapacity: maxMsgSize, - Name: "data", - }, - VoteChannel: { - ID: VoteChannel, - Priority: 10, - SendQueueCapacity: 64, - RecvBufferCapacity: 4096, - RecvMessageCapacity: maxMsgSize, - Name: "vote", - }, - VoteSetBitsChannel: { - ID: VoteSetBitsChannel, - Priority: 5, - SendQueueCapacity: 8, - RecvBufferCapacity: 128, - RecvMessageCapacity: maxMsgSize, - Name: "voteSet", - }, - } -} - const ( - StateChannel = p2p.ChannelID(0x20) - DataChannel = p2p.ChannelID(0x21) - VoteChannel = p2p.ChannelID(0x22) - VoteSetBitsChannel = p2p.ChannelID(0x23) - maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. blocksToContributeToBecomeGoodPeer = 10000 @@ -173,23 +126,23 @@ func (r *Reactor) OnStart(ctx context.Context) error { var chBundle channelBundle var err error - chans := getChannelDescriptors() - chBundle.state, err = r.chCreator(ctx, chans[StateChannel]) + chans := p2p.ConsensusChannelDescriptors() + chBundle.state, err = r.chCreator(ctx, chans[p2p.ConsensusStateChannel]) if err != nil { return err } - chBundle.data, err = r.chCreator(ctx, chans[DataChannel]) + chBundle.data, err = r.chCreator(ctx, chans[p2p.ConsensusDataChannel]) if err != nil { return err } - chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel]) + chBundle.vote, err = r.chCreator(ctx, chans[p2p.ConsensusVoteChannel]) if err != nil { return err } - chBundle.voteSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel]) + chBundle.voteSet, err = r.chCreator(ctx, chans[p2p.VoteSetBitsChannel]) if err != nil { return err } @@ -780,13 +733,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha } switch envelope.ChannelID { - case StateChannel: + case p2p.ConsensusStateChannel: err = r.handleStateMessage(ctx, envelope, msg, chans.voteSet) - case DataChannel: + case p2p.ConsensusDataChannel: err = r.handleDataMessage(ctx, envelope, msg) - case VoteChannel: + case p2p.ConsensusVoteChannel: err = r.handleVoteMessage(ctx, envelope, msg) - case VoteSetBitsChannel: + case p2p.VoteSetBitsChannel: err = r.handleVoteSetBitsMessage(ctx, envelope, msg) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 6b54be08dc..d48c8d1ba8 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -88,10 +88,10 @@ func setup( blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes), } - rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size)) - rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size)) - rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size)) - rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size)) + rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusStateChannel, size)) + rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusDataChannel, size)) + rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusVoteChannel, size)) + rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.VoteSetBitsChannel, size)) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) @@ -99,13 +99,13 @@ func setup( chCreator := func(nodeID types.NodeID) p2p.ChannelCreator { return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { - case StateChannel: + case p2p.ConsensusStateChannel: return rts.stateChannels[nodeID], nil - case DataChannel: + case p2p.ConsensusDataChannel: return rts.dataChannels[nodeID], nil - case VoteChannel: + case p2p.ConsensusVoteChannel: return rts.voteChannels[nodeID], nil - case VoteSetBitsChannel: + case p2p.VoteSetBitsChannel: return rts.voteSetBitsChannels[nodeID], nil default: return nil, fmt.Errorf("invalid channel; %v", desc.ID) diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index ff8f6b6309..54dd75a810 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -27,7 +27,7 @@ const ( func GetChannelDescriptor() *p2p.ChannelDescriptor { return &p2p.ChannelDescriptor{ ID: EvidenceChannel, - Priority: 6, + Priority: 3, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 32, Name: "evidence", diff --git a/internal/p2p/channel_params.go b/internal/p2p/channel_params.go index 4e3341e507..dbbfc13468 100644 --- a/internal/p2p/channel_params.go +++ b/internal/p2p/channel_params.go @@ -16,6 +16,14 @@ import ( ) const ( + // + // Consensus channels + // + ConsensusStateChannel = ChannelID(0x20) + ConsensusDataChannel = ChannelID(0x21) + ConsensusVoteChannel = ChannelID(0x22) + VoteSetBitsChannel = ChannelID(0x23) + ErrorChannel = ChannelID(0x10) // BlockSyncChannel is a channelStore for blocks and status updates BlockSyncChannel = ChannelID(0x40) @@ -40,21 +48,24 @@ const ( lightBlockMsgSize = int(1e7) // ~1MB // paramMsgSize is the maximum size of a paramsResponseMessage paramMsgSize = int(1e5) // ~100kb + // consensusMsgSize is the maximum size of a consensus message + maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. + ) // ChannelDescriptors returns a map of all supported descriptors func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { - return map[ChannelID]*ChannelDescriptor{ + channels := map[ChannelID]*ChannelDescriptor{ ErrorChannel: { ID: ErrorChannel, - Priority: 6, + Priority: 7, RecvMessageCapacity: blockMaxMsgSize, RecvBufferCapacity: 32, Name: "error", }, BlockSyncChannel: { ID: BlockSyncChannel, - Priority: 5, + Priority: 6, SendQueueCapacity: 1000, RecvBufferCapacity: 1024, RecvMessageCapacity: types.MaxBlockSizeBytes + @@ -64,13 +75,28 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, MempoolChannel: { ID: MempoolChannel, - Priority: 5, + Priority: 2, // 5 RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes), RecvBufferCapacity: 128, Name: "mempool", - SendRateLimit: 5, - SendRateBurst: 20, + // SendRateLimit: 5, + // SendRateBurst: 20, }, + } + + for k, v := range StatesyncChannelDescriptors() { + channels[k] = v + } + for k, v := range ConsensusChannelDescriptors() { + channels[k] = v + } + + return channels +} + +// ChannelDescriptors returns a map of all supported descriptors +func StatesyncChannelDescriptors() map[ChannelID]*ChannelDescriptor { + return map[ChannelID]*ChannelDescriptor{ SnapshotChannel: { ID: SnapshotChannel, Priority: 6, @@ -81,7 +107,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, ChunkChannel: { ID: ChunkChannel, - Priority: 3, + Priority: 4, SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, RecvBufferCapacity: 128, @@ -89,7 +115,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, LightBlockChannel: { ID: LightBlockChannel, - Priority: 5, + Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, RecvBufferCapacity: 128, @@ -97,7 +123,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, ParamsChannel: { ID: ParamsChannel, - Priority: 2, + Priority: 3, SendQueueCapacity: 10, RecvMessageCapacity: paramMsgSize, RecvBufferCapacity: 128, @@ -106,6 +132,48 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { } } +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func ConsensusChannelDescriptors() map[ChannelID]*ChannelDescriptor { + return map[ChannelID]*ChannelDescriptor{ + ConsensusStateChannel: { + ID: ConsensusStateChannel, + Priority: 18, + SendQueueCapacity: 64, + RecvMessageCapacity: maxMsgSize, + RecvBufferCapacity: 128, + Name: "state", + }, + ConsensusDataChannel: { + // TODO: Consider a split between gossiping current block and catchup + // stuff. Once we gossip the whole block there is nothing left to send + // until next height or round. + ID: ConsensusDataChannel, + Priority: 22, + SendQueueCapacity: 64, + RecvBufferCapacity: 512, + RecvMessageCapacity: maxMsgSize, + Name: "data", + }, + ConsensusVoteChannel: { + ID: ConsensusVoteChannel, + Priority: 20, + SendQueueCapacity: 64, + RecvBufferCapacity: 4096, + RecvMessageCapacity: maxMsgSize, + Name: "vote", + }, + VoteSetBitsChannel: { + ID: VoteSetBitsChannel, + Priority: 15, + SendQueueCapacity: 8, + RecvBufferCapacity: 128, + RecvMessageCapacity: maxMsgSize, + Name: "voteSet", + }, + } +} + // ResolveChannelID returns channel ID according to message type // currently only is supported blocksync channelID, the remaining channelIDs should be added as it will be necessary func ResolveChannelID(msg proto.Message) ChannelID { @@ -116,6 +184,7 @@ func ResolveChannelID(msg proto.Message) ChannelID { *blocksync.StatusRequest, *blocksync.StatusResponse: return BlockSyncChannel + // State sync case *statesync.ChunkRequest, *statesync.ChunkResponse: return ChunkChannel @@ -128,30 +197,27 @@ func ResolveChannelID(msg proto.Message) ChannelID { case *statesync.LightBlockRequest, *statesync.LightBlockResponse: return LightBlockChannel - case *consensus.NewRoundStep, - *consensus.NewValidBlock, + // Consensus messages + case *consensus.VoteSetBits: + return VoteSetBitsChannel + case *consensus.Vote, *consensus.Commit: + return ConsensusVoteChannel + case *consensus.ProposalPOL, *consensus.Proposal, - *consensus.ProposalPOL, - *consensus.BlockPart, - *consensus.Vote, + *consensus.BlockPart: + return ConsensusDataChannel + case *consensus.NewRoundStep, *consensus.NewValidBlock, + *consensus.HasCommit, *consensus.HasVote, - *consensus.VoteSetMaj23, - *consensus.VoteSetBits, - *consensus.Commit, - *consensus.HasCommit: - // TODO: enable these channels when they are implemented - //*statesync.SnapshotsRequest, - //*statesync.SnapshotsResponse, - //*statesync.ChunkRequest, - //*statesync.ChunkResponse, - //*statesync.LightBlockRequest, - //*statesync.LightBlockResponse, - //*statesync.ParamsRequest, - //*statesync.ParamsResponse: + *consensus.VoteSetMaj23: + return ConsensusStateChannel + // pex case *p2pproto.PexRequest, *p2pproto.PexResponse, *p2pproto.Echo: + // evidence case *prototypes.Evidence: + // mempool case *mempool.Txs: return MempoolChannel } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index d1b64b329a..04bc62c01c 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -83,41 +83,7 @@ const ( ) func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { - return map[p2p.ChannelID]*p2p.ChannelDescriptor{ - SnapshotChannel: { - ID: SnapshotChannel, - Priority: 6, - SendQueueCapacity: 10, - RecvMessageCapacity: snapshotMsgSize, - RecvBufferCapacity: 128, - Name: "snapshot", - }, - ChunkChannel: { - ID: ChunkChannel, - Priority: 3, - SendQueueCapacity: 4, - RecvMessageCapacity: chunkMsgSize, - RecvBufferCapacity: 128, - Name: "chunk", - }, - LightBlockChannel: { - ID: LightBlockChannel, - Priority: 5, - SendQueueCapacity: 10, - RecvMessageCapacity: lightBlockMsgSize, - RecvBufferCapacity: 128, - Name: "light-block", - }, - ParamsChannel: { - ID: ParamsChannel, - Priority: 2, - SendQueueCapacity: 10, - RecvMessageCapacity: paramMsgSize, - RecvBufferCapacity: 128, - Name: "params", - }, - } - + return p2p.StatesyncChannelDescriptors() } // Metricer defines an interface used for the rpc sync info query, please see statesync.metrics diff --git a/node/setup.go b/node/setup.go index f78d8f4f47..19f12513df 100644 --- a/node/setup.go +++ b/node/setup.go @@ -365,10 +365,10 @@ func makeNodeInfo( Version: version.TMCoreSemVer, Channels: []byte{ byte(p2p.BlockSyncChannel), - byte(consensus.StateChannel), - byte(consensus.DataChannel), - byte(consensus.VoteChannel), - byte(consensus.VoteSetBitsChannel), + byte(p2p.ConsensusStateChannel), + byte(p2p.ConsensusDataChannel), + byte(p2p.ConsensusVoteChannel), + byte(p2p.VoteSetBitsChannel), byte(p2p.MempoolChannel), byte(evidence.EvidenceChannel), byte(statesync.SnapshotChannel),