Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: pin add --max-depth (arbitrary depth recursive pins) #5142

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 78 additions & 29 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
Expand All @@ -16,6 +17,7 @@ import (
path "github.com/ipfs/go-ipfs/path"
resolver "github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/thirdparty/recpinset"
"github.com/ipfs/go-ipfs/thirdparty/verifcid"
uio "github.com/ipfs/go-ipfs/unixfs/io"

Expand Down Expand Up @@ -60,6 +62,7 @@ var addPinCmd = &cmds.Command{
Options: []cmdkit.Option{
cmdkit.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").WithDefault(true),
cmdkit.BoolOption("progress", "Show progress"),
cmdkit.IntOption("max-depth", "Only for recursive pins, fetch and pin graph limiting the branch depth").WithDefault(-1),
},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
Expand All @@ -77,10 +80,29 @@ var addPinCmd = &cmds.Command{
res.SetError(err, cmdkit.ErrNormal)
return
}
maxDepth, _, err := req.Option("max-depth").Int()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

switch {
case !recursive:
maxDepth = 0
case recursive && maxDepth == 0:
res.SetError(
errors.New("invalid --max-depth=0. Use a direct pin instead"),
cmdkit.ErrNormal,
)
return
case !recursive && maxDepth < 0:
maxDepth = -1
}

showProgress, _, _ := req.Option("progress").Bool()

if !showProgress {
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), maxDepth)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand All @@ -100,7 +122,7 @@ var addPinCmd = &cmds.Command{
}
ch := make(chan pinResult, 1)
go func() {
added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive)
added, err := corerepo.Pin(n, ctx, req.Arguments(), maxDepth)
ch <- pinResult{pins: added, err: err}
}()

Expand Down Expand Up @@ -526,10 +548,13 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN
return nil, fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
mode, _ = pin.StringToMode(pinType)
if mode < pin.RecursiveN {
switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
}
keys[c.String()] = RefKeyObject{
Type: pinType,
Expand All @@ -555,17 +580,28 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
set := recpinset.New()
for _, recPin := range n.Pinning.RecursivePins() {
err := dag.EnumerateChildrenMaxDepth(
ctx,
dag.GetLinksWithDAG(n.DAG),
recPin.Cid,
recPin.MaxDepth,
set.Visit,
)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
for _, recPin := range n.Pinning.RecursivePins() {
mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth))
keys[recPin.Cid.String()] = RefKeyObject{
Type: mode,
}
}
}

return keys, nil
Expand Down Expand Up @@ -595,60 +631,73 @@ type pinVerifyOpts struct {
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
visited := make(map[string]PinStatus)
statuses := make(map[string]PinStatus)
visited := recpinset.New()

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()
recPins := n.Pinning.RecursivePins()

var checkPin func(root *cid.Cid) PinStatus
checkPin = func(root *cid.Cid) PinStatus {
key := root.String()
if status, ok := visited[key]; ok {
return status
}

if err := verifcid.ValidateCid(root); err != nil {
validateCid := func(c *cid.Cid) PinStatus {
if err := verifcid.ValidateCid(c); err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}}
status.BadNodes = []BadNode{BadNode{Cid: c.String(), Err: err.Error()}}
}
visited[key] = status
return status
}
return PinStatus{Ok: true}
}

var checkPinMaxDepth func(root *cid.Cid, maxDepth int) PinStatus
checkPinMaxDepth = func(root *cid.Cid, maxDepth int) PinStatus {
key := root.String()
// it was visited already, return last status
if !visited.Visit(root, maxDepth) {
return statuses[key]
}

status := validateCid(root)
if maxDepth == 0 || !status.Ok {
statuses[key] = status
return status
}

if maxDepth > 0 {
maxDepth--
}

links, err := getLinks(ctx, root)
if err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}}
}
visited[key] = status
statuses[key] = status
return status
}

status := PinStatus{Ok: true}
for _, lnk := range links {
res := checkPin(lnk.Cid)
res := checkPinMaxDepth(lnk.Cid, maxDepth)
if !res.Ok {
status.Ok = false
status.BadNodes = append(status.BadNodes, res.BadNodes...)
}
}

visited[key] = status
statuses[key] = status
return status
}

