Replace a Few IntFlags with Uint64Flags #9959

Merged · 10 commits · Dec 1, 2021
2 changes: 1 addition & 1 deletion beacon-chain/node/node.go
@@ -489,7 +489,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name),
TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name),
UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name),
- MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name),
+ MaxPeers: cliCtx.Uint64(cmd.P2PMaxPeers.Name),
AllowListCIDR: cliCtx.String(cmd.P2PAllowList.Name),
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
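For context, the pattern this change applies is to read the flag with cliCtx.Uint64 so the value lands in the config as a uint64 with no intermediate cast. A minimal sketch, assuming urfave/cli v2 and a hypothetical stand-in flag for cmd.P2PMaxPeers:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// maxPeersFlag is a stand-in for cmd.P2PMaxPeers; the real definition lives in Prysm's cmd package.
var maxPeersFlag = &cli.Uint64Flag{
	Name:  "p2p-max-peers",
	Usage: "The max number of p2p peers to maintain.",
	Value: 45,
}

// p2pConfig mirrors the MaxPeers field type introduced in beacon-chain/p2p/config.go.
type p2pConfig struct {
	MaxPeers uint64
}

func main() {
	app := &cli.App{
		Flags: []cli.Flag{maxPeersFlag},
		Action: func(cliCtx *cli.Context) error {
			// Uint64 returns the parsed value as uint64, so no uint -> uint64 conversion is needed.
			cfg := &p2pConfig{MaxPeers: cliCtx.Uint64(maxPeersFlag.Name)}
			fmt.Println("max peers:", cfg.MaxPeers)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```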
2 changes: 1 addition & 1 deletion beacon-chain/p2p/config.go
@@ -23,7 +23,7 @@ type Config struct {
MetaDataDir string
TCPPort uint
UDPPort uint
- MaxPeers uint
+ MaxPeers uint64
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
2 changes: 1 addition & 1 deletion beacon-chain/p2p/connection_gater_test.go
@@ -104,7 +104,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
ScorerParams: &scorers.Config{},
}),
host: mockp2p.NewTestP2P(t).BHost,
- cfg: &Config{MaxPeers: uint(limit)},
+ cfg: &Config{MaxPeers: uint64(limit)},
}
var err error
s.addrFilter, err = configureFilter(&Config{})
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
@@ -77,7 +77,7 @@ type PeerManager interface {
ENR() *enr.Record
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshENR()
- FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
+ FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold uint64) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

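PeerManager.FindPeersWithSubnet now takes both the subnet index and the threshold as uint64, and the fakes later in this diff (FakeP2P, MockPeerManager, TestP2P) are updated to match. A minimal sketch of coding against the new signature through a narrowed, hypothetical interface (only this one method; the real PeerManager also exposes ENR, RefreshENR, discovery addresses, etc.):

```go
package main

import (
	"context"
	"fmt"
)

// subnetPeerFinder is a narrowed stand-in for the updated FindPeersWithSubnet
// method on Prysm's PeerManager interface: the threshold is now a uint64.
type subnetPeerFinder interface {
	FindPeersWithSubnet(ctx context.Context, topic string, subIndex, threshold uint64) (bool, error)
}

// stubFinder always reports the threshold as met, like the test fakes in this diff.
type stubFinder struct{}

func (stubFinder) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
	return true, nil
}

func main() {
	var f subnetPeerFinder = stubFinder{}
	// Hypothetical attestation-subnet topic; the stub ignores it.
	ok, err := f.FindPeersWithSubnet(context.Background(), "beacon_attestation_1", 1, 5)
	fmt.Println(ok, err)
}
```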
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/scorers/block_providers.go
@@ -85,7 +85,7 @@ func newBlockProviderScorer(store *peerdata.Store, config *BlockProviderScorerCo
if scorer.config.StalePeerRefreshInterval == 0 {
scorer.config.StalePeerRefreshInterval = DefaultBlockProviderStalePeerRefreshInterval
}
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
scorer.maxScore = 1.0
if batchSize > 0 {
totalBatches := float64(scorer.config.ProcessedBlocksCap / batchSize)
@@ -110,7 +110,7 @@ func (s *BlockProviderScorer) score(pid peer.ID) float64 {
if !ok || time.Since(peerData.BlockProviderUpdated) >= s.config.StalePeerRefreshInterval {
return s.maxScore
}
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
if batchSize > 0 {
processedBatches := float64(peerData.ProcessedBlocks / batchSize)
score += processedBatches * s.config.ProcessedBatchWeight
10 changes: 5 additions & 5 deletions beacon-chain/p2p/peers/scorers/block_providers_test.go
@@ -21,7 +21,7 @@ func TestScorers_BlockProvider_Score(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
tests := []struct {
name string
update func(scorer *scorers.BlockProviderScorer)
@@ -160,7 +160,7 @@ func TestScorers_BlockProvider_WeightSorted(t *testing.T) {
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
r := rand.NewDeterministicGenerator()

reverse := func(pids []peer.ID) []peer.ID {
@@ -214,7 +214,7 @@ func TestScorers_BlockProvider_WeightSorted(t *testing.T) {
}

func TestScorers_BlockProvider_Sorted(t *testing.T) {
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
tests := []struct {
name string
update func(s *scorers.BlockProviderScorer)
@@ -309,7 +309,7 @@ func TestScorers_BlockProvider_Sorted(t *testing.T) {
func TestScorers_BlockProvider_MaxScore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit

tests := []struct {
name string
@@ -347,7 +347,7 @@ func TestScorers_BlockProvider_MaxScore(t *testing.T) {
func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
format := "[%0.1f%%, raw: %0.2f, blocks: %d/1280]"

tests := []struct {
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/scorers/service_test.go
@@ -17,7 +17,7 @@ func TestScorers_Service_Init(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit

t.Run("default config", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
@@ -82,7 +82,7 @@ func TestScorers_Service_Score(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit

peerScores := func(s *scorers.Service, pids []peer.ID) map[string]float64 {
scores := make(map[string]float64, len(pids))
2 changes: 1 addition & 1 deletion beacon-chain/p2p/peers/status.go
@@ -696,7 +696,7 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch types.Epoch) (typ

// BestNonFinalized returns the highest known epoch, higher than ours,
// and is shared by at least minPeers.
- func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch types.Epoch) (types.Epoch, []peer.ID) {
+ func (p *Status) BestNonFinalized(minPeers uint64, ourHeadEpoch types.Epoch) (types.Epoch, []peer.ID) {
connected := p.Connected()
epochVotes := make(map[types.Epoch]uint64)
pidEpoch := make(map[peer.ID]types.Epoch, len(connected))
13 changes: 6 additions & 7 deletions beacon-chain/p2p/subnets.go
@@ -33,12 +33,11 @@ const syncLockerVal = 100
// subscribed to a particular subnet. Then we try to connect
// with those peers. This method will block until the required amount of
// peers are found, the method only exits in the event of context timeouts.
- func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
- index uint64, threshold int) (bool, error) {
+ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, subIndex, threshold uint64) (bool, error) {
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

- span.AddAttributes(trace.Int64Attribute("index", int64(index)))
+ span.AddAttributes(trace.Int64Attribute("index", int64(subIndex)))

if s.dv5Listener == nil {
// return if discovery isn't set
@@ -49,14 +48,14 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
iterator := s.dv5Listener.RandomNodes()
switch {
case strings.Contains(topic, GossipAttestationMessage):
- iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index))
+ iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(subIndex))
case strings.Contains(topic, GossipSyncCommitteeMessage):
- iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index))
+ iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(subIndex))
default:
return false, errors.New("no subnet exists for provided topic")
}

- currNum := len(s.pubsub.ListPeers(topic))
+ currNum := uint64(len(s.pubsub.ListPeers(topic)))
wg := new(sync.WaitGroup)
for {
if err := ctx.Err(); err != nil {
@@ -81,7 +80,7 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
}
// Wait for all dials to be completed.
wg.Wait()
- currNum = len(s.pubsub.ListPeers(topic))
+ currNum = uint64(len(s.pubsub.ListPeers(topic)))
}
return true, nil
}
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/fuzz_p2p.go
@@ -61,7 +61,7 @@ func (p *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}

// FindPeersWithSubnet mocks the p2p func.
- func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
+ func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/mock_peermanager.go
@@ -51,7 +51,7 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (m MockPeerManager) RefreshENR() {}

// FindPeersWithSubnet .
- func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
+ func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return true, nil
}

2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
@@ -349,7 +349,7 @@ func (p *TestP2P) Peers() *peers.Status {
}

// FindPeersWithSubnet mocks the p2p func.
- func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
+ func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

8 changes: 4 additions & 4 deletions beacon-chain/sync/initial-sync/blocks_fetcher_peers.go
@@ -63,7 +63,7 @@ func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID)

// waitForMinimumPeers spins and waits up until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
- required := params.BeaconConfig().MaxPeersToSync
+ required := uint64(params.BeaconConfig().MaxPeersToSync)
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
@@ -79,7 +79,7 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
headEpoch := slots.ToEpoch(f.chain.HeadSlot())
_, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
- if len(peers) >= required {
+ if uint64(len(peers)) >= required {
return peers, nil
}
log.WithFields(logrus.Fields{
@@ -123,14 +123,14 @@ func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersP
// trimPeers limits peer list, returning only specified percentage of peers.
// Takes system constraints into account (min/max peers to sync).
func trimPeers(peers []peer.ID, peersPercentage float64) []peer.ID {
- required := params.BeaconConfig().MaxPeersToSync
+ required := uint64(params.BeaconConfig().MaxPeersToSync)
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
// Weak/slow peers will be pushed down the list and trimmed since only percentage of peers is selected.
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
// Limit cannot be less that minimum peers required by sync mechanism.
- limit = mathutil.Max(limit, uint64(required))
+ limit = mathutil.Max(limit, required)
// Limit cannot be higher than number of peers available (safe-guard).
limit = mathutil.Min(limit, uint64(len(peers)))
return peers[:limit]
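The waitForMinimumPeers and trimPeers changes keep the existing bounding logic, only with required already a uint64: the percentage-derived limit is clamped below by the sync minimum and above by the number of available peers. A standalone sketch of that clamping, with stand-in constants for params.BeaconConfig().MaxPeersToSync and flags.Get().MinimumSyncPeers:

```go
package main

import (
	"fmt"
	"math"
)

// Stand-in values; in Prysm these come from params.BeaconConfig().MaxPeersToSync
// and flags.Get().MinimumSyncPeers.
const (
	maxPeersToSync   uint64 = 15
	minimumSyncPeers uint64 = 3
)

// trimLimit mirrors the bounding in trimPeers: take a percentage of the peers,
// then clamp the result to [required, number of peers].
func trimLimit(numPeers int, peersPercentage float64) uint64 {
	required := maxPeersToSync
	if minimumSyncPeers < required {
		required = minimumSyncPeers
	}
	limit := uint64(math.Round(float64(numPeers) * peersPercentage))
	if limit < required { // limit cannot be less than the sync minimum
		limit = required
	}
	if limit > uint64(numPeers) { // limit cannot exceed the peers available
		limit = uint64(numPeers)
	}
	return limit
}

func main() {
	fmt.Println(trimLimit(10, 0.2)) // 3: pulled up to the minimum
	fmt.Println(trimLimit(2, 1.0))  // 2: capped at available peers
}
```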
@@ -118,7 +118,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
capacityWeight float64
}

- batchSize := uint64(flags.Get().BlockBatchLimit)
+ batchSize := flags.Get().BlockBatchLimit
tests := []struct {
name string
args args
12 changes: 6 additions & 6 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -372,7 +372,7 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
}

func TestBlocksFetcher_scheduleRequest(t *testing.T) {
- blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
+ blockBatchLimit := flags.Get().BlockBatchLimit
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{})
@@ -426,7 +426,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
})

cancel()
- response := fetcher.handleRequest(ctx, 1, uint64(blockBatchLimit))
+ response := fetcher.handleRequest(ctx, 1, blockBatchLimit)
assert.ErrorContains(t, "context canceled", response.err)
})

@@ -441,7 +441,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer reqCancel()
go func() {
- response := fetcher.handleRequest(requestCtx, 1 /* start */, uint64(blockBatchLimit) /* count */)
+ response := fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchLimit /* count */)
select {
case <-ctx.Done():
case fetcher.fetchResponses <- response:
@@ -459,7 +459,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
blocks = resp.blocks
}
}
- if uint64(len(blocks)) != uint64(blockBatchLimit) {
+ if uint64(len(blocks)) != blockBatchLimit {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks))
}

@@ -510,11 +510,11 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
- Count: uint64(blockBatchLimit),
+ Count: blockBatchLimit,
}
blocks, err := fetcher.requestBlocks(ctx, req, peerIDs[0])
assert.NoError(t, err)
- assert.Equal(t, uint64(blockBatchLimit), uint64(len(blocks)), "Incorrect number of blocks returned")
+ assert.Equal(t, blockBatchLimit, uint64(len(blocks)), "Incorrect number of blocks returned")

// Test context cancellation.
ctx, cancel = context.WithCancel(context.Background())
@@ -201,7 +201,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
peers = append(peers, connectPeerHavingBlocks(t, p2p, chain1, finalizedSlot, p2p.Peers()))
}

- blockBatchLimit := uint64(flags.Get().BlockBatchLimit) * 2
+ blockBatchLimit := flags.Get().BlockBatchLimit * 2
pidInd := 0
for i := uint64(1); i < uint64(len(chain1)); i += blockBatchLimit {
req := &p2ppb.BeaconBlocksByRangeRequest{
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/service.go
@@ -163,13 +163,13 @@ func (s *Service) Resync() error {
}

func (s *Service) waitForMinimumPeers() {
- required := params.BeaconConfig().MaxPeersToSync
+ required := uint64(params.BeaconConfig().MaxPeersToSync)
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, s.cfg.Chain.FinalizedCheckpt().Epoch)
- if len(peers) >= required {
+ if uint64(len(peers)) >= required {
break
}
log.WithFields(logrus.Fields{
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/service_test.go
@@ -27,7 +27,7 @@ import (
)

func TestService_Constants(t *testing.T) {
- if params.BeaconConfig().MaxPeersToSync*flags.Get().BlockBatchLimit > 1000 {
+ if uint64(params.BeaconConfig().MaxPeersToSync)*flags.Get().BlockBatchLimit > uint64(1000) {
t.Fatal("rpc rejects requests over 1000 range slots")
}
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_range.go
@@ -43,7 +43,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa

// The initial count for the first batch to be returned back.
count := m.Count
- allowedBlocksPerSecond := uint64(flags.Get().BlockBatchLimit)
+ allowedBlocksPerSecond := flags.Get().BlockBatchLimit
if count > allowedBlocksPerSecond {
count = allowedBlocksPerSecond
}
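The handler above clamps the requested count to the per-batch allowance, which is now read directly as a uint64. A tiny sketch of that clamping, using a stand-in value for flags.Get().BlockBatchLimit:

```go
package main

import "fmt"

// clampCount mirrors the bounding in beaconBlocksByRangeRPCHandler: a request
// may not exceed the configured per-batch block allowance.
func clampCount(requested, allowedBlocksPerSecond uint64) uint64 {
	if requested > allowedBlocksPerSecond {
		return allowedBlocksPerSecond
	}
	return requested
}

func main() {
	const blockBatchLimit uint64 = 64 // stand-in for flags.Get().BlockBatchLimit
	fmt.Println(clampCount(1000, blockBatchLimit)) // 64
	fmt.Println(clampCount(32, blockBatchLimit))   // 32
}
```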
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_beacon_blocks_by_range_test.go
@@ -394,11 +394,11 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
req := &pb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
- Count: uint64(flags.Get().BlockBatchLimit),
+ Count: flags.Get().BlockBatchLimit,
}
saveBlocks(req)

- for i := 0; i < flags.Get().BlockBatchLimitBurstFactor; i++ {
+ for i := uint64(0); i < flags.Get().BlockBatchLimitBurstFactor; i++ {
assert.NoError(t, sendRequest(p1, p2, r, req, true, false))
}

4 changes: 2 additions & 2 deletions beacon-chain/sync/subscriber.go
@@ -641,7 +641,7 @@ func (s *Service) unSubscribeFromTopic(topic string) {
// find if we have peers who are subscribed to the same subnet
func (s *Service) validPeersExist(subnetTopic string) bool {
numOfPeers := s.cfg.p2p.PubSub().ListPeers(subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix())
- return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet
+ return uint64(len(numOfPeers)) >= flags.Get().MinimumPeersPerSubnet
}

func (s *Service) retrievePersistentSubs(currSlot types.Slot) []uint64 {
@@ -682,7 +682,7 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
for _, sub := range wantedSubs {
subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.p2p.Encoding().ProtocolSuffix()
peers := s.cfg.p2p.PubSub().ListPeers(subnetTopic)
- if len(peers) > flags.Get().MinimumPeersPerSubnet {
+ if uint64(len(peers)) > flags.Get().MinimumPeersPerSubnet {
// In the event we have more than the minimum, we can
// mark the remaining as viable for pruning.
peers = peers[:flags.Get().MinimumPeersPerSubnet]
2 changes: 1 addition & 1 deletion beacon-chain/sync/subscriber_test.go
@@ -489,7 +489,7 @@ func TestFilterSubnetPeers(t *testing.T) {
// Try with only peers from subnet 20.
wantedPeers = []peer.ID{p2.BHost.ID()}
// Connect an excess amount of peers in the particular subnet.
- for i := 1; i <= flags.Get().MinimumPeersPerSubnet; i++ {
+ for i := uint64(1); i <= flags.Get().MinimumPeersPerSubnet; i++ {
nPeer := createPeer(t, subnet20)
p.Connect(nPeer)
wantedPeers = append(wantedPeers, nPeer.BHost.ID())
6 changes: 3 additions & 3 deletions cmd/beacon-chain/flags/base.go
@@ -88,7 +88,7 @@ var (
}
// MinSyncPeers specifies the required number of successful peer handshakes in order
// to start syncing with external peers.
- MinSyncPeers = &cli.IntFlag{
+ MinSyncPeers = &cli.Uint64Flag{
Name: "min-sync-peers",
Usage: "The required number of valid peers to connect with before syncing.",
Value: 3,
@@ -123,13 +123,13 @@ var (
Usage: "Does not run the discoveryV5 dht.",
}
// BlockBatchLimit specifies the requested block batch size.
- BlockBatchLimit = &cli.IntFlag{
+ BlockBatchLimit = &cli.Uint64Flag{
Name: "block-batch-limit",
Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.",
Value: 64,
}
// BlockBatchLimitBurstFactor specifies the factor by which block batch size may increase.
- BlockBatchLimitBurstFactor = &cli.IntFlag{
+ BlockBatchLimitBurstFactor = &cli.Uint64Flag{
Name: "block-batch-limit-burst-factor",
Usage: "The factor by which block batch limit may increase on burst.",
Value: 10,
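Switching these definitions from cli.IntFlag to cli.Uint64Flag also changes parsing behavior: a value such as --block-batch-limit=-1 should now be rejected at flag-parse time rather than silently becoming a negative int. A small sketch, assuming urfave/cli v2's Uint64Flag rejects input that does not parse as an unsigned integer:

```go
package main

import (
	"fmt"

	"github.com/urfave/cli/v2"
)

// newApp builds a throwaway app with a stand-in flag mirroring the
// BlockBatchLimit definition above.
func newApp() *cli.App {
	blockBatchLimit := &cli.Uint64Flag{
		Name:  "block-batch-limit",
		Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.",
		Value: 64,
	}
	return &cli.App{
		Flags: []cli.Flag{blockBatchLimit},
		Action: func(c *cli.Context) error {
			fmt.Println("block batch limit:", c.Uint64("block-batch-limit"))
			return nil
		},
	}
}

func main() {
	// Without the flag set, the default (64) is used.
	_ = newApp().Run([]string{"app"})

	// A negative value should fail to parse as an unsigned integer.
	if err := newApp().Run([]string{"app", "--block-batch-limit=-1"}); err != nil {
		fmt.Println("parse error:", err)
	}
}
```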