diff --git a/pkg/topology/kademlia/internal/metrics/metrics.go b/pkg/topology/kademlia/internal/metrics/metrics.go index 08096c0162d..5260d5f58b3 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics.go +++ b/pkg/topology/kademlia/internal/metrics/metrics.go @@ -7,6 +7,7 @@ package metrics import ( + "context" "fmt" "sync" "time" @@ -341,15 +342,19 @@ func (c *Collector) Flush(addresses ...swarm.Address) error { return mErr } -// Finalize logs out all ongoing peer sessions -// and flushes all in-memory metrics counters. -func (c *Collector) Finalize(t time.Time) error { +// Finalize tries to logs out all ongoing peer sessions. +func (c *Collector) Finalize(ctx context.Context, t time.Time) error { var ( mErr error batch = new(leveldb.Batch) ) c.counters.Range(func(_, val interface{}) bool { + select { + case <-ctx.Done(): + return false + default: + } cs := val.(*Counters) PeerLogOut(t)(cs) if err := cs.flush(c.db, batch); err != nil { diff --git a/pkg/topology/kademlia/internal/metrics/metrics_test.go b/pkg/topology/kademlia/internal/metrics/metrics_test.go index 6c8270be624..80ca5d28891 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics_test.go +++ b/pkg/topology/kademlia/internal/metrics/metrics_test.go @@ -5,6 +5,7 @@ package metrics_test import ( + "context" "testing" "time" @@ -125,7 +126,7 @@ func TestPeerMetricsCollector(t *testing.T) { // Finalize. mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound)) - if err := mc.Finalize(t3); err != nil { + if err := mc.Finalize(context.Background(), t3); err != nil { t.Fatalf("Finalize(%s): unexpected error: %v", t3, err) } if have, want := len(mc.Snapshot(t2, addr)), 0; have != want { diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 4cf23a0308f..db1fc783a47 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -1336,9 +1336,11 @@ func (k *Kad) Close() error { case <-time.After(5 * time.Second): k.logger.Warning("kademlia manage loop did not shut down properly") } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() k.logger.Info("kademlia persisting peer metrics") - if err := k.collector.Finalize(time.Now()); err != nil { + if err := k.collector.Finalize(ctx, time.Now()); err != nil { k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err) }