Skip to content

Commit

Permalink
feat(bootstrap): save connected peers as backup bootstrap peers (#8856)
Browse files Browse the repository at this point in the history
* feat(bootstrap): save connected peers as backup temporary bootstrap ones
* fix: do not add duplicated oldSavedPeers, not using tags, reuse
randomizeList
* test: add regression test
* chore: add changelog

---------

Co-authored-by: Henrique Dias <hacdias@gmail.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
3 people authored May 25, 2023
1 parent eda19c8 commit 63561f3
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 56 deletions.
7 changes: 4 additions & 3 deletions config/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package config

type Internal struct {
// All marked as omitempty since we are expecting to make changes to all subcomponents of Internal
Bitswap *InternalBitswap `json:",omitempty"`
UnixFSShardingSizeThreshold *OptionalString `json:",omitempty"`
Libp2pForceReachability *OptionalString `json:",omitempty"`
Bitswap *InternalBitswap `json:",omitempty"`
UnixFSShardingSizeThreshold *OptionalString `json:",omitempty"`
Libp2pForceReachability *OptionalString `json:",omitempty"`
BackupBootstrapInterval *OptionalDuration `json:",omitempty"`
}

type InternalBitswap struct {
Expand Down
229 changes: 180 additions & 49 deletions core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package bootstrap
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess/periodic"
goprocessctx "github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/goprocess/periodic"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -50,13 +50,26 @@ type BootstrapConfig struct {
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []peer.AddrInfo

// BackupBootstrapInterval governs the periodic interval at which the node will
// attempt to save connected nodes to use as temporary bootstrap peers.
BackupBootstrapInterval time.Duration

// MaxBackupBootstrapSize controls the maximum number of peers we're saving
// as backup bootstrap peers.
MaxBackupBootstrapSize int

SaveBackupBootstrapPeers func(context.Context, []peer.AddrInfo)
LoadBackupBootstrapPeers func(context.Context) []peer.AddrInfo
}

// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
var DefaultBootstrapConfig = BootstrapConfig{
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3
BackupBootstrapInterval: 1 * time.Hour,
MaxBackupBootstrapSize: 20,
}

func BootstrapConfigWithPeers(pis []peer.AddrInfo) BootstrapConfig {
Expand Down Expand Up @@ -90,6 +103,9 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf
log.Debugf("%s bootstrap error: %s", id, err)
}

// Exit the first call (triggered independently by `proc.Go`, not `Tick`)
// only after being done with the *single* Routing.Bootstrap call. Following
// periodic calls (`Tick`) will not block on this.
<-doneWithRound
}

Expand All @@ -108,9 +124,100 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf

doneWithRound <- struct{}{}
close(doneWithRound) // it no longer blocks periodic

startSavePeersAsTemporaryBootstrapProc(cfg, host, proc)

return proc, nil
}

// Aside of the main bootstrap process we also run a secondary one that saves
// connected peers as a backup measure if we can't connect to the official
// bootstrap ones. These peers will serve as *temporary* bootstrap nodes.
func startSavePeersAsTemporaryBootstrapProc(cfg BootstrapConfig, host host.Host, bootstrapProc goprocess.Process) {
savePeersFn := func(worker goprocess.Process) {
ctx := goprocessctx.OnClosingContext(worker)

if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil {
log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err)
}
}
savePeersProc := periodicproc.Tick(cfg.BackupBootstrapInterval, savePeersFn)

// When the main bootstrap process ends also terminate the 'save connected
// peers' ones. Coupling the two seems the easiest way to handle this backup
// process without additional complexity.
go func() {
<-bootstrapProc.Closing()
savePeersProc.Close()
}()

// Run the first round now (after the first bootstrap process has finished)
// as the SavePeersPeriod can be much longer than bootstrap.
savePeersProc.Go(savePeersFn)
}

func saveConnectedPeersAsTemporaryBootstrap(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
// Randomize the list of connected peers, we don't prioritize anyone.
connectedPeers := randomizeList(host.Network().Peers())

bootstrapPeers := cfg.BootstrapPeers()
backupPeers := make([]peer.AddrInfo, 0, cfg.MaxBackupBootstrapSize)

// Choose peers to save and filter out the ones that are already bootstrap nodes.
for _, p := range connectedPeers {
found := false
for _, bootstrapPeer := range bootstrapPeers {
if p == bootstrapPeer.ID {
found = true
break
}
}
if !found {
backupPeers = append(backupPeers, peer.AddrInfo{
ID: p,
Addrs: host.Network().Peerstore().Addrs(p),
})
}

if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
break
}
}

// If we didn't reach the target number use previously stored connected peers.
if len(backupPeers) < cfg.MaxBackupBootstrapSize {
oldSavedPeers := cfg.LoadBackupBootstrapPeers(ctx)
log.Debugf("missing %d peers to reach backup bootstrap target of %d, trying from previous list of %d saved peers",
cfg.MaxBackupBootstrapSize-len(backupPeers), cfg.MaxBackupBootstrapSize, len(oldSavedPeers))

// Add some of the old saved peers. Ensure we don't duplicate them.
for _, p := range oldSavedPeers {
found := false
for _, sp := range backupPeers {
if p.ID == sp.ID {
found = true
break
}
}

if !found {
backupPeers = append(backupPeers, p)
}

if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
break
}
}
}

