Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-transport legs #23

Merged
merged 12 commits into from
Oct 28, 2021
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-pubsub v0.5.4
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multicodec v0.3.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
85 changes: 85 additions & 0 deletions http/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package http

import (
"context"
"encoding/json"
"errors"
"net/http"
"path"
"sync"

"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

type httpPublisher struct {
rl sync.RWMutex
root cid.Cid
lsys ipld.LinkSystem
}

var _ legs.LegPublisher = (*httpPublisher)(nil)
var _ http.Handler = (*httpPublisher)(nil)

// NewPublisher creates a new http publisher
func NewPublisher(ctx context.Context,
ds datastore.Batching,
lsys ipld.LinkSystem) (legs.LegPublisher, error) {
hp := &httpPublisher{}
hp.lsys = lsys
return hp, nil
}

func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ask := path.Base(r.URL.Path)
if ask == "head" {
// serve the
h.rl.RLock()
defer h.rl.RUnlock()
out, err := json.Marshal(h.root.String())
if err != nil {
w.WriteHeader(500)
//todo: log
} else {
_, _ = w.Write(out)
}
return
}
// interpret `ask` as a CID to serve.
c, err := cid.Parse(ask)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("invalid request: not a cid"))
return
}
item, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("cid not found"))
return
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("unable to load data for cid"))
// todo: log
return
}
// marshal to json and serve.
_ = dagjson.Encode(item, w)
}

func (h *httpPublisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
h.rl.Lock()
defer h.rl.Unlock()
h.root = c
return nil
}

func (h *httpPublisher) Close() error {
return nil
}
305 changes: 305 additions & 0 deletions http/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package http

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"sync"
"time"

"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

var defaultPollTime = time.Hour
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can imagine testing wanting to change this


var log = logging.Logger("go-legs")

// NewHTTPSubscriber creates a legs subcriber that provides subscriptions
// from publishers identified by
func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiaddr.Multiaddr, lsys *ipld.LinkSystem, topic string, selector ipld.Node) (legs.LegSubscriber, error) {
url, err := toURL(publisher)
if err != nil {
return nil, err
}
hs := httpSubscriber{
host,
cid.Undef,
url,

lsys,
selector,
sync.Mutex{},
make(chan req, 1),
make([]chan cid.Cid, 1),
willscott marked this conversation as resolved.
Show resolved Hide resolved
}
go hs.background()
return &hs, nil
}

// toURL takes a multiaddr of the form:
// /dnsaddr/thing.com/http/path/to/root
// /ip/192.168.0.1/http
// TODO: doesn't support including a signing key definition yet
func toURL(ma multiaddr.Multiaddr) (string, error) {
// host should be either the dns name or the IP
_, host, err := manet.DialArgs(ma)
if err != nil {
return "", err
}
_, http := multiaddr.SplitFunc(ma, func(c multiaddr.Component) bool {
return c.Protocol().Code == multiaddr.P_HTTP
})
if len(http.Bytes()) == 0 {
return "", fmt.Errorf("publisher must be HTTP protocol for this subscriber, was: %s", ma)
}
_, path := multiaddr.SplitFirst(http)
return fmt.Sprintf("https://%s/%s", host, path), nil
}

type httpSubscriber struct {
*http.Client
head cid.Cid
root string

lsys *ipld.LinkSystem
defaultSelector ipld.Node
submtx sync.Mutex
willscott marked this conversation as resolved.
Show resolved Hide resolved
// reqs is inbound requests for syncs from `Sync` calls
reqs chan req
subs []chan cid.Cid
}

var _ (legs.LegSubscriber) = (*httpSubscriber)(nil)
willscott marked this conversation as resolved.
Show resolved Hide resolved

type req struct {
cid.Cid
Selector ipld.Node
ctx context.Context
resp chan cid.Cid
}

func (h *httpSubscriber) OnChange() (chan cid.Cid, context.CancelFunc) {
ch := make(chan cid.Cid)
h.submtx.Lock()
defer h.submtx.Unlock()
h.subs = append(h.subs, ch)
cncl := func() {
h.submtx.Lock()
defer h.submtx.Unlock()
for i, ca := range h.subs {
if ca == ch {
h.subs[i] = h.subs[len(h.subs)-1]
h.subs[len(h.subs)-1] = nil
h.subs = h.subs[:len(h.subs)-1]
close(ch)
break
}
}
}
return ch, cncl
}

// Not supported, since gossip-sub is not supported by this handler.
// `Sync` must be called explicitly to trigger a fetch instead.
func (h *httpSubscriber) SetPolicyHandler(p legs.PolicyHandler) error {
return nil
}

func (h *httpSubscriber) SetLatestSync(c cid.Cid) error {
h.submtx.Lock()
defer h.submtx.Unlock()
h.head = c
return nil
}

func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, selector ipld.Node) (chan cid.Cid, context.CancelFunc, error) {
respChan := make(chan cid.Cid, 1)
cctx, cncl := context.WithCancel(ctx)
h.submtx.Lock()
defer h.submtx.Unlock()
// todo: error if reqs is full
h.reqs <- req{
Cid: c,
Selector: selector,
ctx: cctx,
resp: respChan,
}
return respChan, cncl, nil
}

func (h *httpSubscriber) Close() error {
// cancel out subscribers.
h.Client.CloseIdleConnections()
h.submtx.Lock()
defer h.submtx.Unlock()
for _, ca := range h.subs {
close(ca)
}
h.subs = make([]chan cid.Cid, 0)
return nil
}

