Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve dag/import #9721

Merged
merged 2 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ var DagResolveCmd = &cmds.Command{
Type: ResolveOutput{},
}

type importResult struct {
blockCount uint64
blockBytesCount uint64
roots map[cid.Cid]struct{}
err error
}

// DagImportCmd is a command for importing a car to ipfs
var DagImportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Expand Down
178 changes: 57 additions & 121 deletions core/commands/dag/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ package dagcmd

import (
"errors"
"fmt"
"io"

cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/go-libipfs/files"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
gocarv2 "github.com/ipld/go-car/v2"

MichaelMure marked this conversation as resolved.
Show resolved Hide resolved
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"

cmds "github.com/ipfs/go-ipfs-cmds"
gocarv2 "github.com/ipld/go-car/v2"
)

func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

node, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -38,127 +36,42 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

retCh := make(chan importResult, 1)
go importWorker(req, res, api, retCh)

done := <-retCh
if done.err != nil {
return done.err
}

// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
roots := done.roots

// opportunistic pinning: try whatever sticks
if doPinRoots {

var failedPins int
for c := range roots {

// We need to re-retrieve a block, convert it to ipld, and feed it
// to the Pinning interface, sigh...
//
// If we didn't have the problem of inability to take multiple pinlocks,
// we could use the api directly like so (though internally it does the same):
//
// // not ideal, but the pinning api takes only paths :(
// rp := path.NewResolvedPath(
// ipfspath.FromCid(c),
// c,
// c,
// "",
// )
//
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {

ret := RootMeta{Cid: c}

if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipldlegacy.DecodeNode(req.Context, block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}

if ret.PinErrorMsg != "" {
failedPins++
}

if err := res.Emit(&CarImportOutput{Root: &ret}); err != nil {
return err
}
}

if failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
len(roots),
)
}
}

stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: done.blockCount,
BlockBytesCount: done.blockBytesCount,
},
})
if err != nil {
return err
}
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)
}

return nil
}

func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {

// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())

roots := make(map[cid.Cid]struct{})
roots := cid.NewSet()
var blockCount, blockBytesCount uint64

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
ret <- importResult{err: errors.New("expected a file handle")}
return
return errors.New("expected a file handle")
}

// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
err := func() error {
// import blocks
err = func() error {
// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
defer file.Close()

car, err := gocarv2.NewBlockReader(file)
Expand All @@ -167,7 +80,7 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
}

for _, c := range car.Roots {
roots[c] = struct{}{}
roots.Add(c)
}

for {
Expand All @@ -193,28 +106,51 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
blockCount++
blockBytesCount += uint64(len(block.RawData()))
}

return nil
}()
}

if err != nil {
ret <- importResult{err: err}
return
}
if err := batch.Commit(); err != nil {
return err
}

if err := it.Err(); err != nil {
ret <- importResult{err: err}
return
// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too.
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well-formed.

// opportunistic pinning: try whatever sticks
if doPinRoots {
err = roots.ForEach(func(c cid.Cid) error {
ret := RootMeta{Cid: c}

// This will trigger a full read of the DAG in the pinner, to make sure we have all blocks.
// Ideally we would have a lighter merkledag.Walk() instead of the underlying merkledag.FetchDag,
// then pinner.PinWithMode().
err = api.Pin().Add(req.Context, path.IpldPath(c), options.Pin.Recursive(true))
if err != nil {
return err
}

return res.Emit(&CarImportOutput{Root: &ret})
})
if err != nil {
return err
}
}

if err := batch.Commit(); err != nil {
ret <- importResult{err: err}
return
stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: blockCount,
BlockBytesCount: blockBytesCount,
},
})
if err != nil {
return err
}
}

ret <- importResult{
blockCount: blockCount,
blockBytesCount: blockBytesCount,
roots: roots}
return nil
}