Skip to content

Commit

Permalink
pstoremanager: fix race condition when removing peers from peer store
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored Nov 17, 2023
1 parent 5c95834 commit 2844691
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
3 changes: 1 addition & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
opts.EventBus = eventbus.NewBus()
}

psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus)
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus, n)
if err != nil {
return nil, err
}
hostCtx, cancel := context.WithCancel(context.Background())

h := &BasicHost{
network: n,
psManager: psManager,
Expand Down
15 changes: 12 additions & 3 deletions p2p/host/pstoremanager/pstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func WithCleanupInterval(t time.Duration) Option {
type PeerstoreManager struct {
pstore peerstore.Peerstore
eventBus event.Bus
network network.Network

cancel context.CancelFunc
refCount sync.WaitGroup
Expand All @@ -49,11 +50,12 @@ type PeerstoreManager struct {
cleanupInterval time.Duration
}

func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*PeerstoreManager, error) {
func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, network network.Network, opts ...Option) (*PeerstoreManager, error) {
m := &PeerstoreManager{
pstore: pstore,
gracePeriod: time.Minute,
eventBus: eventBus,
network: network,
}
for _, opt := range opts {
if err := opt(m); err != nil {
Expand Down Expand Up @@ -107,14 +109,21 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio
}
case network.Connected:
// If we reconnect to the peer before we've cleared the information, keep it.
// This is an optimization to keep the disconnected map small.
// We still need to check that a peer is actually disconnected before removing it from the peer store.
delete(disconnected, p)
}
case <-ticker.C:
now := time.Now()
for p, disconnectTime := range disconnected {
if disconnectTime.Add(m.gracePeriod).Before(now) {
m.pstore.RemovePeer(p)
delete(disconnected, p)
// Check that the peer is actually not connected at this point.
// This avoids a race condition where the Connected notification
// is processed after this ticker has fired.
if m.network.Connectedness(p) != network.Connected {
m.pstore.RemovePeer(p)
delete(disconnected, p)
}
}
}
case <-ctx.Done():
Expand Down
7 changes: 4 additions & 3 deletions p2p/host/pstoremanager/pstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
Expand All @@ -23,7 +24,7 @@ func TestGracePeriod(t *testing.T) {
eventBus := eventbus.NewBus()
pstore := NewMockPeerstore(ctrl)
const gracePeriod = 250 * time.Millisecond
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod))
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, swarmt.GenSwarm(t), pstoremanager.WithGracePeriod(gracePeriod))
require.NoError(t, err)
defer man.Close()
man.Start()
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestReconnect(t *testing.T) {
eventBus := eventbus.NewBus()
pstore := NewMockPeerstore(ctrl)
const gracePeriod = 200 * time.Millisecond
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod))
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, swarmt.GenSwarm(t), pstoremanager.WithGracePeriod(gracePeriod))
require.NoError(t, err)
defer man.Close()
man.Start()
Expand All @@ -77,7 +78,7 @@ func TestClose(t *testing.T) {
eventBus := eventbus.NewBus()
pstore := NewMockPeerstore(ctrl)
const gracePeriod = time.Hour
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod))
man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, swarmt.GenSwarm(t), pstoremanager.WithGracePeriod(gracePeriod))
require.NoError(t, err)
man.Start()

Expand Down

0 comments on commit 2844691

Please sign in to comment.