Skip to content

Commit

Permalink
Add option to specify bootstrap addresses
Browse files Browse the repository at this point in the history
Allow the libp2p host to optionally perform bootstrap with given list of
addinfos.
masih committed Feb 17, 2022
1 parent f7ffb10 commit c76879f
Showing 11 changed files with 137 additions and 28 deletions.
2 changes: 1 addition & 1 deletion commands/controller.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ func controllerCommand(c *cli.Context) error {
select {
case <-ctx.Done():
case <-exiting:
// no need to shutdown in this case.
// no need to shut down in this case.
return
}

7 changes: 6 additions & 1 deletion commands/flags.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (

var log = logging.Logger("dealbot")

var CommonFlags []cli.Flag = []cli.Flag{
var CommonFlags = []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{
Name: "wallet",
Usage: "deal client wallet address on node",
@@ -239,6 +239,11 @@ var ControllerFlags = []cli.Flag{
Usage: "libp2p multiaddrs to listen on",
EnvVars: []string{"DEALBOT_LIBP2P_ADDRS"},
}),
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "libp2p-bootstrap-addrinfo",
Usage: "libp2p addrinfos to use for bootstrapping",
EnvVars: []string{"DEALBOT_LIBP2P_BOOTSTRAP_ADDRINFO"},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "graphql",
Usage: "host:port to bind graphql server on",
12 changes: 11 additions & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/dealbot/metrics/prometheus"
"github.com/filecoin-project/lotus/api"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"

logging "github.com/ipfs/go-log/v2"
@@ -126,7 +127,16 @@ func New(ctx *cli.Context) (*Controller, error) {
listenAddrs = append(listenAddrs, addr)
}

backend, err := state.NewStateDB(ctx.Context, connector, migrator, ctx.String("datapointlog"), key, listenAddrs, recorder)
var btstrp []peer.AddrInfo
for _, b := range ctx.StringSlice("libp2p-bootstrap-addrinfo") {
b, err := peer.AddrInfoFromString(b)
if err != nil {
return nil, err
}
btstrp = append(btstrp, *b)
}

backend, err := state.NewStateDB(ctx.Context, connector, migrator, ctx.String("datapointlog"), key, listenAddrs, btstrp, recorder)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion controller/http_test.go
Original file line number Diff line number Diff line change
@@ -389,7 +389,7 @@ func newHarness(ctx context.Context, t *testing.T, connector state.DBConnector,
require.NoError(t, err)

max, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
be, err := state.NewStateDB(ctx, connector, migrator, "", pr, []multiaddr.Multiaddr{max}, h.recorder)
be, err := state.NewStateDB(ctx, connector, migrator, "", pr, []multiaddr.Multiaddr{max}, nil, h.recorder)
require.NoError(t, err)
cc := cli.NewContext(cli.NewApp(), &flag.FlagSet{}, nil)
h.controller, err = controller.NewWithDependencies(cc, listener, nil, h.recorder, be)
2 changes: 1 addition & 1 deletion controller/state/interface.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
"github.com/ipfs/go-cid"
)

// State provides an interface for presistence.
// State provides an interface for persistence.
type State interface {
AssignTask(ctx context.Context, req tasks.PopTask) (tasks.Task, error)
Get(ctx context.Context, uuid string) (tasks.Task, error)
12 changes: 12 additions & 0 deletions controller/state/libp2p.go
Original file line number Diff line number Diff line change
@@ -2,8 +2,10 @@ package state

import (
"database/sql"
"io"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ipfs/core/bootstrap"
csms "github.com/libp2p/go-conn-security-multistream"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
@@ -87,6 +89,16 @@ func NewHost(priv crypto.PrivKey, listenAddrs []multiaddr.Multiaddr) (host.Host,
return host, nil
}

func bootstrapHost(host host.Host, btstrp []peer.AddrInfo) (io.Closer, error) {
bCfg := bootstrap.DefaultBootstrapConfig
// TODO: parameterize this value, since the concrete value is application specific.
bCfg.MinPeerThreshold = 1
bCfg.BootstrapPeers = func() []peer.AddrInfo {
return btstrp
}
return bootstrap.Bootstrap(host.ID(), host, nil, bCfg)
}

func dbDS(table string, db *sql.DB) datastore.Batching {
queries := pg.NewQueries(table)
ds := sqlds.NewDatastore(db, queries)
15 changes: 12 additions & 3 deletions controller/state/statedb.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/storage/memstore"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
tokenjson "github.com/polydawn/refmt/json"
@@ -100,12 +101,12 @@ type stateDB struct {
}

// NewStateDB creates a state instance with a given driver and identity
func NewStateDB(ctx context.Context, dbConn DBConnector, migrator Migrator, logfile string, identity crypto.PrivKey, addrs []multiaddr.Multiaddr, recorder metrics.MetricsRecorder) (State, error) {
return newStateDBWithNotify(ctx, dbConn, migrator, logfile, identity, addrs, recorder, nil)
func NewStateDB(ctx context.Context, dbConn DBConnector, migrator Migrator, logfile string, identity crypto.PrivKey, addrs []multiaddr.Multiaddr, btstrp []peer.AddrInfo, recorder metrics.MetricsRecorder) (State, error) {
return newStateDBWithNotify(ctx, dbConn, migrator, logfile, identity, addrs, btstrp, recorder, nil)
}

// newStateDBWithNotify is NewStateDB with additional parameters for testing
func newStateDBWithNotify(ctx context.Context, dbConn DBConnector, migrator Migrator, logfile string, identity crypto.PrivKey, addrs []multiaddr.Multiaddr, recorder metrics.MetricsRecorder, runNotice chan string) (State, error) {
func newStateDBWithNotify(ctx context.Context, dbConn DBConnector, migrator Migrator, logfile string, identity crypto.PrivKey, addrs []multiaddr.Multiaddr, btstrp []peer.AddrInfo, recorder metrics.MetricsRecorder, runNotice chan string) (State, error) {

// Open database connection
err := dbConn.Connect()
@@ -153,6 +154,14 @@ func newStateDBWithNotify(ctx context.Context, dbConn DBConnector, migrator Migr
if err != nil {
return nil, err
}
log.Infow("libp2p host instantiated", "id", host.ID(), "listenAdds", host.Addrs())

if len(btstrp) != 0 {
if _, err = bootstrapHost(host, btstrp); err != nil {
return nil, err
}
log.Infow("successfully bootstrapped host", "bootstrapAdds", btstrp)
}

b := dbDS("legs_data", st.db())
pub, err := dtsync.NewPublisher(host, b, storeLS, "/pando/v0.0.1")
2 changes: 1 addition & 1 deletion controller/state/statedb_test.go
Original file line number Diff line number Diff line change
@@ -760,7 +760,7 @@ func withState(ctx context.Context, t *testing.T, fn func(*stateDB)) {
err = WipeAndReset(dbConn, migrator)
require.NoError(t, err)
max, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
stateInterface, err := NewStateDB(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{max}, nil)
stateInterface, err := NewStateDB(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{max}, nil, nil)
require.NoError(t, err)
state, ok := stateInterface.(*stateDB)
require.True(t, ok, "returned wrong type")
4 changes: 2 additions & 2 deletions controller/state/taskscheduler_test.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ func TestScheduledTask(t *testing.T) {

err = WipeAndReset(dbConn, migrator)
require.NoError(t, err)
stateInterface, err := newStateDBWithNotify(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{}, nil, runNotice)
stateInterface, err := newStateDBWithNotify(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{}, nil, nil, runNotice)
require.NoError(t, err)
state := stateInterface.(*stateDB)

@@ -160,7 +160,7 @@ func TestScheduledTaskLimit(t *testing.T) {

err = WipeAndReset(dbConn, migrator)
require.NoError(t, err)
stateInterface, err := newStateDBWithNotify(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{}, nil, runNotice)
stateInterface, err := newStateDBWithNotify(ctx, dbConn, migrator, "", key, []multiaddr.Multiaddr{}, nil, nil, runNotice)
require.NoError(t, err)
state := stateInterface.(*stateDB)

15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ require (
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ds-sql v0.3.0
github.com/ipfs/go-ipfs v0.11.0
github.com/ipfs/go-log/v2 v2.5.0
github.com/ipld/go-car v0.3.3
github.com/ipld/go-car/v2 v2.1.1
@@ -141,7 +142,7 @@ require (
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gbrlsnchs/jwt/v3 v3.0.1 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/go-kit/log v0.2.0 // indirect
@@ -186,10 +187,10 @@ require (
github.com/ipfs/go-ds-badger2 v0.1.2 // indirect
github.com/ipfs/go-ds-leveldb v0.5.0 // indirect
github.com/ipfs/go-ds-measure v0.2.1-0.20211210144622-128d71257f12 // indirect
github.com/ipfs/go-fs-lock v0.0.6 // indirect
github.com/ipfs/go-fs-lock v0.0.7 // indirect
github.com/ipfs/go-graphsync v0.12.0 // indirect
github.com/ipfs/go-ipfs-blockstore v1.1.2 // indirect
github.com/ipfs/go-ipfs-cmds v0.3.0 // indirect
github.com/ipfs/go-ipfs-cmds v0.6.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.1.0 // indirect
github.com/ipfs/go-ipfs-files v0.0.9 // indirect
@@ -202,11 +203,11 @@ require (
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.5.1 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-path v0.0.7 // indirect
github.com/ipfs/go-path v0.2.1 // indirect
github.com/ipfs/go-peertaskqueue v0.7.1 // indirect
github.com/ipfs/go-unixfs v0.3.1 // indirect
github.com/ipfs/go-verifcid v0.0.1 // indirect
github.com/ipfs/interface-go-ipfs-core v0.4.0 // indirect
github.com/ipfs/interface-go-ipfs-core v0.5.2 // indirect
github.com/ipld/go-codec-dagpb v1.3.0 // indirect
github.com/ipld/go-ipld-selector-text-lite v0.0.1 // indirect
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
@@ -336,8 +337,8 @@ require (
go.opentelemetry.io/otel/trace v1.3.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.10.0 // indirect
go.uber.org/fx v1.9.0 // indirect
go.uber.org/dig v1.12.0 // indirect
go.uber.org/fx v1.15.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
92 changes: 82 additions & 10 deletions go.sum

Large diffs are not rendered by default.

0 comments on commit c76879f

Please sign in to comment.