Skip to content

Commit

Permalink
Publish Pando-compatible metadata over GraphSync
Browse files Browse the repository at this point in the history
Implement a publisher specific to Pando metadata that uses the CID of
`RecordUpdate` as byte value of Pando metadata and exposes a GraphSync
server that can resolve both Pando metadata, and the `RecordUpdate`
CIDs.

Decouple `statedb` from publication by changing the state interface to
return the CID of `RecordUpdate`. This removes all libp2p and legs code
from the `state` package. Instead, the publication is done on the
controller itself upon `POST`s to `/complete/<workedby>` as a
non-critical operation. This makes a better structured code in the
context of Separation of Concerns.

Relates to:
 - #342
  • Loading branch information
masih committed Feb 25, 2022
1 parent a1fd7a0 commit b57f2e2
Show file tree
Hide file tree
Showing 13 changed files with 621 additions and 211 deletions.
49 changes: 46 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/filecoin-project/dealbot/controller/graphql"
"github.com/filecoin-project/dealbot/controller/publisher"
"github.com/filecoin-project/dealbot/controller/spawn"
"github.com/filecoin-project/dealbot/controller/state"
"github.com/filecoin-project/dealbot/controller/webutil"
Expand All @@ -24,6 +25,9 @@ import (
metricslog "github.com/filecoin-project/dealbot/metrics/log"
"github.com/filecoin-project/dealbot/metrics/prometheus"
"github.com/filecoin-project/lotus/api"
sqlds "github.com/ipfs/go-ds-sql"
"github.com/ipfs/go-ds-sql/postgres"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -56,6 +60,7 @@ type Controller struct {
spawner spawn.Spawner
gateway api.Gateway
nodeCloser lotus.NodeCloser
pub publisher.Publisher
}

func New(ctx *cli.Context) (*Controller, error) {
Expand Down Expand Up @@ -118,6 +123,13 @@ func New(ctx *cli.Context) (*Controller, error) {
if err != nil {
return nil, err
}

backend, err := state.NewStateDB(ctx.Context, connector, migrator, ctx.String("datapointlog"), key, recorder)
if err != nil {
return nil, err
}

// Get the configured libp2p host listen addresses
listenAddrs := make([]multiaddr.Multiaddr, 0)
for _, a := range ctx.StringSlice("libp2p-addrs") {
addr, err := multiaddr.NewMultiaddr(a)
Expand All @@ -127,6 +139,13 @@ func New(ctx *cli.Context) (*Controller, error) {
listenAddrs = append(listenAddrs, addr)
}

// Instantiate a new libp2p host needed by publisher.
host, err := libp2p.New(libp2p.ListenAddrs(listenAddrs...), libp2p.Identity(key))
if err != nil {
return nil, err
}

// Get libp2p bootstrap hosts.
var btstrp []peer.AddrInfo
for _, b := range ctx.StringSlice("libp2p-bootstrap-addrinfo") {
b, err := peer.AddrInfoFromString(b)
Expand All @@ -136,11 +155,20 @@ func New(ctx *cli.Context) (*Controller, error) {
btstrp = append(btstrp, *b)
}

backend, err := state.NewStateDB(ctx.Context, connector, migrator, ctx.String("datapointlog"), key, listenAddrs, btstrp, recorder)
// Instantiate a datastore backed by DB used internally by the publisher.
queries := postgres.NewQueries("legs_data")
ds := sqlds.NewDatastore(connector.SqlDB(), queries)

// Instantiate a store, used to read the state records created by state db.
store := backend.Store(ctx.Context)

// Instantiate publisher.
pub, err := publisher.NewPandoPublisher(ds, store, publisher.WithHost(host), publisher.WithBootstrapPeers(btstrp...))
if err != nil {
return nil, err
}
return NewWithDependencies(ctx, l, gl, recorder, backend)

return NewWithDependencies(ctx, l, gl, recorder, backend, pub)
}

type logEcapsulator struct {
Expand All @@ -165,9 +193,10 @@ func CorsMiddleware(next http.Handler) http.Handler {
})
}

func NewWithDependencies(ctx *cli.Context, listener, graphqlListener net.Listener, recorder metrics.MetricsRecorder, backend state.State) (*Controller, error) {
func NewWithDependencies(ctx *cli.Context, listener, graphqlListener net.Listener, recorder metrics.MetricsRecorder, backend state.State, pub publisher.Publisher) (*Controller, error) {
srv := new(Controller)
srv.db = backend
srv.pub = pub
srv.basicauth = ctx.String("basicauth")
if ctx.String("daemon-driver") == "kubernetes" {
srv.spawner = spawn.NewKubernetes()
Expand Down Expand Up @@ -265,6 +294,13 @@ func NewWithDependencies(ctx *cli.Context, listener, graphqlListener net.Listene
// explicitly via Shutdown, or due to a fault condition. It propagates the
// non-nil err return value from http.Serve.
func (c *Controller) Serve() error {
if c.pub != nil {
if err := c.pub.Start(context.TODO()); err != nil {
log.Errorw("Failed to start publisher", "err", err)
return err
}
}

select {
case <-c.doneCh:
return fmt.Errorf("tried to reuse a stopped server")
Expand Down Expand Up @@ -297,5 +333,12 @@ func (c *Controller) Shutdown(ctx context.Context) error {
if c.gateway != nil {
c.nodeCloser()
}

if c.pub != nil {
if err := c.pub.Shutdown(ctx); err != nil {
log.Errorw("Failed to shut down publisher", "err", err)
}
}

return c.server.Shutdown(ctx)
}
6 changes: 2 additions & 4 deletions controller/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/filecoin-project/dealbot/tasks"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -388,11 +387,10 @@ func newHarness(ctx context.Context, t *testing.T, connector state.DBConnector,
pr, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, 0)
require.NoError(t, err)

max, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
be, err := state.NewStateDB(ctx, connector, migrator, "", pr, []multiaddr.Multiaddr{max}, nil, h.recorder)
be, err := state.NewStateDB(ctx, connector, migrator, "", pr, h.recorder)
require.NoError(t, err)
cc := cli.NewContext(cli.NewApp(), &flag.FlagSet{}, nil)
h.controller, err = controller.NewWithDependencies(cc, listener, nil, h.recorder, be)
h.controller, err = controller.NewWithDependencies(cc, listener, nil, h.recorder, be, nil)

h.serveErr = make(chan error, 1)
go func() {
Expand Down
77 changes: 77 additions & 0 deletions controller/publisher/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package publisher

import (
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
)

type (
// Option represents a configurable parameter in a publisher.
Option func(*options) error

options struct {
h host.Host
btstrpCfg *bootstrap.BootstrapConfig
topic string
}
)

func apply(o ...Option) (*options, error) {
opts := &options{
topic: "/pando/v0.0.1",
}
for _, apply := range o {
if err := apply(opts); err != nil {
return nil, err
}
}
if opts.h == nil {
var err error
opts.h, err = libp2p.New()
if err != nil {
return nil, err
}
}
return opts, nil
}

// WithBootstrapPeers optionally sets the list of peers to which to remain connected.
// If unset, no bootstrapping will be performed.
//
// See: bootstrap.DefaultBootstrapConfig
func WithBootstrapPeers(b ...peer.AddrInfo) Option {
return func(o *options) error {
addrCount := len(b)
if addrCount > 0 {
o.btstrpCfg = &bootstrap.BootstrapConfig{
MinPeerThreshold: addrCount,
Period: bootstrap.DefaultBootstrapConfig.Period,
ConnectionTimeout: bootstrap.DefaultBootstrapConfig.ConnectionTimeout,
BootstrapPeers: func() []peer.AddrInfo { return b },
}
}
return nil
}
}

// WithHost sets the host on which the publisher is exposed.
// If unset, a default libp2p host is created with random identity and listen addrtesses.
//
// See: libp2p.New.
func WithHost(h host.Host) Option {
return func(o *options) error {
o.h = h
return nil
}
}

// WithTopic sets the gossipsub topic to which announcements are made.
// Defaults to `/pando/v0.0.1` if unset.
func WithTopic(topic string) Option {
return func(o *options) error {
o.topic = topic
return nil
}
}
Loading

0 comments on commit b57f2e2

Please sign in to comment.