diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 65ba6d55822..95218e992f6 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -648,7 +648,9 @@ func TestEvictSOC(t *testing.T) { if err != nil { t.Fatal(err) } - checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0 + if has, _ := ts.ChunkStore().Has(context.Background(), chunks[0].Address()); !has { + t.Fatal("same address chunk should still persist, eg refCnt > 0") + } evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins) if err != nil { diff --git a/pkg/topology/kademlia/internal/metrics/metrics.go b/pkg/topology/kademlia/internal/metrics/metrics.go index 23f8e902671..89fa1b1f516 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics.go +++ b/pkg/topology/kademlia/internal/metrics/metrics.go @@ -33,6 +33,15 @@ const ( // operation whose execution modifies a specific metrics. type RecordOp func(*Counters) +// Bootnode will mark the peer metric as bootnode based on the bool arg. +func IsBootnode(b bool) RecordOp { + return func(cs *Counters) { + cs.Lock() + defer cs.Unlock() + cs.isBootnode = b + } +} + // PeerLogIn will first update the current last seen to the give time t and as // the second it'll set the direction of the session connection to the given // value. The force flag will force the peer re-login if he's already logged in. @@ -155,6 +164,7 @@ type Counters struct { // Bookkeeping. isLoggedIn bool peerAddress swarm.Address + isBootnode bool // Counters. lastSeenTimestamp int64 @@ -308,6 +318,13 @@ func (c *Collector) IsUnreachable(addr swarm.Address) bool { // ExcludeOp is a function type used to filter peers on certain fields. type ExcludeOp func(*Counters) bool +// IsBootnode is used to filter bootnode peers. +func Bootnode() ExcludeOp { + return func(cs *Counters) bool { + return cs.isBootnode + } +} + // Reachable is used to filter reachable or unreachable peers based on r. func Reachability(filterReachable bool) ExcludeOp { return func(cs *Counters) bool { diff --git a/pkg/topology/kademlia/internal/metrics/metrics_test.go b/pkg/topology/kademlia/internal/metrics/metrics_test.go index 5074f02acb6..5c806b1d737 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics_test.go +++ b/pkg/topology/kademlia/internal/metrics/metrics_test.go @@ -193,3 +193,39 @@ func TestPeerMetricsCollector(t *testing.T) { t.Fatalf("unexpected snapshot difference:\n%s", diff) } } + +func TestExclude(t *testing.T) { + t.Parallel() + + db, err := shed.NewDB("", nil) + if err != nil { + t.Fatal(err) + } + testutil.CleanupCloser(t, db) + + mc, err := metrics.NewCollector(db) + if err != nil { + t.Fatal(err) + } + + var addr = swarm.RandAddress(t) + + // record unhealthy, unreachable, bootnode + mc.Record(addr, metrics.PeerHealth(false), metrics.IsBootnode(true), metrics.PeerReachability(p2p.ReachabilityStatusPrivate)) + + if have, want := mc.Exclude(addr), false; have != want { + t.Fatal("should not exclude any") + } + + if have, want := mc.Exclude(addr, metrics.Bootnode()), true; have != want { + t.Fatal("should exclude bootnodes") + } + + if have, want := mc.Exclude(addr, metrics.Reachability(false)), true; have != want { + t.Fatal("should exclude unreachble") + } + + if have, want := mc.Exclude(addr, metrics.Health(false)), true; have != want { + t.Fatal("should exclude unhealthy") + } +} diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 9462372093f..5f23b4ec2f5 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -843,7 +843,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) { } k.metrics.TotalOutboundConnections.Inc() - k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound)) + k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound), im.IsBootnode(true)) loggerV1.Debug("connected to bootnode", "bootnode_address", addr) connected++ @@ -1061,8 +1061,7 @@ outer: addrs = append(addrs, connectedPeer) if !fullnode { - // we continue here so we dont gossip - // about lightnodes to others. + // dont gossip about lightnodes to others. continue } // if kademlia is closing, dont enqueue anymore broadcast requests @@ -1314,9 +1313,9 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology. // EachConnectedPeer implements topology.PeerIterator interface. func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) error { - filters := excludeOps(filter) + filters := excludeFromIterator(filter) return k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { - if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) { + if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil } return f(addr, po) @@ -1325,9 +1324,9 @@ func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) // EachConnectedPeerRev implements topology.PeerIterator interface. func (k *Kad) EachConnectedPeerRev(f topology.EachPeerFunc, filter topology.Select) error { - filters := excludeOps(filter) + filters := excludeFromIterator(filter) return k.connectedPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { - if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) { + if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil } return f(addr, po) @@ -1391,9 +1390,11 @@ func (k *Kad) SubscribeTopologyChange() (c <-chan struct{}, unsubscribe func()) return channel, unsubscribe } -func excludeOps(filter topology.Select) []im.ExcludeOp { +func excludeFromIterator(filter topology.Select) []im.ExcludeOp { - ops := make([]im.ExcludeOp, 0, 2) + ops := make([]im.ExcludeOp, 0, 3) + + ops = append(ops, im.Bootnode()) if filter.Reachable { ops = append(ops, im.Reachability(false)) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 9a0079220a2..674b2c77d6b 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1200,13 +1200,17 @@ func TestStart(t *testing.T) { t.Parallel() var bootnodes []ma.Multiaddr + var bootnodesOverlays []swarm.Address for i := 0; i < 10; i++ { - multiaddr, err := ma.NewMultiaddr(underlayBase + swarm.RandAddress(t).String()) + overlay := swarm.RandAddress(t) + + multiaddr, err := ma.NewMultiaddr(underlayBase + overlay.String()) if err != nil { t.Fatal(err) } bootnodes = append(bootnodes, multiaddr) + bootnodesOverlays = append(bootnodesOverlays, overlay) } t.Run("non-empty addressbook", func(t *testing.T) { @@ -1253,6 +1257,19 @@ func TestStart(t *testing.T) { waitCounter(t, &conns, 3) waitCounter(t, &failedConns, 0) + + err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { + for _, b := range bootnodesOverlays { + if b.Equal(addr) { + return false, false, errors.New("did not expect bootnode address from the iterator") + } + } + return false, false, nil + + }, topology.Select{}) + if err != nil { + t.Fatal(err) + } }) }