out := make(chan interface{})
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for _, recPin := range recPins {
pinStatus := checkPinMaxDepth(recPin.Cid, recPin.MaxDepth)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{cid.String(), pinStatus}:
case out <- &PinVerifyRes{recPin.Cid.String(), pinStatus}:
case <-ctx.Done():
return
}
Expand Down
93 changes: 53 additions & 40 deletions core/commands/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipfs/go-ipfs/core"
e "github.com/ipfs/go-ipfs/core/commands/e"
path "github.com/ipfs/go-ipfs/path"
"github.com/ipfs/go-ipfs/thirdparty/recpinset"

ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
Expand Down Expand Up @@ -64,6 +65,7 @@ NOTE: List all references recursively by using the flag '-r'.
cmdkit.BoolOption("edges", "e", "Emit edge format: `<from> -> <to>`."),
cmdkit.BoolOption("unique", "u", "Omit duplicate refs from output."),
cmdkit.BoolOption("recursive", "r", "Recursively list links of child nodes."),
cmdkit.IntOption("max-depth", "Only for recursive depths, list down to a maximum branch depth").WithDefault(-1),
},
Run: func(req cmds.Request, res cmds.Response) {
ctx := req.Context()
Expand All @@ -85,6 +87,25 @@ NOTE: List all references recursively by using the flag '-r'.
return
}

maxDepth, _, err := req.Option("max-depth").Int()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

switch {
case !recursive:
maxDepth = 0
case recursive && maxDepth == 0:
res.SetError(
errors.New("invalid --max-depth=0 for recursive references"),
cmdkit.ErrNormal,
)
return
case !recursive && maxDepth < 0:
maxDepth = -1
}

format, _, err := req.Option("format").String()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand Down Expand Up @@ -125,6 +146,7 @@ NOTE: List all references recursively by using the flag '-r'.
Unique: unique,
PrintFmt: format,
Recursive: recursive,
MaxDepth: maxDepth,
}

for _, o := range objs {
Expand Down Expand Up @@ -233,39 +255,53 @@ type RefWriter struct {

Unique bool
Recursive bool
MaxDepth int
PrintFmt string

seen *cid.Set
explored *recpinset.Set
}

// WriteRefs writes refs of the given object to the underlying writer.
func (rw *RefWriter) WriteRefs(n ipld.Node) (int, error) {
maxDepth := 1 // single
if rw.Recursive {
return rw.writeRefsRecursive(n)
maxDepth = rw.MaxDepth
}
return rw.writeRefsSingle(n)

return rw.writeRefsRecursive(n, maxDepth)
}

func (rw *RefWriter) writeRefsRecursive(n ipld.Node) (int, error) {
func (rw *RefWriter) writeRefsRecursive(n ipld.Node, maxDepth int) (int, error) {
if maxDepth == 0 {
return 0, nil
}

if maxDepth > 0 {
maxDepth--
}

nc := n.Cid()

var count int
for i, ng := range ipld.GetDAG(rw.Ctx, rw.DAG, n) {
lc := n.Links()[i].Cid
if rw.skip(lc) {
skipWrite, skipExplore := rw.skip(lc, maxDepth)
if skipExplore {
continue
}

if err := rw.WriteEdge(nc, lc, n.Links()[i].Name); err != nil {
return count, err
if !skipWrite {
if err := rw.WriteEdge(nc, lc, n.Links()[i].Name); err != nil {
return count, err
}
}

nd, err := ng.Get(rw.Ctx)
if err != nil {
return count, err
}

c, err := rw.writeRefsRecursive(nd)
c, err := rw.writeRefsRecursive(nd, maxDepth)
count += c
if err != nil {
return count, err
Expand All @@ -274,43 +310,20 @@ func (rw *RefWriter) writeRefsRecursive(n ipld.Node) (int, error) {
return count, nil
}

func (rw *RefWriter) writeRefsSingle(n ipld.Node) (int, error) {
c := n.Cid()

if rw.skip(c) {
return 0, nil
}

count := 0
for _, l := range n.Links() {
lc := l.Cid
if rw.skip(lc) {
continue
}

if err := rw.WriteEdge(c, lc, l.Name); err != nil {
return count, err
}
count++
}
return count, nil
}

// skip returns whether to skip a cid
func (rw *RefWriter) skip(c *cid.Cid) bool {
// skip returns whether a cid has been seen (skip write) and whether
// a cid branch has been explored (skip explore)
func (rw *RefWriter) skip(c *cid.Cid, depth int) (bool, bool) {
if !rw.Unique {
return false
return false, false
}

if rw.seen == nil {
rw.seen = cid.NewSet()
if rw.explored == nil {
rw.explored = recpinset.New()
}

has := rw.seen.Has(c)
if !has {
rw.seen.Add(c)
}
return has
skipWrite := rw.explored.Has(c)

return skipWrite, !rw.explored.Visit(c, depth)
}

// Write one edge
Expand Down
2 changes: 2 additions & 0 deletions core/coreapi/interface/options/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package options

type PinAddSettings struct {
Recursive bool
MaxDepth int
}

type PinLsSettings struct {
Expand All @@ -19,6 +20,7 @@ type PinUpdateOption func(*PinUpdateSettings) error
func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) {
options := &PinAddSettings{
Recursive: true,
MaxDepth: -1,
}

for _, opt := range opts {
Expand Down
Loading