Skip to content

Commit

Permalink
refactor(p2p): move chan descs to p2p.channel_params and tune priorities
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Jan 12, 2024
1 parent cdde233 commit f356737
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 134 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.tmz.dev/musttag v0.7.2 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
)

Expand Down Expand Up @@ -309,4 +308,5 @@ require (
github.com/tendermint/go-amino v0.16.0
github.com/tyler-smith/go-bip39 v1.1.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/time v0.5.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1208,8 +1208,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
65 changes: 9 additions & 56 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,7 @@ var (
_ p2p.Wrapper = (*tmcons.Message)(nil)
)

// GetChannelDescriptor produces an instance of a descriptor for this
// package's required channels.
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
StateChannel: {
ID: StateChannel,
Priority: 8,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
DataChannel: {
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: DataChannel,
Priority: 12,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
VoteChannel: {
ID: VoteChannel,
Priority: 10,
SendQueueCapacity: 64,
RecvBufferCapacity: 4096,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
Priority: 5,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}

const (
StateChannel = p2p.ChannelID(0x20)
DataChannel = p2p.ChannelID(0x21)
VoteChannel = p2p.ChannelID(0x22)
VoteSetBitsChannel = p2p.ChannelID(0x23)

maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.

blocksToContributeToBecomeGoodPeer = 10000
Expand Down Expand Up @@ -173,23 +126,23 @@ func (r *Reactor) OnStart(ctx context.Context) error {
var chBundle channelBundle
var err error

chans := getChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
chans := p2p.ConsensusChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[p2p.ConsensusStateChannel])
if err != nil {
return err
}

chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
chBundle.data, err = r.chCreator(ctx, chans[p2p.ConsensusDataChannel])
if err != nil {
return err
}

chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
chBundle.vote, err = r.chCreator(ctx, chans[p2p.ConsensusVoteChannel])
if err != nil {
return err
}

chBundle.voteSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
chBundle.voteSet, err = r.chCreator(ctx, chans[p2p.VoteSetBitsChannel])
if err != nil {
return err
}
Expand Down Expand Up @@ -780,13 +733,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
}

switch envelope.ChannelID {
case StateChannel:
case p2p.ConsensusStateChannel:
err = r.handleStateMessage(ctx, envelope, msg, chans.voteSet)
case DataChannel:
case p2p.ConsensusDataChannel:
err = r.handleDataMessage(ctx, envelope, msg)
case VoteChannel:
case p2p.ConsensusVoteChannel:
err = r.handleVoteMessage(ctx, envelope, msg)
case VoteSetBitsChannel:
case p2p.VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msg)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
Expand Down
16 changes: 8 additions & 8 deletions internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,24 @@ func setup(
blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes),
}

rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size))
rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusStateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusDataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusVoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.VoteSetBitsChannel, size))

ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
switch desc.ID {
case StateChannel:
case p2p.ConsensusStateChannel:
return rts.stateChannels[nodeID], nil
case DataChannel:
case p2p.ConsensusDataChannel:
return rts.dataChannels[nodeID], nil
case VoteChannel:
case p2p.ConsensusVoteChannel:
return rts.voteChannels[nodeID], nil
case VoteSetBitsChannel:
case p2p.VoteSetBitsChannel:
return rts.voteSetBitsChannels[nodeID], nil
default:
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
Expand Down
2 changes: 1 addition & 1 deletion internal/evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
func GetChannelDescriptor() *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: EvidenceChannel,
Priority: 6,
Priority: 3,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
Name: "evidence",
Expand Down
120 changes: 93 additions & 27 deletions internal/p2p/channel_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import (
)

const (
//
// Consensus channels
//
ConsensusStateChannel = ChannelID(0x20)
ConsensusDataChannel = ChannelID(0x21)
ConsensusVoteChannel = ChannelID(0x22)
VoteSetBitsChannel = ChannelID(0x23)

ErrorChannel = ChannelID(0x10)
// BlockSyncChannel is a channelStore for blocks and status updates
BlockSyncChannel = ChannelID(0x40)
Expand All @@ -40,21 +48,24 @@ const (
lightBlockMsgSize = int(1e7) // ~1MB
// paramMsgSize is the maximum size of a paramsResponseMessage
paramMsgSize = int(1e5) // ~100kb
// consensusMsgSize is the maximum size of a consensus message
maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.

)

// ChannelDescriptors returns a map of all supported descriptors
func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
channels := map[ChannelID]*ChannelDescriptor{
ErrorChannel: {
ID: ErrorChannel,
Priority: 6,
Priority: 7,
RecvMessageCapacity: blockMaxMsgSize,
RecvBufferCapacity: 32,
Name: "error",
},
BlockSyncChannel: {
ID: BlockSyncChannel,
Priority: 5,
Priority: 6,
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: types.MaxBlockSizeBytes +
Expand All @@ -64,13 +75,28 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
},
MempoolChannel: {
ID: MempoolChannel,
Priority: 5,
Priority: 2, // 5
RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
RecvBufferCapacity: 128,
Name: "mempool",
SendRateLimit: 5,
SendRateBurst: 20,
// SendRateLimit: 5,
// SendRateBurst: 20,
},
}

for k, v := range StatesyncChannelDescriptors() {
channels[k] = v
}
for k, v := range ConsensusChannelDescriptors() {
channels[k] = v
}

return channels
}

// ChannelDescriptors returns a map of all supported descriptors
func StatesyncChannelDescriptors() map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
SnapshotChannel: {
ID: SnapshotChannel,
Priority: 6,
Expand All @@ -81,23 +107,23 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
},
ChunkChannel: {
ID: ChunkChannel,
Priority: 3,
Priority: 4,
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
Name: "chunk",
},
LightBlockChannel: {
ID: LightBlockChannel,
Priority: 5,
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
Name: "light-block",
},
ParamsChannel: {
ID: ParamsChannel,
Priority: 2,
Priority: 3,
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
Expand All @@ -106,6 +132,48 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
}
}

