From 6ef190f5a646b51ea092b7e35479cca26c187d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 5 Dec 2019 12:43:32 +0100 Subject: [PATCH 1/2] pin: implement pin/ls with only CoreApi --- core/commands/pin.go | 86 +++++++++++----------- core/coreapi/pin.go | 165 ++++++++++++++++++++++--------------------- go.mod | 2 +- go.sum | 5 +- 4 files changed, 131 insertions(+), 127 deletions(-) diff --git a/core/commands/pin.go b/core/commands/pin.go index d27b0212110..165dad26d23 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -13,8 +13,6 @@ import ( cidenc "github.com/ipfs/go-cidutil/cidenc" cmds "github.com/ipfs/go-ipfs-cmds" offline "github.com/ipfs/go-ipfs-exchange-offline" - pin "github.com/ipfs/go-ipfs-pinner" - ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" verifcid "github.com/ipfs/go-verifcid" coreiface "github.com/ipfs/interface-go-ipfs-core" @@ -24,7 +22,6 @@ import ( core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" e "github.com/ipfs/go-ipfs/core/commands/e" - coreapi "github.com/ipfs/go-ipfs/core/coreapi" ) var PinCmd = &cmds.Command{ @@ -320,11 +317,6 @@ Example: cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) - if err != nil { - return err - } - api, err := cmdenv.GetApi(env, req) if err != nil { return err @@ -352,9 +344,9 @@ Example: } if len(req.Arguments) > 0 { - err = pinLsKeys(req, typeStr, n, api, emit) + err = pinLsKeys(req, typeStr, api, emit) } else { - err = pinLsAll(req, typeStr, n.Pinning, n.DAG, emit) + err = pinLsAll(req, typeStr, api, emit) } if err != nil { return err @@ -431,24 +423,30 @@ type PinLsObject struct { Type string `json:",omitempty"` } -func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error { - mode, ok := pin.StringToMode(typeStr) - if !ok { - return fmt.Errorf("invalid pin mode '%s'", typeStr) - } - +func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err } + switch typeStr { + case "all", "direct", "indirect", "recursive": + default: + return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr) + } + + opt, err := options.Pin.IsPinned.Type(typeStr) + if err != nil { + panic("unhandled pin type") + } + for _, p := range req.Arguments { - c, err := api.ResolvePath(req.Context, path.New(p)) + rp, err := api.ResolvePath(req.Context, path.New(p)) if err != nil { return err } - pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode) + pinType, pinned, err := api.Pin().IsPinned(req.Context, rp, opt) if err != nil { return err } @@ -466,7 +464,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac err = emit(&PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: pinType, - Cid: enc.Encode(c.Cid()), + Cid: enc.Encode(rp.Cid()), }, }) if err != nil { @@ -477,38 +475,42 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac return nil } -func pinLsAll(req *cmds.Request, typeStr string, pinning pin.Pinner, dag ipld.DAGService, emit func(value interface{}) error) error { - pinCh, errCh := coreapi.PinLsAll(req.Context, typeStr, pinning, dag) - +func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err } - ctx := req.Context -loop: - for { - select { - case p, ok := <-pinCh: - if !ok { - break loop - } - if err := emit(&PinLsOutputWrapper{ - PinLsObject: PinLsObject{ - Type: p.Type(), - Cid: enc.Encode(p.Path().Cid()), - }, - }); err != nil { - return err - } + switch typeStr { + case "all", "direct", "indirect", "recursive": + default: + err = fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr) + return err + } + + opt, err := options.Pin.Ls.Type(typeStr) + if err != nil { + panic("unhandled pin type") + } + + pins, err := api.Pin().Ls(req.Context, opt) + if err != nil { + return err + } - case <-ctx.Done(): - return ctx.Err() + for p := range pins { + err = emit(&PinLsOutputWrapper{ + PinLsObject: PinLsObject{ + Type: p.Type(), + Cid: enc.Encode(p.Path().Cid()), + }, + }) + if err != nil { + return err } } - err = <-errCh - return err + return nil } const ( diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 70b69b5b36c..000d7d593ea 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -3,11 +3,11 @@ package coreapi import ( "context" "fmt" + bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" offline "github.com/ipfs/go-ipfs-exchange-offline" pin "github.com/ipfs/go-ipfs-pinner" - ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" @@ -41,7 +41,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return api.pinning.Flush(ctx) } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) { +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) { settings, err := caopts.PinLsOptions(opts...) if err != nil { return nil, err @@ -53,7 +53,26 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreif return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) } - return api.pinLsAll(settings.Type, ctx) + return api.pinLsAll(settings.Type, ctx), nil +} + +func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { + dagNode, err := api.core().ResolveNode(ctx, p) + if err != nil { + return "", false, fmt.Errorf("pin: %s", err) + } + + settings, err := caopts.PinIsPinnedOptions(opts...) + if err != nil { + return "", false, err + } + + mode, ok := pin.StringToMode(settings.WithType) + if !ok { + return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType) + } + + return api.pinning.IsPinnedWithType(ctx, dagNode.Cid(), mode) } // Rm pin rm api @@ -184,6 +203,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro type pinInfo struct { pinType string path path.Resolved + err error } func (p *pinInfo) Path() path.Resolved { @@ -194,123 +214,106 @@ func (p *pinInfo) Type() string { return p.pinType } -func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) { - pinCh, errCh := PinLsAll(ctx, typeStr, api.pinning, api.dag) - - var pins []coreiface.Pin -loop: - for { - select { - case p, ok := <-pinCh: - if !ok { - break loop - } - pins = append(pins, p) - case <-ctx.Done(): - return nil, ctx.Err() - } - } - err := <-errCh - if err != nil { - return nil, err - } - - return pins, nil +func (p *pinInfo) Err() error { + return p.err } -// PinLsAll is an internal function for returning a list of pins -func PinLsAll(ctx context.Context, typeStr string, pin pin.Pinner, dag ipld.DAGService) (chan coreiface.Pin, chan error) { - ch := make(chan coreiface.Pin, 32) - errCh := make(chan error, 1) +func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreiface.Pin { + out := make(chan coreiface.Pin) keys := cid.NewSet() - AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { + + AddToResultKeys := func(keyList []cid.Cid, typeStr string) { for _, c := range keyList { if keys.Visit(c) { - select { - case ch <- &pinInfo{ + out <- &pinInfo{ pinType: typeStr, path: path.IpldPath(c), - }: - case <-ctx.Done(): - return ctx.Err() } } } - return nil + } + + VisitKeys := func(keyList []cid.Cid) { + for _, c := range keyList { + keys.Visit(c) + } } go func() { - defer close(ch) - defer close(errCh) - if typeStr == "direct" || typeStr == "all" { - dkeys, err := pin.DirectKeys(ctx) + defer close(out) + + if typeStr == "recursive" || typeStr == "all" { + rkeys, err := api.pinning.RecursiveKeys(ctx) if err != nil { - errCh <- err - return - } - if err := AddToResultKeys(dkeys, "direct"); err != nil { - errCh <- err + out <- &pinInfo{err: err} return } + AddToResultKeys(rkeys, "recursive") } - if typeStr == "recursive" || typeStr == "all" { - rkeys, err := pin.RecursiveKeys(ctx) + if typeStr == "direct" || typeStr == "all" { + dkeys, err := api.pinning.DirectKeys(ctx) if err != nil { - errCh <- err - return - } - if err := AddToResultKeys(rkeys, "recursive"); err != nil { - errCh <- err + out <- &pinInfo{err: err} return } + AddToResultKeys(dkeys, "direct") } - if typeStr == "indirect" || typeStr == "all" { - rkeys, err := pin.RecursiveKeys(ctx) + if typeStr == "all" { + set := cid.NewSet() + rkeys, err := api.pinning.RecursiveKeys(ctx) if err != nil { - errCh <- err + out <- &pinInfo{err: err} return } - - // If we're only listing indirect pins, we need to - // explicitly mark direct/recursive pins so we don't - // send them. - if typeStr == "indirect" { - dkeys, err := pin.DirectKeys(ctx) + for _, k := range rkeys { + err := merkledag.Walk( + ctx, merkledag.GetLinksWithDAG(api.dag), k, + set.Visit, + merkledag.SkipRoot(), merkledag.Concurrent(), + ) if err != nil { - errCh <- err + out <- &pinInfo{err: err} return } + } + AddToResultKeys(set.Keys(), "indirect") + } + if typeStr == "indirect" { + // We need to first visit the direct pins that have priority + // without emitting them - for _, k := range dkeys { - keys.Add(k) - } - for _, k := range rkeys { - keys.Add(k) - } + dkeys, err := api.pinning.DirectKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return } + VisitKeys(dkeys) - indirectKeys := cid.NewSet() - for _, k := range rkeys { - err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(dag), k, func(c cid.Cid) bool { - r := indirectKeys.Visit(c) - if r { - if err := AddToResultKeys([]cid.Cid{c}, "indirect"); err != nil { - return false - } - } - return r - }, merkledag.SkipRoot(), merkledag.Concurrent()) + rkeys, err := api.pinning.RecursiveKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return + } + VisitKeys(rkeys) + set := cid.NewSet() + for _, k := range rkeys { + err := merkledag.Walk( + ctx, merkledag.GetLinksWithDAG(api.dag), k, + set.Visit, + merkledag.SkipRoot(), merkledag.Concurrent(), + ) if err != nil { - errCh <- err + out <- &pinInfo{err: err} return } } + AddToResultKeys(set.Keys(), "indirect") } }() - return ch, errCh + return out } func (api *PinAPI) core() coreiface.CoreAPI { diff --git a/go.mod b/go.mod index 16626fdbfed..627d34a6649 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-unixfs v0.2.4 github.com/ipfs/go-verifcid v0.0.1 - github.com/ipfs/interface-go-ipfs-core v0.2.7 + github.com/ipfs/interface-go-ipfs-core v0.3.0 github.com/ipld/go-car v0.1.0 github.com/jbenet/go-is-domain v1.0.3 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c diff --git a/go.sum b/go.sum index 3e6d0c346d4..39f03033f3b 100644 --- a/go.sum +++ b/go.sum @@ -398,8 +398,8 @@ github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipfs/interface-go-ipfs-core v0.2.7 h1:HCwVmU9Tmba6jdMGxMcPsfwKUBY4y+6bLHp8T+t9hTU= -github.com/ipfs/interface-go-ipfs-core v0.2.7/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s= +github.com/ipfs/interface-go-ipfs-core v0.3.0 h1:oZdLLfh256gPGcYPURjivj/lv296GIcr8mUqZUnXOEI= +github.com/ipfs/interface-go-ipfs-core v0.3.0/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s= github.com/ipld/go-car v0.1.0 h1:AaIEA5ITRnFA68uMyuIPYGM2XXllxsu8sNjFJP797us= github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g= github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs= @@ -1198,7 +1198,6 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200413165638-669c56c373c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= From 6388f5a1c2439a3d3677f67c0deb79d3cb91f504 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Tue, 5 May 2020 14:19:38 +0200 Subject: [PATCH 2/2] pin: honor the context more accurately --- core/coreapi/pin.go | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 000d7d593ea..5bd2f9d5af3 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -53,7 +53,7 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) } - return api.pinLsAll(settings.Type, ctx), nil + return api.pinLsAll(ctx, settings.Type), nil } func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { @@ -218,20 +218,26 @@ func (p *pinInfo) Err() error { return p.err } -func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreiface.Pin { +// pinLsAll is an internal function for returning a list of pins +func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin { out := make(chan coreiface.Pin) keys := cid.NewSet() - AddToResultKeys := func(keyList []cid.Cid, typeStr string) { + AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { for _, c := range keyList { if keys.Visit(c) { - out <- &pinInfo{ + select { + case out <- &pinInfo{ pinType: typeStr, path: path.IpldPath(c), + }: + case <-ctx.Done(): + return ctx.Err() } } } + return nil } VisitKeys := func(keyList []cid.Cid) { @@ -249,7 +255,10 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreifac out <- &pinInfo{err: err} return } - AddToResultKeys(rkeys, "recursive") + if err := AddToResultKeys(rkeys, "recursive"); err != nil { + out <- &pinInfo{err: err} + return + } } if typeStr == "direct" || typeStr == "all" { dkeys, err := api.pinning.DirectKeys(ctx) @@ -257,7 +266,10 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreifac out <- &pinInfo{err: err} return } - AddToResultKeys(dkeys, "direct") + if err := AddToResultKeys(dkeys, "direct"); err != nil { + out <- &pinInfo{err: err} + return + } } if typeStr == "all" { set := cid.NewSet() @@ -277,7 +289,10 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreifac return } } - AddToResultKeys(set.Keys(), "indirect") + if err := AddToResultKeys(set.Keys(), "indirect"); err != nil { + out <- &pinInfo{err: err} + return + } } if typeStr == "indirect" { // We need to first visit the direct pins that have priority @@ -309,7 +324,10 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreifac return } } - AddToResultKeys(set.Keys(), "indirect") + if err := AddToResultKeys(set.Keys(), "indirect"); err != nil { + out <- &pinInfo{err: err} + return + } } }()