From 6bb1e56497afb21e3cef43b640f91c2595222ce6 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Fri, 15 Oct 2021 14:26:51 -0700 Subject: [PATCH 01/11] [wip] http-transport legs sync --- go.mod | 2 + go.sum | 2 + http/subscribe.go | 164 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 http/subscribe.go diff --git a/go.mod b/go.mod index 68c28da..f42a15f 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,10 @@ require ( github.com/ipld/go-ipld-prime v0.12.0 github.com/libp2p/go-libp2p v0.15.0 github.com/libp2p/go-libp2p-core v0.9.0 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-pubsub v0.5.4 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 + github.com/multiformats/go-multiaddr v0.4.0 github.com/multiformats/go-multicodec v0.3.0 github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 diff --git a/go.sum b/go.sum index b088125..0f6b8a7 100644 --- a/go.sum +++ b/go.sum @@ -598,6 +598,7 @@ github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0 h1:t97Mv0LIBZlP2FXVRNKKVzHJCIjbIWGxYptGId4+htU= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= +github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= @@ -622,6 +623,7 @@ github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCTh github.com/libp2p/go-libp2p-noise v0.1.1/go.mod h1:QDFLdKX7nluB7DEnlVPbz7xlLHdwHFA9HiohJRr3vwM= github.com/libp2p/go-libp2p-noise v0.2.2 h1:MRt5XGfYziDXIUy2udtMWfPmzZqUDYoC1FZoKnqPzwk= github.com/libp2p/go-libp2p-noise v0.2.2/go.mod h1:IEbYhBBzGyvdLBoxxULL/SGbJARhUeqlO8lVSREYu2Q= +github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY= github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY= github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY= github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI= diff --git a/http/subscribe.go b/http/subscribe.go new file mode 100644 index 0000000..bdd5247 --- /dev/null +++ b/http/subscribe.go @@ -0,0 +1,164 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "path" + "sync" + + "github.com/filecoin-project/go-legs" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/multiformats/go-multicodec" +) + +// NewHTTPSubscriber creates a legs subcriber that provides subscriptions +// from publishers identified by +func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiaddr.Multiaddr, lsys *ipld.LinkSystem, topic string) (legs.LegSubscriber, error) { + url, err := toURL(publisher) + if err != nil { + return nil, err + } + hs := httpSubscriber{ + host, + cid.Undef, + url, + + lsys, + sync.Mutex{}, + make([]chan cid.Cid, 1), + } + return &hs, nil +} + +// toURL takes a multiaddr of the form: +// /dnsaddr/thing.com/http/path/to/root +// /ip/192.168.0.1/http +// TODO: doesn't support including a signing key definition yet +func toURL(ma multiaddr.Multiaddr) (string, error) { + // host should be either the dns name or the IP + _, host, err := manet.DialArgs(ma) + if err != nil { + return "", err + } + _, http := multiaddr.SplitFunc(ma, func(c multiaddr.Component) bool { + return c.Protocol().Code == multiaddr.P_HTTP + }) + if len(http.Bytes()) == 0 { + return "", fmt.Errorf("publisher must be HTTP protocol for this subscriber, was: %s", ma) + } + _, path := multiaddr.SplitFirst(http) + return fmt.Sprintf("https://%s/%s", host, path), nil +} + +type httpSubscriber struct { + *http.Client + head cid.Cid + root string + + lsys *ipld.LinkSystem + submtx sync.Mutex + subs []chan cid.Cid +} + +func (h *httpSubscriber) OnChange() (chan cid.Cid, context.CancelFunc) { + ch := make(chan cid.Cid) + h.submtx.Lock() + defer h.submtx.Unlock() + h.subs = append(h.subs, ch) + cncl := func() { + h.submtx.Lock() + defer h.submtx.Unlock() + for i, ca := range h.subs { + if ca == ch { + h.subs[i] = h.subs[len(h.subs)-1] + h.subs[len(h.subs)-1] = nil + h.subs = h.subs[:len(h.subs)-1] + close(ch) + break + } + } + } + return ch, cncl +} + +// Not supported, since gossip-sub is not supported by this handler. +// `Sync` must be called explicitly to trigger a fetch instead. +func (h *httpSubscriber) SetPolicyHandler(p legs.PolicyHandler) error { + return nil +} + +func (h *httpSubscriber) SetLatestSync(c cid.Cid) error { + h.head = c + return nil +} + +func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid) (chan cid.Cid, context.CancelFunc, error) { + return nil, nil, nil +} + +func (h *httpSubscriber) Close() error { + return nil +} + +func (h *httpSubscriber) fetch(url string, cb func(io.Reader) error) error { + resp, err := h.Client.Get(path.Join(h.root, url)) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("non success http code at %s/%s: %d", h.root, url, resp.StatusCode) + } + + defer resp.Body.Close() + return cb(resp.Body) +} + +func (h *httpSubscriber) fetchHead() (cid.Cid, error) { + var cidStr string + if err := h.fetch("head", func(msg io.Reader) error { + return json.NewDecoder(msg).Decode(&cidStr) + }); err != nil { + return cid.Undef, err + } + + return cid.Decode(cidStr) +} + +// fetch an item into the datastore at c if not locally avilable. +func (h *httpSubscriber) fetchDag(c cid.Cid) error { + n, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + // node is already present. + if n != nil && err == nil { + return nil + } + + return h.fetch(c.String(), func(data io.Reader) error { + buf := bytes.NewBuffer(nil) + if _, err := io.Copy(buf, data); err != nil { + return err + } + b := basicnode.NewBytes(buf.Bytes()) + // we're storing just as 'raw', but will later interpret with c's codec + pref := c.Prefix() + pref.Codec = uint64(multicodec.Raw) + ec, err := h.lsys.Store(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: pref}, b) + if err != nil { + return err + } + + if ec.(cidlink.Link).Cid.Hash().B58String() != c.Hash().B58String() { + return fmt.Errorf("content did not match expected hash: %s vs %s", ec, c) + } + return nil + }) +} From a030dec50b32b18093880482e3c5867e1a634363 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 20 Oct 2021 17:00:28 -0700 Subject: [PATCH 02/11] update subscribe for selector traversal in both HTTP and graphsync --- http/subscribe.go | 186 ++++++++++++++++++++++++++++++++++++++--- interface.go | 10 ++- legs_test.go | 2 +- multisubscribe.go | 30 +++---- multisubscribe_test.go | 4 +- selector.go | 15 +++- subscribe.go | 44 ++++++---- sync_test.go | 16 ++-- 8 files changed, 247 insertions(+), 60 deletions(-) diff --git a/http/subscribe.go b/http/subscribe.go index bdd5247..2a35f9b 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -9,21 +9,30 @@ import ( "net/http" "path" "sync" + "time" "github.com/filecoin-project/go-legs" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" peer "github.com/libp2p/go-libp2p-peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multicodec" ) +var defaultPollTime = time.Hour + +var log = logging.Logger("go-legs") + // NewHTTPSubscriber creates a legs subcriber that provides subscriptions // from publishers identified by -func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiaddr.Multiaddr, lsys *ipld.LinkSystem, topic string) (legs.LegSubscriber, error) { +func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiaddr.Multiaddr, lsys *ipld.LinkSystem, topic string, selector ipld.Node) (legs.LegSubscriber, error) { url, err := toURL(publisher) if err != nil { return nil, err @@ -34,9 +43,12 @@ func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiad url, lsys, + selector, sync.Mutex{}, + make(chan req, 1), make([]chan cid.Cid, 1), } + go hs.background() return &hs, nil } @@ -65,9 +77,19 @@ type httpSubscriber struct { head cid.Cid root string - lsys *ipld.LinkSystem - submtx sync.Mutex - subs []chan cid.Cid + lsys *ipld.LinkSystem + defaultSelector ipld.Node + submtx sync.Mutex + // reqs is inbound requests for syncs from `Sync` calls + reqs chan req + subs []chan cid.Cid +} + +type req struct { + cid.Cid + Selector ipld.Node + ctx context.Context + resp chan cid.Cid } func (h *httpSubscriber) OnChange() (chan cid.Cid, context.CancelFunc) { @@ -98,20 +120,160 @@ func (h *httpSubscriber) SetPolicyHandler(p legs.PolicyHandler) error { } func (h *httpSubscriber) SetLatestSync(c cid.Cid) error { + h.submtx.Lock() + defer h.submtx.Unlock() h.head = c return nil } -func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid) (chan cid.Cid, context.CancelFunc, error) { - return nil, nil, nil +func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, selector ipld.Node) (chan cid.Cid, context.CancelFunc, error) { + respChan := make(chan cid.Cid, 1) + cctx, cncl := context.WithCancel(ctx) + h.submtx.Lock() + defer h.submtx.Unlock() + // todo: error if reqs is full + h.reqs <- req{ + Cid: c, + Selector: selector, + ctx: cctx, + resp: respChan, + } + return respChan, cncl, nil } func (h *httpSubscriber) Close() error { + // cancel out subscribers. + h.Client.CloseIdleConnections() + h.submtx.Lock() + defer h.submtx.Unlock() + for _, ca := range h.subs { + close(ca) + } + h.subs = make([]chan cid.Cid, 0) return nil } -func (h *httpSubscriber) fetch(url string, cb func(io.Reader) error) error { - resp, err := h.Client.Get(path.Join(h.root, url)) +// background event loop for scheduling +// a. time-scheduled fetches to the provider +// b. interrupted fetches in response to synchronous 'Sync' calls. +func (h *httpSubscriber) background() { + var nextCid cid.Cid + var workResp chan cid.Cid + var ctx context.Context + var sel ipld.Node + var xsel selector.Selector + var err error + for { + select { + case r := <-h.reqs: + nextCid = r.Cid + workResp = r.resp + sel = r.Selector + ctx = r.ctx + case <-time.After(defaultPollTime): + nextCid = cid.Undef + workResp = nil + ctx = context.Background() + sel = nil + } + if nextCid == cid.Undef { + nextCid, err = h.fetchHead(ctx) + } + if sel == nil { + sel = h.defaultSelector + } + if err != nil { + log.Warnf("failed to fetch new head: %s", err) + err = nil + goto next + } + sel = legs.ExploreRecursiveWithStopNode(selector.RecursionLimitNone(), + sel, + cidlink.Link{Cid: h.head}) + if err := h.fetchBlock(ctx, nextCid); err != nil { + //log + goto next + } + xsel, err = selector.CompileSelector(sel) + if err != nil { + //log + goto next + } + + err = h.walkFetch(ctx, nextCid, xsel) + if err != nil { + //log + goto next + } + + next: + if workResp != nil { + workResp <- nextCid + close(workResp) + workResp = nil + } + } +} + +func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel selector.Selector) error { + getMissingLs := cidlink.DefaultLinkSystem() + getMissingLs.TrustedStorage = true + getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) { + r, err := h.lsys.StorageReadOpener(lc, l) + if err == nil { + return r, nil + } + // get. + writer, committer, err := h.lsys.StorageWriteOpener(lc) + if err != nil { + return nil, err + } + c := l.(cidlink.Link).Cid + if err := h.fetch(ctx, c.String(), func(r io.Reader) error { + if _, err := io.Copy(writer, r); err != nil { + return err + } + return nil + }); err != nil { + return nil, err + } + if err := committer(l); err != nil { + return nil, err + } + return h.lsys.StorageReadOpener(lc, l) + } + + progress := traversal.Progress{ + Cfg: &traversal.Config{ + Ctx: ctx, + LinkSystem: getMissingLs, + LinkTargetNodePrototypeChooser: basicnode.Chooser, + }, + Path: datamodel.NewPath([]datamodel.PathSegment{}), + LastBlock: struct { + Path datamodel.Path + Link datamodel.Link + }{ + Path: ipld.Path{}, + Link: nil, + }, + } + // get the direct node. + rootNode, err := getMissingLs.Load(ipld.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any) + if err != nil { + return err + } + return progress.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { + return nil + }) +} + +func (h *httpSubscriber) fetch(ctx context.Context, url string, cb func(io.Reader) error) error { + req, err := http.NewRequestWithContext(ctx, "GET", path.Join(h.root, url), nil) + if err != nil { + return err + } + resp, err := h.Client.Do(req) if err != nil { return err } @@ -123,9 +285,9 @@ func (h *httpSubscriber) fetch(url string, cb func(io.Reader) error) error { return cb(resp.Body) } -func (h *httpSubscriber) fetchHead() (cid.Cid, error) { +func (h *httpSubscriber) fetchHead(ctx context.Context) (cid.Cid, error) { var cidStr string - if err := h.fetch("head", func(msg io.Reader) error { + if err := h.fetch(ctx, "head", func(msg io.Reader) error { return json.NewDecoder(msg).Decode(&cidStr) }); err != nil { return cid.Undef, err @@ -135,14 +297,14 @@ func (h *httpSubscriber) fetchHead() (cid.Cid, error) { } // fetch an item into the datastore at c if not locally avilable. -func (h *httpSubscriber) fetchDag(c cid.Cid) error { +func (h *httpSubscriber) fetchBlock(ctx context.Context, c cid.Cid) error { n, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) // node is already present. if n != nil && err == nil { return nil } - return h.fetch(c.String(), func(data io.Reader) error { + return h.fetch(ctx, c.String(), func(data io.Reader) error { buf := bytes.NewBuffer(nil) if _, err := io.Copy(buf, data); err != nil { return err diff --git a/interface.go b/interface.go index e53390f..6a3f275 100644 --- a/interface.go +++ b/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ) @@ -29,12 +30,15 @@ type LegSubscriber interface { OnChange() (chan cid.Cid, context.CancelFunc) // SetPolicyHandler triggered to know if an exchange needs to be made. SetPolicyHandler(PolicyHandler) error - // SetLatestSync updates the latest sync of a subcriber in case it has - // already update some data off-band. + // SetLatestSync updates the latest sync of a subcriber to account for + // out of band updates. SetLatestSync(c cid.Cid) error // Sync to a specific Cid of a peer's DAG without having to wait for a // publication. - Sync(ctx context.Context, p peer.ID, c cid.Cid) (chan cid.Cid, context.CancelFunc, error) + // Returns a channel that will resolve with the same cid, c that was passed as input when + // the dag from 'c' is available, a function to cancel, or a synchronous error if + // the subscriber is not in a state where it can sync from the specified peer. + Sync(ctx context.Context, p peer.ID, c cid.Cid, s ipld.Node) (chan cid.Cid, context.CancelFunc, error) // Close subscriber Close() error } diff --git a/legs_test.go b/legs_test.go index 12baf50..a4c5fee 100644 --- a/legs_test.go +++ b/legs_test.go @@ -63,7 +63,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ls, err := legs.NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic") + ls, err := legs.NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) } diff --git a/multisubscribe.go b/multisubscribe.go index 6265e0c..b967459 100644 --- a/multisubscribe.go +++ b/multisubscribe.go @@ -32,16 +32,17 @@ type LegMultiSubscriber interface { } type legMultiSubscriber struct { - ctx context.Context - tmpDir string - t dt.Manager - gs graphsync.GraphExchange - topic *pubsub.Topic - refc int32 + ctx context.Context + tmpDir string + t dt.Manager + gs graphsync.GraphExchange + topic *pubsub.Topic + refc int32 + defaultSelector ipld.Node } // NewMultiSubscriber sets up a new instance of a multi subscriber -func NewMultiSubscriber(ctx context.Context, host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string) (LegMultiSubscriber, error) { +func NewMultiSubscriber(ctx context.Context, host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, selector ipld.Node) (LegMultiSubscriber, error) { t, err := makePubsub(ctx, host, topic) if err != nil { return nil, err @@ -53,17 +54,18 @@ func NewMultiSubscriber(ctx context.Context, host host.Host, ds datastore.Batchi } return &legMultiSubscriber{ - ctx: ctx, - tmpDir: tmpDir, - t: dt, - gs: gs, - topic: t, + ctx: ctx, + tmpDir: tmpDir, + t: dt, + gs: gs, + topic: t, + defaultSelector: selector, }, nil } func (lt *legMultiSubscriber) NewSubscriber(policy PolicyHandler) (LegSubscriber, error) { - l, err := newSubscriber(lt.ctx, lt.t, lt.topic, lt.onCloseSubscriber, policy) + l, err := newSubscriber(lt.ctx, lt.t, lt.topic, lt.onCloseSubscriber, policy, lt.defaultSelector) if err != nil { return nil, err } @@ -72,7 +74,7 @@ func (lt *legMultiSubscriber) NewSubscriber(policy PolicyHandler) (LegSubscriber } func (lt *legMultiSubscriber) NewSubscriberPartiallySynced(policy PolicyHandler, latestSync cid.Cid) (LegSubscriber, error) { - l, err := newSubscriber(lt.ctx, lt.t, lt.topic, lt.onCloseSubscriber, policy) + l, err := newSubscriber(lt.ctx, lt.t, lt.topic, lt.onCloseSubscriber, policy, lt.defaultSelector) if err != nil { return nil, err } diff --git a/multisubscribe_test.go b/multisubscribe_test.go index 0707b05..10af502 100644 --- a/multisubscribe_test.go +++ b/multisubscribe_test.go @@ -44,7 +44,7 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ms, err := legs.NewMultiSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic") + ms, err := legs.NewMultiSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) } @@ -117,7 +117,7 @@ func TestCloseTransport(t *testing.T) { st := dssync.MutexWrap(datastore.NewMapDatastore()) sh := mkTestHost() lsys := mkLinkSystem(st) - ms, err := legs.NewMultiSubscriber(context.Background(), sh, st, lsys, "legs/testtopic") + ms, err := legs.NewMultiSubscriber(context.Background(), sh, st, lsys, "legs/testtopic", nil) if err != nil { t.Fatal(err) } diff --git a/selector.go b/selector.go index 2357323..b2bcd7f 100644 --- a/selector.go +++ b/selector.go @@ -12,6 +12,18 @@ import ( // until the link stopLnk is seen. It prevents from having to sync DAGs from // scratch with every update. func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbuilder.SelectorSpec, stopLnk ipld.Link) ipld.Node { + return ExploreRecursiveWithStopNode(limit, sequence.Node(), stopLnk) +} + +// ExploreRecursiveWithStopNode builds a selector that recursively syncs a DAG +// until the link stopLnk is seen. It prevents from having to sync DAGs from +// scratch with every update. +func ExploreRecursiveWithStopNode(limit selector.RecursionLimit, sequence ipld.Node, stopLnk ipld.Link) ipld.Node { + if sequence == nil { + np := basicnode.Prototype__Any{} + ssb := selectorbuilder.NewSelectorSpecBuilder(np) + sequence = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node() + } np := basicnode.Prototype__Map{} return fluent.MustBuildMap(np, 1, func(na fluent.MapAssembler) { // RecursionLimit @@ -27,7 +39,7 @@ func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbu } }) // Sequence - na.AssembleEntry(selector.SelectorKey_Sequence).AssignNode(sequence.Node()) + na.AssembleEntry(selector.SelectorKey_Sequence).AssignNode(sequence) // Stop condition if stopLnk != nil { @@ -38,7 +50,6 @@ func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbu } }) }) - } // LegSelector is a convenient function that returns the selector diff --git a/subscribe.go b/subscribe.go index a19aeff..6aecc76 100644 --- a/subscribe.go +++ b/subscribe.go @@ -10,6 +10,7 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -27,8 +28,9 @@ type legSubscriber struct { subs []chan cid.Cid cancel context.CancelFunc - hndmtx sync.RWMutex - policy PolicyHandler + hndmtx sync.RWMutex + policy PolicyHandler + defaultSelector ipld.Node syncmtx sync.Mutex latestSync ipld.Link @@ -41,12 +43,13 @@ func NewSubscriber(ctx context.Context, host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, - topic string) (LegSubscriber, error) { + topic string, + selector ipld.Node) (LegSubscriber, error) { ss, err := newSimpleSetup(ctx, host, ds, lsys, topic) if err != nil { return nil, err } - return newSubscriber(ctx, ss.dt, ss.t, ss.onClose, nil) + return newSubscriber(ctx, ss.dt, ss.t, ss.onClose, nil, selector) } // NewSubscriberPartiallySynced creates a new leg subscriber with a specific latestSync. @@ -58,12 +61,13 @@ func NewSubscriberPartiallySynced( ds datastore.Batching, lsys ipld.LinkSystem, topic string, - latestSync cid.Cid) (LegSubscriber, error) { + latestSync cid.Cid, + selector ipld.Node) (LegSubscriber, error) { ss, err := newSimpleSetup(ctx, host, ds, lsys, topic) if err != nil { return nil, err } - l, err := newSubscriber(ctx, ss.dt, ss.t, ss.onClose, nil) + l, err := newSubscriber(ctx, ss.dt, ss.t, ss.onClose, nil, selector) if err != nil { return nil, err } @@ -75,16 +79,16 @@ func NewSubscriberPartiallySynced( return l, nil } -func newSubscriber(ctx context.Context, dt dt.Manager, topic *pubsub.Topic, onClose func() error, policy PolicyHandler) (*legSubscriber, error) { - +func newSubscriber(ctx context.Context, dt dt.Manager, topic *pubsub.Topic, onClose func() error, policy PolicyHandler, selector ipld.Node) (*legSubscriber, error) { ls := &legSubscriber{ - dt: dt, - topic: topic, - onClose: onClose, - updates: make(chan cid.Cid, 5), - subs: make([]chan cid.Cid, 0), - cancel: nil, - policy: policy, + dt: dt, + topic: topic, + onClose: onClose, + updates: make(chan cid.Cid, 5), + subs: make([]chan cid.Cid, 0), + cancel: nil, + policy: policy, + defaultSelector: selector, } // Start subscription @@ -195,7 +199,11 @@ func (ls *legSubscriber) watch(ctx context.Context, sub *pubsub.Subscription) { ls.syncmtx.Lock() log.Debugf("Starting data channel (cid: %s, latestSync: %s)", c, ls.latestSync) ls.syncing = c - _, err = ls.dt.OpenPullDataChannel(ctx, src, &v, c, LegSelector(ls.latestSync)) + _, err = ls.dt.OpenPullDataChannel(ctx, src, &v, c, + ExploreRecursiveWithStopNode( + selector.RecursionLimitNone(), + ls.defaultSelector, + ls.latestSync)) if err != nil { // Log error for now. log.Errorf("Error in data channel: %v", err) @@ -255,7 +263,7 @@ func (ls *legSubscriber) unlockOnce(ulOnce *sync.Once) { ulOnce.Do(ls.syncmtx.Unlock) } -func (ls *legSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid) (chan cid.Cid, context.CancelFunc, error) { +func (ls *legSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, s ipld.Node) (chan cid.Cid, context.CancelFunc, error) { out := make(chan cid.Cid) v := Voucher{&c} var ulOnce sync.Once @@ -263,7 +271,7 @@ func (ls *legSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid) (chan c ls.syncmtx.Lock() unsub := ls.dt.SubscribeToEvents(ls.onSyncEvent(c, out, &ulOnce)) - _, err := ls.dt.OpenPullDataChannel(ctx, p, &v, c, LegSelector(ls.latestSync)) + _, err := ls.dt.OpenPullDataChannel(ctx, p, &v, c, s) if err != nil { log.Errorf("Error in data channel for sync: %v", err) ls.syncmtx.Unlock() diff --git a/sync_test.go b/sync_test.go index 604b5d8..e7877f1 100644 --- a/sync_test.go +++ b/sync_test.go @@ -144,7 +144,7 @@ func TestLatestSyncSuccess(t *testing.T) { t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic") + ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) } @@ -180,7 +180,7 @@ func TestSyncFn(t *testing.T) { } dstLnkS := mkLinkSystem(dstStore) - ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic") + ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) } @@ -196,7 +196,7 @@ func TestSyncFn(t *testing.T) { // Try to sync with a non-existing cid, and cancel right away. // This is to check that we unlock syncmtx if the exchange is cancelled. cids, _ := RandomCids(1) - _, syncncl, err := ls.Sync(context.Background(), srcHost.ID(), cids[0]) + _, syncncl, err := ls.Sync(context.Background(), srcHost.ID(), cids[0], LegSelector(nil)) if err != nil { t.Fatal(err) } @@ -206,7 +206,7 @@ func TestSyncFn(t *testing.T) { lnk := chainLnks[1] lsT := ls.(*legSubscriber) // Proactively sync with publisher without him publishing to gossipsub channel. - out, syncncl, err := ls.Sync(context.Background(), srcHost.ID(), lnk.(cidlink.Link).Cid) + out, syncncl, err := ls.Sync(context.Background(), srcHost.ID(), lnk.(cidlink.Link).Cid, LegSelector(nil)) if err != nil { t.Fatal(err) } @@ -251,7 +251,7 @@ func TestPartialSync(t *testing.T) { t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ls, err := NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid) + ls, err := NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid, nil) if err != nil { t.Fatal(err) } @@ -306,7 +306,7 @@ func TestStepByStepSync(t *testing.T) { t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic") + ls, err := NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) } @@ -349,7 +349,7 @@ func TestLatestSyncFailure(t *testing.T) { t.Fatal(err) } dstLnkS := mkLinkSystem(dstStore) - ls, err := NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid) + ls, err := NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid, nil) if err != nil { t.Fatal(err) } @@ -360,7 +360,7 @@ func TestLatestSyncFailure(t *testing.T) { // The other end doesn't have the data newUpdateTest(t, lp, ls, dstStore, watcher, cidlink.Link{Cid: cid.Undef}, true, chainLnks[3].(cidlink.Link).Cid) dstStore = dssync.MutexWrap(datastore.NewMapDatastore()) - ls, err = NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid) + ls, err = NewSubscriberPartiallySynced(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", chainLnks[3].(cidlink.Link).Cid, nil) if err != nil { t.Fatal(err) } From e09d61d4e465c66558998335cc5996706d01a55d Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 20 Oct 2021 17:08:37 -0700 Subject: [PATCH 03/11] simplify --- http/subscribe.go | 33 +++++---------------------------- 1 file changed, 5 insertions(+), 28 deletions(-) diff --git a/http/subscribe.go b/http/subscribe.go index 2a35f9b..0417184 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -1,7 +1,6 @@ package http import ( - "bytes" "context" "encoding/json" "fmt" @@ -23,7 +22,6 @@ import ( peer "github.com/libp2p/go-libp2p-peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" - "github.com/multiformats/go-multicodec" ) var defaultPollTime = time.Hour @@ -224,20 +222,8 @@ func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel select return r, nil } // get. - writer, committer, err := h.lsys.StorageWriteOpener(lc) - if err != nil { - return nil, err - } c := l.(cidlink.Link).Cid - if err := h.fetch(ctx, c.String(), func(r io.Reader) error { - if _, err := io.Copy(writer, r); err != nil { - return err - } - return nil - }); err != nil { - return nil, err - } - if err := committer(l); err != nil { + if err := h.fetchBlock(ctx, c); err != nil { return nil, err } return h.lsys.StorageReadOpener(lc, l) @@ -305,22 +291,13 @@ func (h *httpSubscriber) fetchBlock(ctx context.Context, c cid.Cid) error { } return h.fetch(ctx, c.String(), func(data io.Reader) error { - buf := bytes.NewBuffer(nil) - if _, err := io.Copy(buf, data); err != nil { - return err - } - b := basicnode.NewBytes(buf.Bytes()) - // we're storing just as 'raw', but will later interpret with c's codec - pref := c.Prefix() - pref.Codec = uint64(multicodec.Raw) - ec, err := h.lsys.Store(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: pref}, b) + writer, committer, err := h.lsys.StorageWriteOpener(ipld.LinkContext{}) if err != nil { return err } - - if ec.(cidlink.Link).Cid.Hash().B58String() != c.Hash().B58String() { - return fmt.Errorf("content did not match expected hash: %s vs %s", ec, c) + if _, err := io.Copy(writer, data); err != nil { + return err } - return nil + return committer(cidlink.Link{Cid: c}) }) } From 9fb6e33ddef82cfdd4d52080f825a08bde37bb4a Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 20 Oct 2021 19:04:27 -0700 Subject: [PATCH 04/11] add publisher complement to subscriber --- http/publish.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ http/subscribe.go | 2 ++ 2 files changed, 87 insertions(+) create mode 100644 http/publish.go diff --git a/http/publish.go b/http/publish.go new file mode 100644 index 0000000..17fbd7b --- /dev/null +++ b/http/publish.go @@ -0,0 +1,85 @@ +package http + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "path" + "sync" + + "github.com/filecoin-project/go-legs" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" +) + +type httpPublisher struct { + rl sync.RWMutex + root cid.Cid + lsys ipld.LinkSystem +} + +var _ legs.LegPublisher = (*httpPublisher)(nil) +var _ http.Handler = (*httpPublisher)(nil) + +// NewPublisher creates a new http publisher +func NewPublisher(ctx context.Context, + ds datastore.Batching, + lsys ipld.LinkSystem) (legs.LegPublisher, error) { + hp := &httpPublisher{} + hp.lsys = lsys + return hp, nil +} + +func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ask := path.Base(r.URL.Path) + if ask == "head" { + // serve the + h.rl.RLock() + defer h.rl.RUnlock() + out, err := json.Marshal(h.root.String()) + if err != nil { + w.WriteHeader(500) + //todo: log + } else { + _, _ = w.Write(out) + } + return + } + // interpret `ask` as a CID to serve. + c, err := cid.Parse(ask) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("invalid request: not a cid")) + return + } + item, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + if err != nil { + if errors.Is(err, ipld.ErrNotExists{}) { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("cid not found")) + return + } + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("unable to load data for cid")) + // todo: log + return + } + // marshal to json and serve. + _ = dagjson.Encode(item, w) +} + +func (h *httpPublisher) UpdateRoot(ctx context.Context, c cid.Cid) error { + h.rl.Lock() + defer h.rl.Unlock() + h.root = c + return nil +} + +func (h *httpPublisher) Close() error { + return nil +} diff --git a/http/subscribe.go b/http/subscribe.go index 0417184..77d0c45 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -83,6 +83,8 @@ type httpSubscriber struct { subs []chan cid.Cid } +var _ (legs.LegSubscriber) = (*httpSubscriber)(nil) + type req struct { cid.Cid Selector ipld.Node From b111cb4d1010002468430277f00c290df5f536c9 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 20 Oct 2021 19:07:33 -0700 Subject: [PATCH 05/11] correct mod --- go.mod | 1 - go.sum | 2 -- http/subscribe.go | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/go.mod b/go.mod index f42a15f..5e0328c 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/ipld/go-ipld-prime v0.12.0 github.com/libp2p/go-libp2p v0.15.0 github.com/libp2p/go-libp2p-core v0.9.0 - github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-pubsub v0.5.4 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/multiformats/go-multiaddr v0.4.0 diff --git a/go.sum b/go.sum index 0f6b8a7..b088125 100644 --- a/go.sum +++ b/go.sum @@ -598,7 +598,6 @@ github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0 h1:t97Mv0LIBZlP2FXVRNKKVzHJCIjbIWGxYptGId4+htU= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= -github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= @@ -623,7 +622,6 @@ github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCTh github.com/libp2p/go-libp2p-noise v0.1.1/go.mod h1:QDFLdKX7nluB7DEnlVPbz7xlLHdwHFA9HiohJRr3vwM= github.com/libp2p/go-libp2p-noise v0.2.2 h1:MRt5XGfYziDXIUy2udtMWfPmzZqUDYoC1FZoKnqPzwk= github.com/libp2p/go-libp2p-noise v0.2.2/go.mod h1:IEbYhBBzGyvdLBoxxULL/SGbJARhUeqlO8lVSREYu2Q= -github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY= github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY= github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY= github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI= diff --git a/http/subscribe.go b/http/subscribe.go index 77d0c45..42a9f96 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -19,7 +19,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" - peer "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) From 9d2e9158f34ad03c3049f4a5db175433666546a7 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 21 Oct 2021 13:06:45 -0700 Subject: [PATCH 06/11] code review --- http/subscribe.go | 36 +++++++++++++++++++----------------- subscribe.go | 2 +- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/http/subscribe.go b/http/subscribe.go index 42a9f96..457a4e5 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -163,14 +163,26 @@ func (h *httpSubscriber) background() { var sel ipld.Node var xsel selector.Selector var err error + defaultRate := time.NewTimer(defaultPollTime) for { + // finish up from previous iteration + if workResp != nil { + workResp <- nextCid + close(workResp) + workResp = nil + } + if !defaultRate.Stop() { + <-defaultRate.C + } + defaultRate.Reset(defaultPollTime) select { + case r := <-h.reqs: nextCid = r.Cid workResp = r.resp sel = r.Selector ctx = r.ctx - case <-time.After(defaultPollTime): + case <-defaultRate.C: nextCid = cid.Undef workResp = nil ctx = context.Background() @@ -185,32 +197,25 @@ func (h *httpSubscriber) background() { if err != nil { log.Warnf("failed to fetch new head: %s", err) err = nil - goto next + continue } sel = legs.ExploreRecursiveWithStopNode(selector.RecursionLimitNone(), sel, cidlink.Link{Cid: h.head}) if err := h.fetchBlock(ctx, nextCid); err != nil { //log - goto next + continue } xsel, err = selector.CompileSelector(sel) if err != nil { //log - goto next + continue } err = h.walkFetch(ctx, nextCid, xsel) if err != nil { //log - goto next - } - - next: - if workResp != nil { - workResp <- nextCid - close(workResp) - workResp = nil + continue } } } @@ -241,17 +246,14 @@ func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel select LastBlock: struct { Path datamodel.Path Link datamodel.Link - }{ - Path: ipld.Path{}, - Link: nil, - }, + }{}, } // get the direct node. rootNode, err := getMissingLs.Load(ipld.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any) if err != nil { return err } - return progress.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { + return progress.WalkMatching(rootNode, sel, func(p traversal.Progress, n datamodel.Node) error { return nil }) } diff --git a/subscribe.go b/subscribe.go index 6aecc76..a51de82 100644 --- a/subscribe.go +++ b/subscribe.go @@ -84,7 +84,7 @@ func newSubscriber(ctx context.Context, dt dt.Manager, topic *pubsub.Topic, onCl dt: dt, topic: topic, onClose: onClose, - updates: make(chan cid.Cid, 5), + updates: make(chan cid.Cid, 1), subs: make([]chan cid.Cid, 0), cancel: nil, policy: policy, From 08f3269fc805617ea26ffa2ccdcef68bca238e9d Mon Sep 17 00:00:00 2001 From: Will Scott Date: Sun, 24 Oct 2021 09:50:12 -0700 Subject: [PATCH 07/11] working pub/sub test --- http/http_test.go | 69 +++++++++++++++++ http/multiaddr/convert.go | 136 +++++++++++++++++++++++++++++++++ http/multiaddr/convert_test.go | 34 +++++++++ http/subscribe.go | 38 +++------ legs_test.go | 55 ++----------- multisubscribe_test.go | 13 ++-- test/util.go | 53 +++++++++++++ 7 files changed, 315 insertions(+), 83 deletions(-) create mode 100644 http/http_test.go create mode 100644 http/multiaddr/convert.go create mode 100644 http/multiaddr/convert_test.go create mode 100644 test/util.go diff --git a/http/http_test.go b/http/http_test.go new file mode 100644 index 0000000..866a38a --- /dev/null +++ b/http/http_test.go @@ -0,0 +1,69 @@ +package http_test + +import ( + "context" + "net" + nhttp "net/http" + "testing" + "time" + + "github.com/filecoin-project/go-legs/http" + "github.com/filecoin-project/go-legs/test" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +func TestManualSync(t *testing.T) { + srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) + srcSys := test.MkLinkSystem(srcStore) + p, err := http.NewPublisher(context.Background(), srcStore, srcSys) + if err != nil { + t.Fatal(err) + } + nl, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + go func() { + _ = nhttp.Serve(nl, p.(nhttp.Handler)) + }() + nlm, err := manet.FromNetAddr(nl.Addr()) + if err != nil { + t.Fatal(err) + } + proto, _ := multiaddr.NewMultiaddr("/http") + nlm = multiaddr.Join(nlm, proto) + + dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) + dstSys := test.MkLinkSystem(dstStore) + s, err := http.NewHTTPSubscriber(context.Background(), nhttp.DefaultClient, nlm, &dstSys, "", nil) + if err != nil { + t.Fatal(err) + } + + rootLnk, err := test.Store(srcStore, basicnode.NewString("hello world")) + if err := p.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid); err != nil { + t.Fatal(err) + } + + cchan, cncl, err := s.Sync(context.Background(), peer.NewPeerRecord().PeerID, cid.Undef, nil) + if err != nil { + t.Fatal(err) + } + + select { + case rc := <-cchan: + if !rc.Equals(rootLnk.(cidlink.Link).Cid) { + t.Fatalf("didn't get expected cid. expected %s, got %s", rootLnk, rc) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } + cncl() +} diff --git a/http/multiaddr/convert.go b/http/multiaddr/convert.go new file mode 100644 index 0000000..7bbb8c1 --- /dev/null +++ b/http/multiaddr/convert.go @@ -0,0 +1,136 @@ +package multiaddr + +import ( + "bytes" + "encoding/base64" + "fmt" + "net" + "net/url" + + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// register an 'httpath' component: +var transcodePath = multiaddr.NewTranscoderFromFunctions(pathStB, pathBtS, pathVal) + +func pathVal(b []byte) error { + if bytes.IndexByte(b, '/') >= 0 { + return fmt.Errorf("encoded path '%s' contains a slash", string(b)) + } + return nil +} + +func pathStB(s string) ([]byte, error) { + return []byte(s), nil +} + +func pathBtS(b []byte) (string, error) { + return string(b), nil +} + +func init() { + _ = multiaddr.AddProtocol(protoHTTPath) +} + +var protoHTTPath = multiaddr.Protocol{ + Name: "httpath", + Code: 0x300200, + VCode: multiaddr.CodeToVarint(0x300200), + Size: multiaddr.LengthPrefixedVarSize, + Transcoder: transcodePath, +} + +// ToURL takes a multiaddr of the form: +// /dns/thing.com/http/base64 +// /ip/192.168.0.1/tcp/80/http +func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) { + // host should be either the dns name or the IP + _, host, err := manet.DialArgs(ma) + if err != nil { + return nil, err + } + if ip := net.ParseIP(host); ip != nil { + if !ip.To4().Equal(ip) { + // raw v6 IPs need `[ip]` encapsulation. + host = fmt.Sprintf("[%s]", host) + } + } + + protos := ma.Protocols() + pm := make(map[int]string, len(protos)) + for _, p := range protos { + v, err := ma.ValueForProtocol(p.Code) + if err == nil { + pm[p.Code] = v + } + } + + scheme := "http" + if _, ok := pm[multiaddr.P_HTTPS]; ok { + scheme = "https" + } // todo: ws/wss + + path := "" + if pb, ok := pm[protoHTTPath.Code]; ok { + pathDec, err := base64.URLEncoding.DecodeString(pb) + if err == nil { + path = string(pathDec) + } + } + + out := url.URL{ + Scheme: scheme, + Host: host, + Path: path, + } + return &out, nil +} + +// ToMA takes a url and converts it into a multiaddr. +// converts scheme://host:port/path -> /ip/host/tcp/port/scheme/base64{path} +func ToMA(u *url.URL) (*multiaddr.Multiaddr, error) { + h := u.Hostname() + var addr *multiaddr.Multiaddr + if n := net.ParseIP(h); n != nil { + ipAddr, err := manet.FromIP(n) + if err != nil { + return nil, err + } + addr = &ipAddr + } else { + // domain name + ma, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_DNS).Name, h) + if err != nil { + return nil, err + } + mab := multiaddr.Cast(ma.Bytes()) + addr = &mab + } + pv := u.Port() + if pv != "" { + port, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_TCP).Name, pv) + if err != nil { + return nil, err + } + wport := multiaddr.Join(*addr, port) + addr = &wport + } + + http, err := multiaddr.NewComponent(u.Scheme, "") + if err != nil { + return nil, err + } + + joint := multiaddr.Join(*addr, http) + if u.Path != "" { + path := base64.URLEncoding.EncodeToString([]byte(u.Path)) + httpath, err := multiaddr.NewComponent(protoHTTPath.Name, path) + if err != nil { + return nil, err + } + joint = multiaddr.Join(joint, httpath) + } + + return &joint, nil +} diff --git a/http/multiaddr/convert_test.go b/http/multiaddr/convert_test.go new file mode 100644 index 0000000..902e638 --- /dev/null +++ b/http/multiaddr/convert_test.go @@ -0,0 +1,34 @@ +package multiaddr + +import ( + "net/url" + "testing" +) + +func TestRoundtrip(t *testing.T) { + samples := []string{ + "http://www.google.com/path/to/rsrc", + "https://protocol.ai", + "http://192.168.0.1:8080/admin", + "https://[2a00:1450:400e:80d::200e]:443/", + "https://[2a00:1450:400e:80d::200e]/", + } + + for _, s := range samples { + u, _ := url.Parse(s) + mu, err := ToMA(u) + if err != nil { + t.Fatal(err) + } + u2, err := ToURL(*mu) + if u2.Scheme != u.Scheme { + t.Fatalf("scheme didn't roundtrip. got %s expected %s", u2.Scheme, u.Scheme) + } + if u2.Host != u.Host { + t.Fatalf("host didn't roundtrip. got %s, expected %s", u2.Host, u.Host) + } + if u2.Path != u.Path { + t.Fatalf("path didn't roundtrip. got %s, expected %s", u2.Path, u.Path) + } + } +} diff --git a/http/subscribe.go b/http/subscribe.go index 457a4e5..6b2a524 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -6,11 +6,13 @@ import ( "fmt" "io" "net/http" + "net/url" "path" "sync" "time" "github.com/filecoin-project/go-legs" + maurl "github.com/filecoin-project/go-legs/http/multiaddr" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" @@ -21,7 +23,6 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" ) var defaultPollTime = time.Hour @@ -31,14 +32,14 @@ var log = logging.Logger("go-legs") // NewHTTPSubscriber creates a legs subcriber that provides subscriptions // from publishers identified by func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiaddr.Multiaddr, lsys *ipld.LinkSystem, topic string, selector ipld.Node) (legs.LegSubscriber, error) { - url, err := toURL(publisher) + url, err := maurl.ToURL(publisher) if err != nil { return nil, err } hs := httpSubscriber{ host, cid.Undef, - url, + *url, lsys, selector, @@ -50,30 +51,10 @@ func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiad return &hs, nil } -// toURL takes a multiaddr of the form: -// /dnsaddr/thing.com/http/path/to/root -// /ip/192.168.0.1/http -// TODO: doesn't support including a signing key definition yet -func toURL(ma multiaddr.Multiaddr) (string, error) { - // host should be either the dns name or the IP - _, host, err := manet.DialArgs(ma) - if err != nil { - return "", err - } - _, http := multiaddr.SplitFunc(ma, func(c multiaddr.Component) bool { - return c.Protocol().Code == multiaddr.P_HTTP - }) - if len(http.Bytes()) == 0 { - return "", fmt.Errorf("publisher must be HTTP protocol for this subscriber, was: %s", ma) - } - _, path := multiaddr.SplitFirst(http) - return fmt.Sprintf("https://%s/%s", host, path), nil -} - type httpSubscriber struct { *http.Client head cid.Cid - root string + root url.URL lsys *ipld.LinkSystem defaultSelector ipld.Node @@ -258,8 +239,11 @@ func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel select }) } -func (h *httpSubscriber) fetch(ctx context.Context, url string, cb func(io.Reader) error) error { - req, err := http.NewRequestWithContext(ctx, "GET", path.Join(h.root, url), nil) +func (h *httpSubscriber) fetch(ctx context.Context, rsrc string, cb func(io.Reader) error) error { + localURL := h.root + localURL.Path = path.Join(h.root.Path, rsrc) + + req, err := http.NewRequestWithContext(ctx, "GET", localURL.String(), nil) if err != nil { return err } @@ -268,7 +252,7 @@ func (h *httpSubscriber) fetch(ctx context.Context, url string, cb func(io.Reade return err } if resp.StatusCode != http.StatusOK { - return fmt.Errorf("non success http code at %s/%s: %d", h.root, url, resp.StatusCode) + return fmt.Errorf("non success http code at %s: %d", localURL.String(), resp.StatusCode) } defer resp.Body.Close() diff --git a/legs_test.go b/legs_test.go index a4c5fee..2ef382c 100644 --- a/legs_test.go +++ b/legs_test.go @@ -1,26 +1,22 @@ package legs_test import ( - "bytes" "context" - "io" "testing" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipld/go-ipld-prime" // dagjson codec registered for encoding legs "github.com/filecoin-project/go-legs" + "github.com/filecoin-project/go-legs/test" _ "github.com/ipld/go-ipld-prime/codec/dagcbor" _ "github.com/ipld/go-ipld-prime/codec/dagjson" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/host" - "github.com/multiformats/go-multicodec" ) func mkTestHost() host.Host { @@ -28,29 +24,9 @@ func mkTestHost() host.Host { return h } -func mkLinkSystem(ds datastore.Batching) ipld.LinkSystem { - lsys := cidlink.DefaultLinkSystem() - lsys.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { - c := lnk.(cidlink.Link).Cid - val, err := ds.Get(datastore.NewKey(c.String())) - if err != nil { - return nil, err - } - return bytes.NewBuffer(val), nil - } - lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { - buf := bytes.NewBuffer(nil) - return buf, func(lnk ipld.Link) error { - c := lnk.(cidlink.Link).Cid - return ds.Put(datastore.NewKey(c.String()), buf.Bytes()) - }, nil - } - return lsys -} - func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, legs.LegPublisher, legs.LegSubscriber) { srcHost := mkTestHost() - srcLnkS := mkLinkSystem(srcStore) + srcLnkS := test.MkLinkSystem(srcStore) lp, err := legs.NewPublisher(context.Background(), srcHost, srcStore, srcLnkS, "legs/testtopic") if err != nil { t.Fatal(err) @@ -62,7 +38,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, if err := srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID())); err != nil { t.Fatal(err) } - dstLnkS := mkLinkSystem(dstStore) + dstLnkS := test.MkLinkSystem(dstStore) ls, err := legs.NewSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) @@ -70,27 +46,6 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, return srcHost, dstHost, lp, ls } -func mkRoot(srcStore datastore.Batching, n ipld.Node) (ipld.Link, error) { - linkproto := cidlink.LinkPrototype{ - Prefix: cid.Prefix{ - Version: 1, - Codec: uint64(multicodec.DagJson), - MhType: uint64(multicodec.Sha2_256), - MhLength: 16, - }, - } - lsys := cidlink.DefaultLinkSystem() - lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { - buf := bytes.NewBuffer(nil) - return buf, func(lnk ipld.Link) error { - c := lnk.(cidlink.Link).Cid - return srcStore.Put(datastore.NewKey(c.String()), buf.Bytes()) - }, nil - } - - return lsys.Store(ipld.LinkContext{}, linkproto, n) -} - func TestRoundTrip(t *testing.T) { // Init legs publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) @@ -101,7 +56,7 @@ func TestRoundTrip(t *testing.T) { // Update root with item itm := basicnode.NewString("hello world") - lnk, err := mkRoot(srcStore, itm) + lnk, err := test.Store(srcStore, itm) if err != nil { t.Fatal(err) } @@ -160,7 +115,7 @@ func TestSetAndFilterPeerPolicy(t *testing.T) { ma.AssembleValue().AssignBool(true) ma.Finish() n := nb.Build() - lnk, err := mkRoot(srcStore, n) + lnk, err := test.Store(srcStore, n) if err != nil { t.Fatal(err) } diff --git a/multisubscribe_test.go b/multisubscribe_test.go index 10af502..eb4e4b7 100644 --- a/multisubscribe_test.go +++ b/multisubscribe_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/filecoin-project/go-legs" + "github.com/filecoin-project/go-legs/test" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -16,7 +17,7 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { // Init legs publisher and subscriber srcStore1 := dssync.MutexWrap(datastore.NewMapDatastore()) srcHost1 := mkTestHost() - srcLnkS1 := mkLinkSystem(srcStore1) + srcLnkS1 := test.MkLinkSystem(srcStore1) lp1, err := legs.NewPublisher(context.Background(), srcHost1, srcStore1, srcLnkS1, "legs/testtopic") if err != nil { @@ -25,7 +26,7 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { srcStore2 := dssync.MutexWrap(datastore.NewMapDatastore()) srcHost2 := mkTestHost() - srcLnkS2 := mkLinkSystem(srcStore2) + srcLnkS2 := test.MkLinkSystem(srcStore2) lp2, err := legs.NewPublisher(context.Background(), srcHost2, srcStore2, srcLnkS2, "legs/testtopic") if err != nil { t.Fatal(err) @@ -43,7 +44,7 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { if err := srcHost2.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID())); err != nil { t.Fatal(err) } - dstLnkS := mkLinkSystem(dstStore) + dstLnkS := test.MkLinkSystem(dstStore) ms, err := legs.NewMultiSubscriber(context.Background(), dstHost, dstStore, dstLnkS, "legs/testtopic", nil) if err != nil { t.Fatal(err) @@ -62,13 +63,13 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { // Update root on publisher one with item itm1 := basicnode.NewString("hello world") - lnk1, err := mkRoot(srcStore1, itm1) + lnk1, err := test.Store(srcStore1, itm1) if err != nil { t.Fatal(err) } // Update root on publisher one with item itm2 := basicnode.NewString("hello world 2") - lnk2, err := mkRoot(srcStore2, itm2) + lnk2, err := test.Store(srcStore2, itm2) if err != nil { t.Fatal(err) } @@ -116,7 +117,7 @@ func TestMultiSusbscribeRoundTrip(t *testing.T) { func TestCloseTransport(t *testing.T) { st := dssync.MutexWrap(datastore.NewMapDatastore()) sh := mkTestHost() - lsys := mkLinkSystem(st) + lsys := test.MkLinkSystem(st) ms, err := legs.NewMultiSubscriber(context.Background(), sh, st, lsys, "legs/testtopic", nil) if err != nil { t.Fatal(err) diff --git a/test/util.go b/test/util.go new file mode 100644 index 0000000..03e3ea1 --- /dev/null +++ b/test/util.go @@ -0,0 +1,53 @@ +package test + +import ( + "bytes" + "io" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" +) + +func MkLinkSystem(ds datastore.Batching) ipld.LinkSystem { + lsys := cidlink.DefaultLinkSystem() + lsys.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + c := lnk.(cidlink.Link).Cid + val, err := ds.Get(datastore.NewKey(c.String())) + if err != nil { + return nil, err + } + return bytes.NewBuffer(val), nil + } + lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.NewBuffer(nil) + return buf, func(lnk ipld.Link) error { + c := lnk.(cidlink.Link).Cid + return ds.Put(datastore.NewKey(c.String()), buf.Bytes()) + }, nil + } + return lsys +} + +func Store(srcStore datastore.Batching, n ipld.Node) (ipld.Link, error) { + linkproto := cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: uint64(multicodec.DagJson), + MhType: uint64(multicodec.Sha2_256), + MhLength: 16, + }, + } + lsys := cidlink.DefaultLinkSystem() + lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.NewBuffer(nil) + return buf, func(lnk ipld.Link) error { + c := lnk.(cidlink.Link).Cid + return srcStore.Put(datastore.NewKey(c.String()), buf.Bytes()) + }, nil + } + + return lsys.Store(ipld.LinkContext{}, linkproto, n) +} From 9eda8bd969e6abcf3df0da5284963f8e056d0f01 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Sun, 24 Oct 2021 19:29:37 -0700 Subject: [PATCH 08/11] additional logging --- http/publish.go | 4 ++-- http/subscribe.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/http/publish.go b/http/publish.go index 17fbd7b..bf8562a 100644 --- a/http/publish.go +++ b/http/publish.go @@ -44,7 +44,7 @@ func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { out, err := json.Marshal(h.root.String()) if err != nil { w.WriteHeader(500) - //todo: log + log.Infow("failed to serve root", "err", err) } else { _, _ = w.Write(out) } @@ -66,7 +66,7 @@ func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("unable to load data for cid")) - // todo: log + log.Infow("failed to load requested block", "err", err) return } // marshal to json and serve. diff --git a/http/subscribe.go b/http/subscribe.go index 6b2a524..7ce86aa 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -27,7 +27,7 @@ import ( var defaultPollTime = time.Hour -var log = logging.Logger("go-legs") +var log = logging.Logger("go-legs-http") // NewHTTPSubscriber creates a legs subcriber that provides subscriptions // from publishers identified by @@ -184,18 +184,18 @@ func (h *httpSubscriber) background() { sel, cidlink.Link{Cid: h.head}) if err := h.fetchBlock(ctx, nextCid); err != nil { - //log + log.Infow("failed to fetch requested block", "err", err) continue } xsel, err = selector.CompileSelector(sel) if err != nil { - //log + log.Infow("failed to compile selector", "err", err, "selector", sel) continue } err = h.walkFetch(ctx, nextCid, xsel) if err != nil { - //log + log.Infow("failed to walk requested dag", "err", err, "root", nextCid) continue } } From f7ae6952e9082fbdb1bc390dd1806cfedb77cbcc Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 25 Oct 2021 18:09:49 -0700 Subject: [PATCH 09/11] use url escape rather than base64 --- http/multiaddr/convert.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/http/multiaddr/convert.go b/http/multiaddr/convert.go index 7bbb8c1..faed727 100644 --- a/http/multiaddr/convert.go +++ b/http/multiaddr/convert.go @@ -2,7 +2,6 @@ package multiaddr import ( "bytes" - "encoding/base64" "fmt" "net" "net/url" @@ -42,7 +41,7 @@ var protoHTTPath = multiaddr.Protocol{ } // ToURL takes a multiaddr of the form: -// /dns/thing.com/http/base64 +// /dns/thing.com/http/urlescape // /ip/192.168.0.1/tcp/80/http func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) { // host should be either the dns name or the IP @@ -73,9 +72,9 @@ func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) { path := "" if pb, ok := pm[protoHTTPath.Code]; ok { - pathDec, err := base64.URLEncoding.DecodeString(pb) - if err == nil { - path = string(pathDec) + path, err = url.PathUnescape(pb) + if err != nil { + path = "" } } @@ -88,7 +87,7 @@ func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) { } // ToMA takes a url and converts it into a multiaddr. -// converts scheme://host:port/path -> /ip/host/tcp/port/scheme/base64{path} +// converts scheme://host:port/path -> /ip/host/tcp/port/scheme/urlescape{path} func ToMA(u *url.URL) (*multiaddr.Multiaddr, error) { h := u.Hostname() var addr *multiaddr.Multiaddr @@ -124,8 +123,7 @@ func ToMA(u *url.URL) (*multiaddr.Multiaddr, error) { joint := multiaddr.Join(*addr, http) if u.Path != "" { - path := base64.URLEncoding.EncodeToString([]byte(u.Path)) - httpath, err := multiaddr.NewComponent(protoHTTPath.Name, path) + httpath, err := multiaddr.NewComponent(protoHTTPath.Name, url.PathEscape(u.Path)) if err != nil { return nil, err } From 71359ae59810d3ba7dc0979e8f274cbb683293f2 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 27 Oct 2021 18:24:44 -0700 Subject: [PATCH 10/11] code review --- http/subscribe.go | 55 +++++++++++++++++++++++++++-------------------- interface.go | 2 +- subscribe.go | 3 +-- test/util.go | 15 +++---------- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/http/subscribe.go b/http/subscribe.go index 7ce86aa..ee6a5d0 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -37,15 +37,15 @@ func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiad return nil, err } hs := httpSubscriber{ - host, - cid.Undef, - *url, + Client: host, + head: cid.Undef, + root: *url, - lsys, - selector, - sync.Mutex{}, - make(chan req, 1), - make([]chan cid.Cid, 1), + lsys: lsys, + defaultSelector: selector, + mtx: sync.Mutex{}, + reqs: make(chan req, 1), + subs: make([]chan cid.Cid, 1), } go hs.background() return &hs, nil @@ -53,18 +53,20 @@ func NewHTTPSubscriber(ctx context.Context, host *http.Client, publisher multiad type httpSubscriber struct { *http.Client - head cid.Cid root url.URL lsys *ipld.LinkSystem defaultSelector ipld.Node - submtx sync.Mutex // reqs is inbound requests for syncs from `Sync` calls reqs chan req + + // mtx protects state below accessed both by the background thread and public state + mtx sync.Mutex + head cid.Cid subs []chan cid.Cid } -var _ (legs.LegSubscriber) = (*httpSubscriber)(nil) +var _ legs.LegSubscriber = (*httpSubscriber)(nil) type req struct { cid.Cid @@ -75,12 +77,12 @@ type req struct { func (h *httpSubscriber) OnChange() (chan cid.Cid, context.CancelFunc) { ch := make(chan cid.Cid) - h.submtx.Lock() - defer h.submtx.Unlock() + h.mtx.Lock() + defer h.mtx.Unlock() h.subs = append(h.subs, ch) cncl := func() { - h.submtx.Lock() - defer h.submtx.Unlock() + h.mtx.Lock() + defer h.mtx.Unlock() for i, ca := range h.subs { if ca == ch { h.subs[i] = h.subs[len(h.subs)-1] @@ -101,17 +103,16 @@ func (h *httpSubscriber) SetPolicyHandler(p legs.PolicyHandler) error { } func (h *httpSubscriber) SetLatestSync(c cid.Cid) error { - h.submtx.Lock() - defer h.submtx.Unlock() + h.mtx.Lock() + defer h.mtx.Unlock() h.head = c return nil } -func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, selector ipld.Node) (chan cid.Cid, context.CancelFunc, error) { +func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, selector ipld.Node) (<-chan cid.Cid, context.CancelFunc, error) { respChan := make(chan cid.Cid, 1) cctx, cncl := context.WithCancel(ctx) - h.submtx.Lock() - defer h.submtx.Unlock() + // todo: error if reqs is full h.reqs <- req{ Cid: c, @@ -125,8 +126,8 @@ func (h *httpSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, selecto func (h *httpSubscriber) Close() error { // cancel out subscribers. h.Client.CloseIdleConnections() - h.submtx.Lock() - defer h.submtx.Unlock() + h.mtx.Lock() + defer h.mtx.Unlock() for _, ca := range h.subs { close(ca) } @@ -180,9 +181,12 @@ func (h *httpSubscriber) background() { err = nil continue } + h.mtx.Lock() + currHead := h.head + h.mtx.Unlock() sel = legs.ExploreRecursiveWithStopNode(selector.RecursionLimitNone(), sel, - cidlink.Link{Cid: h.head}) + cidlink.Link{Cid: currHead}) if err := h.fetchBlock(ctx, nextCid); err != nil { log.Infow("failed to fetch requested block", "err", err) continue @@ -198,11 +202,16 @@ func (h *httpSubscriber) background() { log.Infow("failed to walk requested dag", "err", err, "root", nextCid) continue } + // now head is updated. save it. + h.mtx.Lock() + h.head = nextCid + h.mtx.Unlock() } } func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel selector.Selector) error { getMissingLs := cidlink.DefaultLinkSystem() + // trusted because it'll be hashed/verified on the way into the link system when fetched. getMissingLs.TrustedStorage = true getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) { r, err := h.lsys.StorageReadOpener(lc, l) diff --git a/interface.go b/interface.go index 6a3f275..4b72fd2 100644 --- a/interface.go +++ b/interface.go @@ -38,7 +38,7 @@ type LegSubscriber interface { // Returns a channel that will resolve with the same cid, c that was passed as input when // the dag from 'c' is available, a function to cancel, or a synchronous error if // the subscriber is not in a state where it can sync from the specified peer. - Sync(ctx context.Context, p peer.ID, c cid.Cid, s ipld.Node) (chan cid.Cid, context.CancelFunc, error) + Sync(ctx context.Context, p peer.ID, c cid.Cid, selector ipld.Node) (<-chan cid.Cid, context.CancelFunc, error) // Close subscriber Close() error } diff --git a/subscribe.go b/subscribe.go index a51de82..a3341ee 100644 --- a/subscribe.go +++ b/subscribe.go @@ -86,7 +86,6 @@ func newSubscriber(ctx context.Context, dt dt.Manager, topic *pubsub.Topic, onCl onClose: onClose, updates: make(chan cid.Cid, 1), subs: make([]chan cid.Cid, 0), - cancel: nil, policy: policy, defaultSelector: selector, } @@ -263,7 +262,7 @@ func (ls *legSubscriber) unlockOnce(ulOnce *sync.Once) { ulOnce.Do(ls.syncmtx.Unlock) } -func (ls *legSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, s ipld.Node) (chan cid.Cid, context.CancelFunc, error) { +func (ls *legSubscriber) Sync(ctx context.Context, p peer.ID, c cid.Cid, s ipld.Node) (<-chan cid.Cid, context.CancelFunc, error) { out := make(chan cid.Cid) v := Voucher{&c} var ulOnce sync.Once diff --git a/test/util.go b/test/util.go index 03e3ea1..290cede 100644 --- a/test/util.go +++ b/test/util.go @@ -14,8 +14,7 @@ import ( func MkLinkSystem(ds datastore.Batching) ipld.LinkSystem { lsys := cidlink.DefaultLinkSystem() lsys.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { - c := lnk.(cidlink.Link).Cid - val, err := ds.Get(datastore.NewKey(c.String())) + val, err := ds.Get(datastore.NewKey(lnk.String())) if err != nil { return nil, err } @@ -24,8 +23,7 @@ func MkLinkSystem(ds datastore.Batching) ipld.LinkSystem { lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { buf := bytes.NewBuffer(nil) return buf, func(lnk ipld.Link) error { - c := lnk.(cidlink.Link).Cid - return ds.Put(datastore.NewKey(c.String()), buf.Bytes()) + return ds.Put(datastore.NewKey(lnk.String()), buf.Bytes()) }, nil } return lsys @@ -40,14 +38,7 @@ func Store(srcStore datastore.Batching, n ipld.Node) (ipld.Link, error) { MhLength: 16, }, } - lsys := cidlink.DefaultLinkSystem() - lsys.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { - buf := bytes.NewBuffer(nil) - return buf, func(lnk ipld.Link) error { - c := lnk.(cidlink.Link).Cid - return srcStore.Put(datastore.NewKey(c.String()), buf.Bytes()) - }, nil - } + lsys := MkLinkSystem(srcStore) return lsys.Store(ipld.LinkContext{}, linkproto, n) } From b9e3576ecdd98b086a51b0d01d15813bd9f4be4b Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 27 Oct 2021 18:25:46 -0700 Subject: [PATCH 11/11] simplify --- http/subscribe.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/http/subscribe.go b/http/subscribe.go index ee6a5d0..cc53b6d 100644 --- a/http/subscribe.go +++ b/http/subscribe.go @@ -233,10 +233,6 @@ func (h *httpSubscriber) walkFetch(ctx context.Context, root cid.Cid, sel select LinkTargetNodePrototypeChooser: basicnode.Chooser, }, Path: datamodel.NewPath([]datamodel.PathSegment{}), - LastBlock: struct { - Path datamodel.Path - Link datamodel.Link - }{}, } // get the direct node. rootNode, err := getMissingLs.Load(ipld.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any)