// GetChannelDescriptor produces an instance of a descriptor for this
// package's required channels.
func ConsensusChannelDescriptors() map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
ConsensusStateChannel: {
ID: ConsensusStateChannel,
Priority: 18,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
ConsensusDataChannel: {
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: ConsensusDataChannel,
Priority: 22,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
ConsensusVoteChannel: {
ID: ConsensusVoteChannel,
Priority: 20,
SendQueueCapacity: 64,
RecvBufferCapacity: 4096,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
Priority: 15,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}

// ResolveChannelID returns channel ID according to message type
// currently only is supported blocksync channelID, the remaining channelIDs should be added as it will be necessary
func ResolveChannelID(msg proto.Message) ChannelID {
Expand All @@ -116,6 +184,7 @@ func ResolveChannelID(msg proto.Message) ChannelID {
*blocksync.StatusRequest,
*blocksync.StatusResponse:
return BlockSyncChannel
// State sync
case *statesync.ChunkRequest,
*statesync.ChunkResponse:
return ChunkChannel
Expand All @@ -128,30 +197,27 @@ func ResolveChannelID(msg proto.Message) ChannelID {
case *statesync.LightBlockRequest,
*statesync.LightBlockResponse:
return LightBlockChannel
case *consensus.NewRoundStep,
*consensus.NewValidBlock,
// Consensus messages
case *consensus.VoteSetBits:
return VoteSetBitsChannel
case *consensus.Vote, *consensus.Commit:
return ConsensusVoteChannel
case *consensus.ProposalPOL,
*consensus.Proposal,
*consensus.ProposalPOL,
*consensus.BlockPart,
*consensus.Vote,
*consensus.BlockPart:
return ConsensusDataChannel
case *consensus.NewRoundStep, *consensus.NewValidBlock,
*consensus.HasCommit,
*consensus.HasVote,
*consensus.VoteSetMaj23,
*consensus.VoteSetBits,
*consensus.Commit,
*consensus.HasCommit:
// TODO: enable these channels when they are implemented
//*statesync.SnapshotsRequest,
//*statesync.SnapshotsResponse,
//*statesync.ChunkRequest,
//*statesync.ChunkResponse,
//*statesync.LightBlockRequest,
//*statesync.LightBlockResponse,
//*statesync.ParamsRequest,
//*statesync.ParamsResponse:
*consensus.VoteSetMaj23:
return ConsensusStateChannel
// pex
case *p2pproto.PexRequest,
*p2pproto.PexResponse,
*p2pproto.Echo:
// evidence
case *prototypes.Evidence:
// mempool
case *mempool.Txs:
return MempoolChannel
}
Expand Down
Loading

0 comments on commit f356737

Please sign in to comment.