cfg.SaveBackupBootstrapPeers(ctx, backupPeers)
log.Debugf("saved %d peers (of %d target) as bootstrap backup in the config", len(backupPeers), cfg.MaxBackupBootstrapSize)
return nil
}

// Connect to as many peers needed to reach the BootstrapConfig.MinPeerThreshold.
// Peers can be original bootstrap or temporary ones (drawn from a list of
// persisted previously connected peers).
func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {

ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
Expand All @@ -127,35 +234,58 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er
id, len(connected), cfg.MinPeerThreshold)
return nil
}
numToDial := cfg.MinPeerThreshold - len(connected)
numToDial := cfg.MinPeerThreshold - len(connected) // numToDial > 0

// filter out bootstrap nodes we are already connected to
var notConnected []peer.AddrInfo
for _, p := range peers {
if host.Network().Connectedness(p.ID) != network.Connected {
notConnected = append(notConnected, p)
if len(peers) > 0 {
numToDial -= int(peersConnect(ctx, host, peers, numToDial, true))
if numToDial <= 0 {
return nil
}
}

// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
return ErrNotEnoughBootstrapPeers
log.Debugf("not enough bootstrap peers to fill the remaining target of %d connections, trying backup list", numToDial)

tempBootstrapPeers := cfg.LoadBackupBootstrapPeers(ctx)
if len(tempBootstrapPeers) > 0 {
numToDial -= int(peersConnect(ctx, host, tempBootstrapPeers, numToDial, false))
if numToDial <= 0 {
return nil
}
}

// connect to a random susbset of bootstrap candidates
randSubset := randomSubsetOfPeers(notConnected, numToDial)
log.Debugf("tried both original bootstrap peers and temporary ones but still missing target of %d connections", numToDial)

log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
return bootstrapConnect(ctx, host, randSubset)
return ErrNotEnoughBootstrapPeers
}

func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo) error {
if len(peers) < 1 {
return ErrNotEnoughBootstrapPeers
}
// Attempt to make `needed` connections from the `availablePeers` list. Mark
// peers as either `permanent` or temporary when adding them to the Peerstore.
// Return the number of connections completed. We eagerly over-connect in parallel,
// so we might connect to more than needed.
// (We spawn as many routines and attempt connections as the number of availablePeers,
// but this list comes from restricted sets of original or temporary bootstrap
// nodes which will keep it under a sane value.)
func peersConnect(ctx context.Context, ph host.Host, availablePeers []peer.AddrInfo, needed int, permanent bool) uint64 {
peers := randomizeList(availablePeers)

// Monitor the number of connections and stop if we reach the target.
var connected uint64
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
if int(atomic.LoadUint64(&connected)) >= needed {
cancel()
return
}
}
}
}()

errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {

Expand All @@ -164,45 +294,46 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo)
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.

if int(atomic.LoadUint64(&connected)) >= needed {
cancel()
break
}

wg.Add(1)
go func(p peer.AddrInfo) {
defer wg.Done()

// Skip addresses belonging to a peer we're already connected to.
// (Not a guarantee but a best-effort policy.)
if ph.Network().Connectedness(p.ID) == network.Connected {
return
}
log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)

ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
if ctx.Err() != context.Canceled {
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
}
return
}
if permanent {
// We're connecting to an original bootstrap peer, mark it as
// a permanent address (Connect will register it as TempAddrTTL).
ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
}

log.Infof("bootstrapped with %v", p.ID)
atomic.AddUint64(&connected, 1)
}(p)
}
wg.Wait()

// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
return connected
}

func randomSubsetOfPeers(in []peer.AddrInfo, max int) []peer.AddrInfo {
if max > len(in) {
max = len(in)
}

out := make([]peer.AddrInfo, max)
for i, val := range rand.Perm(len(in))[:max] {
func randomizeList[T any](in []T) []T {
out := make([]T, len(in))
for i, val := range rand.Perm(len(in)) {
out[i] = in[val]
}
return out
Expand Down
6 changes: 3 additions & 3 deletions core/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/libp2p/go-libp2p/core/test"
)

func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {
func TestRandomizeAddressList(t *testing.T) {
var ps []peer.AddrInfo
sizeofSlice := 100
sizeofSlice := 10
for i := 0; i < sizeofSlice; i++ {
pid, err := test.RandPeerID()
if err != nil {
Expand All @@ -18,7 +18,7 @@ func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {

ps = append(ps, peer.AddrInfo{ID: pid})
}
out := randomSubsetOfPeers(ps, 2*sizeofSlice)
out := randomizeList(ps)
if len(out) != len(ps) {
t.Fail()
}
Expand Down
Loading

0 comments on commit 63561f3

Please sign in to comment.