diff --git a/core/core.go b/core/core.go
index c35d3e4457f..8e50b5369fc 100644
--- a/core/core.go
+++ b/core/core.go
@@ -49,13 +49,13 @@ import (
 	"github.com/ipfs/boxo/namesys"
 	ipnsrp "github.com/ipfs/boxo/namesys/republisher"
 
+	"github.com/ipfs/boxo/peering"
 	"github.com/ipfs/kubo/config"
 	"github.com/ipfs/kubo/core/bootstrap"
 	"github.com/ipfs/kubo/core/node"
 	"github.com/ipfs/kubo/core/node/libp2p"
 	"github.com/ipfs/kubo/fuse/mount"
 	"github.com/ipfs/kubo/p2p"
-	"github.com/ipfs/kubo/peering"
 	"github.com/ipfs/kubo/repo"
 	irouting "github.com/ipfs/kubo/routing"
 )
diff --git a/core/node/peering.go b/core/node/peering.go
index eba165ef984..8af2d5ecab9 100644
--- a/core/node/peering.go
+++ b/core/node/peering.go
@@ -3,7 +3,7 @@ package node
 import (
 	"context"
 
-	"github.com/ipfs/kubo/peering"
+	"github.com/ipfs/boxo/peering"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"go.uber.org/fx"
diff --git a/go.mod b/go.mod
index 3dc7f6827e9..1d849395c65 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/google/uuid v1.3.0
 	github.com/hashicorp/go-multierror v1.1.1
-	github.com/ipfs/boxo v0.13.1
+	github.com/ipfs/boxo v0.13.2-0.20230928204640-dfa1b9b6ce14
 	github.com/ipfs/go-block-format v0.1.2
 	github.com/ipfs/go-cid v0.4.1
 	github.com/ipfs/go-cidutil v0.1.0
diff --git a/go.sum b/go.sum
index 9e06cc307d0..bcad9634aee 100644
--- a/go.sum
+++ b/go.sum
@@ -335,8 +335,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
 github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
-github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=
-github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
+github.com/ipfs/boxo v0.13.2-0.20230928204640-dfa1b9b6ce14 h1:2h8DfEBYBcV+1awRuJzRkO7c++66VfL3nwqzVYLkT8g=
+github.com/ipfs/boxo v0.13.2-0.20230928204640-dfa1b9b6ce14/go.mod h1:pu8HsZvuyYeYJsqtLDCoYSvy8rHj6vI3dlh8P0f83Zs=
 github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
 github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
 github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
diff --git a/peering/peering.go b/peering/peering.go
deleted file mode 100644
index 34647d63c13..00000000000
--- a/peering/peering.go
+++ /dev/null
@@ -1,325 +0,0 @@
-package peering
-
-import (
-	"context"
-	"errors"
-	"math/rand"
-	"strconv"
-	"sync"
-	"time"
-
-	"github.com/ipfs/go-log"
-	"github.com/libp2p/go-libp2p/core/host"
-	"github.com/libp2p/go-libp2p/core/network"
-	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/multiformats/go-multiaddr"
-)
-
-// Seed the random number generator.
-//
-// We don't need good randomness, but we do need randomness.
-const (
-	// maxBackoff is the maximum time between reconnect attempts.
-	maxBackoff = 10 * time.Minute
-	// The backoff will be cut off when we get within 10% of the actual max.
-	// If we go over the max, we'll adjust the delay down to a random value
-	// between 90-100% of the max backoff.
-	maxBackoffJitter = 10 // %
-	connmgrTag = "ipfs-peering"
-	// This needs to be sufficient to prevent two sides from simultaneously
-	// dialing.
-	initialDelay = 5 * time.Second
-)
-
-var logger = log.Logger("peering")
-
-type State uint
-
-func (s State) String() string {
-	switch s {
-	case StateInit:
-		return "init"
-	case StateRunning:
-		return "running"
-	case StateStopped:
-		return "stopped"
-	default:
-		return "unknown peering state: " + strconv.FormatUint(uint64(s), 10)
-	}
-}
-
-const (
-	StateInit State = iota
-	StateRunning
-	StateStopped
-)
-
-// peerHandler keeps track of all state related to a specific "peering" peer.
-type peerHandler struct {
-	peer peer.ID
-	host host.Host
-	ctx context.Context
-	cancel context.CancelFunc
-
-	mu sync.Mutex
-	addrs []multiaddr.Multiaddr
-	reconnectTimer *time.Timer
-
-	nextDelay time.Duration
-}
-
-// setAddrs sets the addresses for this peer.
-func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) {
-	// Not strictly necessary, but it helps to not trust the calling code.
-	addrCopy := make([]multiaddr.Multiaddr, len(addrs))
-	copy(addrCopy, addrs)
-
-	ph.mu.Lock()
-	defer ph.mu.Unlock()
-	ph.addrs = addrCopy
-}
-
-// getAddrs returns a shared slice of addresses for this peer. Do not modify.
-func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr {
-	ph.mu.Lock()
-	defer ph.mu.Unlock()
-	return ph.addrs
-}
-
-// stop permanently stops the peer handler.
-func (ph *peerHandler) stop() {
-	ph.cancel()
-
-	ph.mu.Lock()
-	defer ph.mu.Unlock()
-	if ph.reconnectTimer != nil {
-		ph.reconnectTimer.Stop()
-		ph.reconnectTimer = nil
-	}
-}
-
-func (ph *peerHandler) nextBackoff() time.Duration {
-	if ph.nextDelay < maxBackoff {
-		ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
-	}
-
-	// If we've gone over the max backoff, reduce it under the max.
-	if ph.nextDelay > maxBackoff {
-		ph.nextDelay = maxBackoff
-		// randomize the backoff a bit (10%).
-		ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100))
-	}
-
-	return ph.nextDelay
-}
-
-func (ph *peerHandler) reconnect() {
-	// Try connecting
-	addrs := ph.getAddrs()
-	logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs)
-
-	err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs})
-	if err != nil {
-		logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err)
-		// Ok, we failed. Extend the timeout.
-		ph.mu.Lock()
-		if ph.reconnectTimer != nil {
-			// Only counts if the reconnectTimer still exists. If not, a
-			// connection _was_ somehow established.
-			ph.reconnectTimer.Reset(ph.nextBackoff())
-		}
-		// Otherwise, someone else has stopped us so we can assume that
-		// we're either connected or someone else will start us.
-		ph.mu.Unlock()
-	}
-
-	// Always call this. We could have connected since we processed the
-	// error.
-	ph.stopIfConnected()
-}
-
-func (ph *peerHandler) stopIfConnected() {
-	ph.mu.Lock()
-	defer ph.mu.Unlock()
-
-	if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected {
-		logger.Debugw("successfully reconnected", "peer", ph.peer)
-		ph.reconnectTimer.Stop()
-		ph.reconnectTimer = nil
-		ph.nextDelay = initialDelay
-	}
-}
-
-// startIfDisconnected is the inverse of stopIfConnected.
-func (ph *peerHandler) startIfDisconnected() {
-	ph.mu.Lock()
-	defer ph.mu.Unlock()
-
-	if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
-		logger.Debugw("disconnected from peer", "peer", ph.peer)
-		// Always start with a short timeout so we can stagger things a bit.
-		ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
-	}
-}
-
-// PeeringService maintains connections to specified peers, reconnecting on
-// disconnect with a back-off.
-type PeeringService struct {
-	host host.Host
-
-	mu sync.RWMutex
-	peers map[peer.ID]*peerHandler
-	state State
-}
-
-// NewPeeringService constructs a new peering service. Peers can be added and
-// removed immediately, but connections won't be formed until `Start` is called.
-func NewPeeringService(host host.Host) *PeeringService {
-	return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)}
-}
-
-// Start starts the peering service, connecting and maintaining connections to
-// all registered peers. It returns an error if the service has already been
-// stopped.
-func (ps *PeeringService) Start() error {
-	ps.mu.Lock()
-	defer ps.mu.Unlock()
-
-	switch ps.state {
-	case StateInit:
-		logger.Infow("starting")
-	case StateRunning:
-		return nil
-	case StateStopped:
-		return errors.New("already stopped")
-	}
-	ps.host.Network().Notify((*netNotifee)(ps))
-	ps.state = StateRunning
-	for _, handler := range ps.peers {
-		go handler.startIfDisconnected()
-	}
-	return nil
-}
-
-// GetState get the State of the PeeringService.
-func (ps *PeeringService) GetState() State {
-	ps.mu.RLock()
-	defer ps.mu.RUnlock()
-	return ps.state
-}
-
-// Stop stops the peering service.
-func (ps *PeeringService) Stop() error {
-	ps.host.Network().StopNotify((*netNotifee)(ps))
-	ps.mu.Lock()
-	defer ps.mu.Unlock()
-
-	switch ps.state {
-	case StateInit, StateRunning:
-		logger.Infow("stopping")
-		for _, handler := range ps.peers {
-			handler.stop()
-		}
-		ps.state = StateStopped
-	}
-	return nil
-}
-
-// AddPeer adds a peer to the peering service. This function may be safely
-// called at any time: before the service is started, while running, or after it
-// stops.
-//
-// Add peer may also be called multiple times for the same peer. The new
-// addresses will replace the old.
-func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
-	ps.mu.Lock()
-	defer ps.mu.Unlock()
-
-	if handler, ok := ps.peers[info.ID]; ok {
-		logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
-		handler.setAddrs(info.Addrs)
-	} else {
-		logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
-		ps.host.ConnManager().Protect(info.ID, connmgrTag)
-
-		handler = &peerHandler{
-			host: ps.host,
-			peer: info.ID,
-			addrs: info.Addrs,
-			nextDelay: initialDelay,
-		}
-		handler.ctx, handler.cancel = context.WithCancel(context.Background())
-		ps.peers[info.ID] = handler
-		switch ps.state {
-		case StateRunning:
-			go handler.startIfDisconnected()
-		case StateStopped:
-			// We still construct everything in this state because
-			// it's easier to reason about. But we should still free
-			// resources.
-			handler.cancel()
-		}
-	}
-}
-
-// ListPeers lists peers in the peering service.
-func (ps *PeeringService) ListPeers() []peer.AddrInfo {
-	ps.mu.RLock()
-	defer ps.mu.RUnlock()
-
-	out := make([]peer.AddrInfo, 0, len(ps.peers))
-	for id, addrs := range ps.peers {
-		ai := peer.AddrInfo{ID: id}
-		ai.Addrs = append(ai.Addrs, addrs.addrs...)
-		out = append(out, ai)
-	}
-	return out
-}
-
-// RemovePeer removes a peer from the peering service. This function may be
-// safely called at any time: before the service is started, while running, or
-// after it stops.
-func (ps *PeeringService) RemovePeer(id peer.ID) {
-	ps.mu.Lock()
-	defer ps.mu.Unlock()
-
-	if handler, ok := ps.peers[id]; ok {
-		logger.Infow("peer removed", "peer", id)
-		ps.host.ConnManager().Unprotect(id, connmgrTag)
-
-		handler.stop()
-		delete(ps.peers, id)
-	}
-}
-
-type netNotifee PeeringService
-
-func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
-	ps := (*PeeringService)(nn)
-
-	p := c.RemotePeer()
-	ps.mu.RLock()
-	defer ps.mu.RUnlock()
-
-	if handler, ok := ps.peers[p]; ok {
-		// use a goroutine to avoid blocking events.
-		go handler.stopIfConnected()
-	}
-}
-
-func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
-	ps := (*PeeringService)(nn)
-
-	p := c.RemotePeer()
-	ps.mu.RLock()
-	defer ps.mu.RUnlock()
-
-	if handler, ok := ps.peers[p]; ok {
-		// use a goroutine to avoid blocking events.
-		go handler.startIfDisconnected()
-	}
-}
-func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {}
-func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {}
-func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {}
-func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}
diff --git a/peering/peering_test.go b/peering/peering_test.go
deleted file mode 100644
index de07789c216..00000000000
--- a/peering/peering_test.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package peering
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/libp2p/go-libp2p"
-	"github.com/libp2p/go-libp2p/core/host"
-	"github.com/libp2p/go-libp2p/core/network"
-	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
-
-	"github.com/stretchr/testify/require"
-)
-
-func newNode(t *testing.T) host.Host {
-	cm, err := connmgr.NewConnManager(1, 100, connmgr.WithGracePeriod(0))
-	require.NoError(t, err)
-	h, err := libp2p.New(
-		libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
-		// We'd like to set the connection manager low water to 0, but
-		// that would disable the connection manager.
-		libp2p.ConnectionManager(cm),
-	)
-	require.NoError(t, err)
-	return h
-}
-
-func TestPeeringService(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	h1 := newNode(t)
-	ps1 := NewPeeringService(h1)
-
-	h2 := newNode(t)
-	h3 := newNode(t)
-	h4 := newNode(t)
-
-	// peer 1 -> 2
-	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-
-	// We haven't started so we shouldn't have any peers.
-	require.Never(t, func() bool {
-		return len(h1.Network().Peers()) > 0
-	}, 100*time.Millisecond, 1*time.Second, "expected host 1 to have no peers")
-
-	// Use p4 to take up the one slot we have in the connection manager.
-	for _, h := range []host.Host{h1, h2} {
-		require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}))
-		h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000)
-	}
-
-	// Now start.
-	require.NoError(t, ps1.Start())
-	// starting twice is fine.
-	require.NoError(t, ps1.Start())
-
-	// We should eventually connect.
-	t.Logf("waiting for h1 to connect to h2")
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) == network.Connected
-	}, 30*time.Second, 10*time.Millisecond)
-
-	// Now explicitly connect to h3.
-	t.Logf("waiting for h1's connection to h3 to work")
-	require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}))
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h3.ID()) == network.Connected
-	}, 30*time.Second, 100*time.Millisecond)
-
-	require.Len(t, h1.Network().Peers(), 3)
-
-	// force a disconnect
-	h1.ConnManager().TrimOpenConns(ctx)
-
-	// Should disconnect from h3.
-	t.Logf("waiting for h1's connection to h3 to disconnect")
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h3.ID()) != network.Connected
-	}, 5*time.Second, 10*time.Millisecond)
-
-	// Should remain connected to p2
-	require.Never(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) != network.Connected
-	}, 5*time.Second, 1*time.Second)
-
-	// Now force h2 to disconnect (we have an asymmetric peering).
-	conns := h2.Network().ConnsToPeer(h1.ID())
-	require.NotEmpty(t, conns)
-	h2.ConnManager().TrimOpenConns(ctx)
-
-	// All conns to peer should eventually close.
-	t.Logf("waiting for all connections to close")
-	for _, c := range conns {
-		require.Eventually(t, func() bool {
-			s, err := c.NewStream(context.Background())
-			if s != nil {
-				_ = s.Reset()
-			}
-			return err != nil
-		}, 5*time.Second, 10*time.Millisecond)
-	}
-
-	// Should eventually re-connect.
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) == network.Connected
-	}, 30*time.Second, 1*time.Second)
-
-	// Unprotect 2 from 1.
-	ps1.RemovePeer(h2.ID())
-	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-
-	// Trim connections.
-	h1.ConnManager().TrimOpenConns(ctx)
-
-	// Should disconnect
-	t.Logf("waiting for h1 to disconnect from h2")
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) != network.Connected
-	}, 5*time.Second, 10*time.Millisecond)
-
-	// Should never reconnect.
-	t.Logf("ensuring h1 is not connected to h2 again")
-	require.Never(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) == network.Connected
-	}, 20*time.Second, 1*time.Second)
-
-	// Until added back
-	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-	ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
-	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
-	t.Logf("wait for h1 to connect to h2 and h3 again")
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h2.ID()) == network.Connected
-	}, 30*time.Second, 1*time.Second)
-	require.Eventually(t, func() bool {
-		return h1.Network().Connectedness(h3.ID()) == network.Connected
-	}, 30*time.Second, 1*time.Second)
-
-	// Should be able to repeatedly stop.
-	require.NoError(t, ps1.Stop())
-	require.NoError(t, ps1.Stop())
-
-	// Adding and removing should work after stopping.
-	ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
-	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
-	ps1.RemovePeer(h2.ID())
-	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
-}
-
-func TestNextBackoff(t *testing.T) {
-	minMaxBackoff := (100 - maxBackoffJitter) / 100 * maxBackoff
-	for x := 0; x < 1000; x++ {
-		ph := peerHandler{nextDelay: time.Second}
-		for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 {
-			b := ph.nextBackoff()
-			if b > max || b < min {
-				t.Errorf("expected backoff %s to be between %s and %s", b, min, max)
-			}
-		}
-		for i := 0; i < 100; i++ {
-			b := ph.nextBackoff()
-			if b < minMaxBackoff || b > maxBackoff {
-				t.Fatal("failed to stay within max bounds")
-			}
-		}
-	}
-}
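
Note: only the package location changes here; the PeeringService surface kubo consumes (NewPeeringService, AddPeer, RemovePeer, Start, Stop, ListPeers) is the same as in the code deleted above, now imported from github.com/ipfs/boxo/peering. A minimal usage sketch against the relocated package, assuming that API carries over unchanged; the host construction and the empty AddrInfo are illustrative placeholders, not part of this change:

package main

import (
	"log"

	"github.com/ipfs/boxo/peering"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// The peering service maintains connections on an existing libp2p host.
	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}

	ps := peering.NewPeeringService(h)

	// Placeholder peer; in practice fill in a real peer ID and multiaddrs.
	var pi peer.AddrInfo

	// Peers may be added before or after Start. Once running, the service
	// keeps reconnecting with back-off until RemovePeer or Stop is called.
	ps.AddPeer(pi)
	if err := ps.Start(); err != nil {
		log.Fatal(err)
	}

	// ...

	if err := ps.Stop(); err != nil {
		log.Fatal(err)
	}
}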