Skip to content

Commit

Permalink
fix (pinning): pin ls traverses all indirect pins. pin ls pin type pr…
Browse files Browse the repository at this point in the history
…ecedence change - a direct/recursive pin is now labeled as such even if also indirectly pinned.
  • Loading branch information
aschmahmann authored and Stebalien committed Dec 2, 2019
1 parent 59834fd commit 0906d7f
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 114 deletions.
92 changes: 24 additions & 68 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
pin "github.com/ipfs/go-ipfs/pin"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
verifcid "github.com/ipfs/go-verifcid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
Expand Down Expand Up @@ -352,7 +354,7 @@ Example:
if len(req.Arguments) > 0 {
err = pinLsKeys(req, typeStr, n, api, emit)
} else {
err = pinLsAll(req, typeStr, n, emit)
err = pinLsAll(req, typeStr, n.Pinning, n.DAG, emit)
}
if err != nil {
return err
Expand Down Expand Up @@ -475,84 +477,38 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, pinning pin.Pinner, dag ipld.DAGService, emit func(value interface{}) error) error {
pinCh, errCh := coreapi.PinLsAll(req.Context, typeStr, pinning, dag)

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
ctx := req.Context
loop:
for {
select {
case p, ok := <-pinCh:
if !ok {
break loop
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
}, dag.SkipRoot(), dag.Concurrent())

if visitErr != nil {
return visitErr
}
if err != nil {
if err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
},
}); err != nil {
return err
}

case <-ctx.Done():
return ctx.Err()
}
}

return nil
err = <-errCh
return err
}

const (
Expand Down
150 changes: 108 additions & 42 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package coreapi
import (
"context"
"fmt"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
merkledag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-ipfs/pin"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
"github.com/ipfs/interface-go-ipfs-core/path"
)

type PinAPI CoreAPI
Expand Down Expand Up @@ -194,57 +195,122 @@ func (p *pinInfo) Type() string {
}

func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) {
pinCh, errCh := PinLsAll(ctx, typeStr, api.pinning, api.dag)

var pins []coreiface.Pin
loop:
for {
select {
case p, ok := <-pinCh:
if !ok {
break loop
}
pins = append(pins, p)
case <-ctx.Done():
return nil, ctx.Err()
}
}
err := <-errCh
if err != nil {
return nil, err
}

return pins, nil
}

keys := make(map[cid.Cid]*pinInfo)
// PinLsAll is an internal function for returning a list of pins
func PinLsAll(ctx context.Context, typeStr string, pin pin.Pinner, dag ipld.DAGService) (chan coreiface.Pin, chan error) {
ch := make(chan coreiface.Pin, 32)
errCh := make(chan error, 1)

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
keys := cid.NewSet()
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
keys[c] = &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
if keys.Visit(c) {
select {
case ch <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
go func() {
defer close(ch)
defer close(errCh)
if typeStr == "direct" || typeStr == "all" {
dkeys, err := pin.DirectKeys(ctx)
if err != nil {
return nil, err
errCh <- err
return
}
if err := AddToResultKeys(dkeys, "direct"); err != nil {
errCh <- err
return
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := pin.RecursiveKeys(ctx)
if err != nil {
errCh <- err
return
}
if err := AddToResultKeys(rkeys, "recursive"); err != nil {
errCh <- err
return
}
}
AddToResultKeys(rkeys, "recursive")
}
if typeStr == "indirect" || typeStr == "all" {
rkeys, err := pin.RecursiveKeys(ctx)
if err != nil {
errCh <- err
return
}

out := make([]coreiface.Pin, 0, len(keys))
for _, v := range keys {
out = append(out, v)
}
// If we're only listing indirect pins, we need to
// explicitly mark direct/recursive pins so we don't
// send them.
if typeStr == "indirect" {
dkeys, err := pin.DirectKeys(ctx)
if err != nil {
errCh <- err
return
}

for _, k := range dkeys {
keys.Add(k)
}
for _, k := range rkeys {
keys.Add(k)
}
}

return out, nil
indirectKeys := cid.NewSet()
for _, k := range rkeys {
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(dag), k, func(c cid.Cid) bool {
r := indirectKeys.Visit(c)
if r {
if err := AddToResultKeys([]cid.Cid{c}, "indirect"); err != nil {
return false
}
}
return r
}, merkledag.SkipRoot(), merkledag.Concurrent())

if err != nil {
errCh <- err
return
}
}
}
}()

return ch, errCh
}

func (api *PinAPI) core() coreiface.CoreAPI {
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type UnixfsAPI CoreAPI
var nilNode *core.IpfsNode
var once sync.Once

func getOrCreateNilNode() (*core.IpfsNode,error) {
func getOrCreateNilNode() (*core.IpfsNode, error) {
once.Do(func() {
if nilNode != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/ipfs/go-path v0.0.7
github.com/ipfs/go-unixfs v0.2.1
github.com/ipfs/go-verifcid v0.0.1
github.com/ipfs/interface-go-ipfs-core v0.2.3
github.com/ipfs/interface-go-ipfs-core v0.2.5
github.com/jbenet/go-is-domain v1.0.3
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/ipfs/go-unixfs v0.2.1 h1:g51t9ODICFZ3F51FPivm8dE7NzYcdAQNUL9wGP5AYa0=
github.com/ipfs/go-unixfs v0.2.1/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipfs/interface-go-ipfs-core v0.2.3 h1:E6uQ+1fJjkxJWlL9lAE72a5FWeyeeNL3GitLy8+jq3Y=
github.com/ipfs/interface-go-ipfs-core v0.2.3/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/ipfs/interface-go-ipfs-core v0.2.5 h1:/rspOe8RbIxwtssEXHB+X9JXhOBDCQt8x50d2kFPXL8=
github.com/ipfs/interface-go-ipfs-core v0.2.5/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
Expand Down

0 comments on commit 0906d7f

Please sign in to comment.