Chore/migrate peering #473

Merged: 21 commits, Oct 24, 2023

Commits (21)
9b3e749
feat: implement peering service
Stebalien May 25, 2020
05cbfba
fix: doc comment location
Stebalien May 26, 2020
a7e3b3f
fix: address peering service code feedback
Stebalien May 26, 2020
2e1f012
test: add unit test for peering service
Stebalien May 26, 2020
c1fb740
fix(peering): fix a race condition
Stebalien May 26, 2020
2ef7092
fix: remove unecessary context
Stebalien May 26, 2020
971983d
fix: really cap the max backoff at 10 minutes
Stebalien May 26, 2020
d6255b5
update go-libp2p to v0.13.0
marten-seemann Dec 19, 2020
f778bf1
peering: add logs before many-second waits
mvdan Feb 6, 2021
67c4cf0
fix: the test of peering.PeeringService
TakashiMatsuda May 15, 2021
1f381a2
feature: 'ipfs swarm peering' command (#8147)
TakashiMatsuda Sep 15, 2021
b368457
fix: take the lock while listing peers
Stebalien Sep 15, 2021
51b5ca2
feat: go-libp2p 0.16, UnixFS autosharding and go-datastore with conte…
aschmahmann Nov 29, 2021
f264de0
feat: opt-in Swarm.ResourceMgr (go-libp2p v0.18) (#8680)
marten-seemann Apr 8, 2022
9f54e44
feat: add a public function on peering to get the state
Jun 11, 2022
d7e9091
chore: bump go-libp2p v0.22.0 & go1.18&go1.19
Jorropo Aug 29, 2022
396c166
style: gofumpt and godot [skip changelog] (#10081)
kehiy Aug 17, 2023
dae3136
docs: fix typos
criadoperez Sep 22, 2023
41637c6
peering: migrate from Kubo
gammazero Sep 28, 2023
7926f7d
docs: improve changelog for peering
Jorropo Oct 24, 2023
b5cf3fe
chore: remove outdated comment in peering
Jorropo Oct 24, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,9 @@ The following emojis are used to highlight certain changes:
* The gateway now sets a `Cache-Control` header for requests under the `/ipns/` namespace if the TTL for the corresponding IPNS Records or DNSLink entities is known.
* `boxo/bitswap/client`:
* A new `WithoutDuplicatedBlockStats()` option can be used with `bitswap.New` and `bsclient.New`. This disables accounting for duplicated blocks, which requires a `blockstore.Has()` lookup for every received block and can thus impact performance.
* ✨ Migrated repositories into Boxo
* [`github.com/ipfs/kubo/peering`](https://pkg.go.dev/github.com/ipfs/kubo/peering) => [`./peering`](./peering)
A service which establishes, monitors, and maintains long-lived connections.

### Changed

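For readers coming from Kubo, here is a minimal wiring sketch for the migrated package (not part of this diff). It assumes the package resolves as github.com/ipfs/boxo/peering under this repository's module path and takes the target peer's multiaddr as a command-line argument; everything else follows the API in peering/peering.go below.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/ipfs/boxo/peering"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// Any configured libp2p host works here; defaults are used for brevity.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	ps := peering.NewPeeringService(h)

	// Peer to keep a long-lived connection to, given as a multiaddr with a
	// /p2p/ component, e.g. /dns4/example.com/tcp/4001/p2p/<peer-id>.
	ai, err := peer.AddrInfoFromString(os.Args[1])
	if err != nil {
		panic(err)
	}
	ps.AddPeer(*ai)

	// Connections are only dialed once Start is called; Stop shuts the
	// service down and cancels pending reconnect attempts.
	if err := ps.Start(); err != nil {
		panic(err)
	}
	defer ps.Stop()

	// Keep the process alive for a while so the service can maintain the
	// connection, then shut down cleanly via the deferred Stop and Close.
	fmt.Println("peering with", ai.ID)
	time.Sleep(time.Minute)
}
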
2 changes: 1 addition & 1 deletion go.mod
@@ -27,6 +27,7 @@ require (
github.com/ipfs/go-ipld-cbor v0.0.6
github.com/ipfs/go-ipld-format v0.5.0
github.com/ipfs/go-ipld-legacy v0.2.1
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.8.1
@@ -107,7 +108,6 @@ require (
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-unixfs v0.4.5 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
321 changes: 321 additions & 0 deletions peering/peering.go
@@ -0,0 +1,321 @@
package peering

import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"time"

"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

const (
// maxBackoff is the maximum time between reconnect attempts.
maxBackoff = 10 * time.Minute
// The backoff will be cut off when we get within 10% of the actual max.
// If we go over the max, we'll adjust the delay down to a random value
// between 90-100% of the max backoff.
maxBackoffJitter = 10 // %
connmgrTag = "ipfs-peering"
// This needs to be sufficient to prevent two sides from simultaneously
// dialing.
initialDelay = 5 * time.Second
)

var logger = log.Logger("peering")

type State uint

func (s State) String() string {
switch s {
case StateInit:
return "init"
case StateRunning:
return "running"
case StateStopped:
return "stopped"
default:
return "unknown peering state: " + strconv.FormatUint(uint64(s), 10)

}
}

const (
StateInit State = iota
StateRunning
StateStopped
)

// peerHandler keeps track of all state related to a specific "peering" peer.
type peerHandler struct {
peer peer.ID
host host.Host
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
addrs []multiaddr.Multiaddr
reconnectTimer *time.Timer

nextDelay time.Duration
}

// setAddrs sets the addresses for this peer.
func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) {
// Not strictly necessary, but it helps to not trust the calling code.
addrCopy := make([]multiaddr.Multiaddr, len(addrs))
copy(addrCopy, addrs)

ph.mu.Lock()
defer ph.mu.Unlock()
ph.addrs = addrCopy

}

// getAddrs returns a shared slice of addresses for this peer. Do not modify.
func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr {
ph.mu.Lock()
defer ph.mu.Unlock()
return ph.addrs
}

// stop permanently stops the peer handler.
func (ph *peerHandler) stop() {
ph.cancel()

ph.mu.Lock()
defer ph.mu.Unlock()
if ph.reconnectTimer != nil {
ph.reconnectTimer.Stop()
ph.reconnectTimer = nil
}

}

func (ph *peerHandler) nextBackoff() time.Duration {
if ph.nextDelay < maxBackoff {
ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
}

// If we've gone over the max backoff, reduce it under the max.
if ph.nextDelay > maxBackoff {
ph.nextDelay = maxBackoff
// randomize the backoff a bit (10%).
ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100))
}

return ph.nextDelay
}

func (ph *peerHandler) reconnect() {
// Try connecting
addrs := ph.getAddrs()
logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs)

err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs})
if err != nil {
logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err)
// Ok, we failed. Extend the timeout.
ph.mu.Lock()
if ph.reconnectTimer != nil {
// Only counts if the reconnectTimer still exists. If not, a
// connection _was_ somehow established.
ph.reconnectTimer.Reset(ph.nextBackoff())
}

// Otherwise, someone else has stopped us so we can assume that
// we're either connected or someone else will start us.
ph.mu.Unlock()

}

// Always call this. We could have connected since we processed the
// error.
ph.stopIfConnected()
}

func (ph *peerHandler) stopIfConnected() {
ph.mu.Lock()
defer ph.mu.Unlock()

if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected {
logger.Debugw("successfully reconnected", "peer", ph.peer)
ph.reconnectTimer.Stop()
ph.reconnectTimer = nil
ph.nextDelay = initialDelay
}
}

// startIfDisconnected is the inverse of stopIfConnected.
func (ph *peerHandler) startIfDisconnected() {
ph.mu.Lock()
defer ph.mu.Unlock()

if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
logger.Debugw("disconnected from peer", "peer", ph.peer)
// Always start with a short timeout so we can stagger things a bit.
ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
}
}

// PeeringService maintains connections to specified peers, reconnecting on
// disconnect with a back-off.
type PeeringService struct {
host host.Host

mu sync.RWMutex
peers map[peer.ID]*peerHandler
state State
}

// NewPeeringService constructs a new peering service. Peers can be added and
// removed immediately, but connections won't be formed until `Start` is called.
func NewPeeringService(host host.Host) *PeeringService {
return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)}
}

// Start starts the peering service, connecting and maintaining connections to
// all registered peers. It returns an error if the service has already been
// stopped.
func (ps *PeeringService) Start() error {
ps.mu.Lock()
defer ps.mu.Unlock()

switch ps.state {
case StateInit:
logger.Infow("starting")
case StateRunning:
return nil
case StateStopped:
return errors.New("already stopped")

}
ps.host.Network().Notify((*netNotifee)(ps))
ps.state = StateRunning
for _, handler := range ps.peers {
go handler.startIfDisconnected()
}
return nil
}

// GetState returns the State of the PeeringService.
func (ps *PeeringService) GetState() State {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.state

}

// Stop stops the peering service.
func (ps *PeeringService) Stop() {
ps.host.Network().StopNotify((*netNotifee)(ps))
ps.mu.Lock()
defer ps.mu.Unlock()

switch ps.state {
case StateInit, StateRunning:
logger.Infow("stopping")
for _, handler := range ps.peers {
handler.stop()
}
ps.state = StateStopped
}
}

// AddPeer adds a peer to the peering service. This function may be safely
// called at any time: before the service is started, while running, or after it
// stops.
//
// AddPeer may also be called multiple times for the same peer. The new
// addresses will replace the old.
func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
ps.mu.Lock()
defer ps.mu.Unlock()

if handler, ok := ps.peers[info.ID]; ok {
logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
handler.setAddrs(info.Addrs)

} else {
logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
ps.host.ConnManager().Protect(info.ID, connmgrTag)

handler = &peerHandler{
host: ps.host,
peer: info.ID,
addrs: info.Addrs,
nextDelay: initialDelay,
}
handler.ctx, handler.cancel = context.WithCancel(context.Background())
ps.peers[info.ID] = handler
switch ps.state {
case StateRunning:
go handler.startIfDisconnected()
case StateStopped:
// We still construct everything in this state because
// it's easier to reason about. But we should still free
// resources.
handler.cancel()
}
}
}

// ListPeers lists peers in the peering service.
func (ps *PeeringService) ListPeers() []peer.AddrInfo {
ps.mu.RLock()
defer ps.mu.RUnlock()

out := make([]peer.AddrInfo, 0, len(ps.peers))
for id, addrs := range ps.peers {
ai := peer.AddrInfo{ID: id}
ai.Addrs = append(ai.Addrs, addrs.addrs...)
out = append(out, ai)
}
return out
}

// RemovePeer removes a peer from the peering service. This function may be
// safely called at any time: before the service is started, while running, or
// after it stops.
func (ps *PeeringService) RemovePeer(id peer.ID) {
ps.mu.Lock()
defer ps.mu.Unlock()

if handler, ok := ps.peers[id]; ok {
logger.Infow("peer removed", "peer", id)
ps.host.ConnManager().Unprotect(id, connmgrTag)

handler.stop()
delete(ps.peers, id)
}
}

type netNotifee PeeringService

func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
ps := (*PeeringService)(nn)

p := c.RemotePeer()
ps.mu.RLock()
defer ps.mu.RUnlock()

if handler, ok := ps.peers[p]; ok {
// use a goroutine to avoid blocking events.
go handler.stopIfConnected()
}
}

func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
ps := (*PeeringService)(nn)

p := c.RemotePeer()
ps.mu.RLock()
defer ps.mu.RUnlock()

if handler, ok := ps.peers[p]; ok {
// use a goroutine to avoid blocking events.
go handler.startIfDisconnected()
}
}
func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {}
func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {}
func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {}
func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}

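To illustrate the reconnect schedule implemented by nextBackoff above, here is a standalone sketch (not part of the package) that reuses the same constants and arithmetic: each failed attempt grows the delay by roughly 1.5x to 2.5x, and once the delay would exceed the cap it is clamped to within 10% below the 10-minute maximum. Output varies between runs because of the randomized jitter.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	initialDelay     = 5 * time.Second
	maxBackoff       = 10 * time.Minute
	maxBackoffJitter = 10 // %
)

func main() {
	delay := initialDelay
	for attempt := 1; attempt <= 12; attempt++ {
		// Same growth rule as peerHandler.nextBackoff: add 50% plus a random
		// amount up to the current delay, then clamp near the maximum.
		if delay < maxBackoff {
			delay += delay/2 + time.Duration(rand.Int63n(int64(delay)))
		}
		if delay > maxBackoff {
			delay = maxBackoff
			delay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100))
		}
		fmt.Printf("attempt %2d: wait %s before retrying\n", attempt, delay.Round(time.Second))
	}
}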