// background event loop for scheduling
// a. time-scheduled fetches to the provider
// b. interrupted fetches in response to synchronous 'Sync' calls.
func (h *httpSubscriber) background() {
var nextCid cid.Cid
var workResp chan cid.Cid
var ctx context.Context
var sel ipld.Node
var xsel selector.Selector
var err error
for {
select {
case r := <-h.reqs:
nextCid = r.Cid
workResp = r.resp
sel = r.Selector
ctx = r.ctx
case <-time.After(defaultPollTime):
willscott marked this conversation as resolved.
Show resolved Hide resolved
nextCid = cid.Undef
workResp = nil
ctx = context.Background()
sel = nil
}
if nextCid == cid.Undef {
nextCid, err = h.fetchHead(ctx)
}
if sel == nil {
sel = h.defaultSelector
}
if err != nil {
log.Warnf("failed to fetch new head: %s", err)
err = nil
goto next
}
sel = legs.ExploreRecursiveWithStopNode(selector.RecursionLimitNone(),
sel,
cidlink.Link{Cid: h.head})
if err := h.fetchBlock(ctx, nextCid); err != nil {
//log
goto next
}
xsel, err = selector.CompileSelector(sel)
if err != nil {
//log
goto next
}

err = h.walkFetch(ctx, nextCid, xsel)
if err != nil {
//log
goto next
}

next:
if workResp != nil {
willscott marked this conversation as resolved.
Show resolved Hide resolved
workResp <- nextCid
close(workResp)
workResp = nil
}
}
}

func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel selector.Selector) error {
getMissingLs := cidlink.DefaultLinkSystem()
getMissingLs.TrustedStorage = true
willscott marked this conversation as resolved.
Show resolved Hide resolved
getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) {
r, err := h.lsys.StorageReadOpener(lc, l)
if err == nil {
return r, nil
}
// get.
c := l.(cidlink.Link).Cid
if err := h.fetchBlock(ctx, c); err != nil {
return nil, err
}
return h.lsys.StorageReadOpener(lc, l)
}

progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: getMissingLs,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
},
Path: datamodel.NewPath([]datamodel.PathSegment{}),
LastBlock: struct {
Path datamodel.Path
Link datamodel.Link
}{
Path: ipld.Path{},
Link: nil,
},
willscott marked this conversation as resolved.
Show resolved Hide resolved
}
// get the direct node.
rootNode, err := getMissingLs.Load(ipld.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any)
if err != nil {
return err
}
return progress.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error {
willscott marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
}

func (h *httpSubscriber) fetch(ctx context.Context, url string, cb func(io.Reader) error) error {
req, err := http.NewRequestWithContext(ctx, "GET", path.Join(h.root, url), nil)
if err != nil {
return err
}
resp, err := h.Client.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("non success http code at %s/%s: %d", h.root, url, resp.StatusCode)
}

defer resp.Body.Close()
return cb(resp.Body)
}

func (h *httpSubscriber) fetchHead(ctx context.Context) (cid.Cid, error) {
var cidStr string
if err := h.fetch(ctx, "head", func(msg io.Reader) error {
return json.NewDecoder(msg).Decode(&cidStr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beware that you might have fallen prey to golang/go#36225 here. You should either ensure there isn't a second JSON value, or use json.Unmarshal, which does that for you.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the worry is there's extra tailing error from the server after a valid CID, and we should error rather than reading the first CID and moving on?
It feels like an edge case where i'd rather not write the longer code of reading into a buffer and then unmarshaling since the end result here is we're trusting the server to tell us the head, and if we don't trust that response there's an availability attack anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@willscott I think the issue is that if there is an error with that first CID, it will not be caught:

r := strings.NewReader("{} bad data")

var m map[string]interface{}
d := json.NewDecoder(r)
if err := d.Decode(&m); err != nil {
	panic(err) // not triggered
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first entry would need to deserialize to a valid cid.
in the example the first entry {} deserializes to a valid map.

the fact remains: we're asking the server for a head. if they give us a failure / non-understandable response we fail availability. if they're purposefully corrupting their head response it's the same net result even if we go and ask for the CID here, so i'm not seeing this as causing any additional failure modes.

Copy link
Contributor

@mvdan mvdan Oct 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is about a valid JSON value followed by garbage (or another JSON value) instead of EOF. Unmarshal catches that as an error, while Decode silently ignores the extra garbage.

This distinction is probably not hugely important in this particular scenario, so Will's response seems reasonable. We trust the response here. I think I'd still use Unmarshal for the sake of defensive programming, but I don't think it would actually make a difference in practice, unless the server has some serious bugs.

}); err != nil {
return cid.Undef, err
}

return cid.Decode(cidStr)
}

// fetch an item into the datastore at c if not locally avilable.
func (h *httpSubscriber) fetchBlock(ctx context.Context, c cid.Cid) error {
n, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
// node is already present.
if n != nil && err == nil {
return nil
}

return h.fetch(ctx, c.String(), func(data io.Reader) error {
writer, committer, err := h.lsys.StorageWriteOpener(ipld.LinkContext{})
if err != nil {
return err
}
if _, err := io.Copy(writer, data); err != nil {
return err
}
return committer(cidlink.Link{Cid: c})
})
}
Loading