Skip to content

Commit

Permalink
feat: improve dag/import
Browse files Browse the repository at this point in the history
- don't bypass the CoreApi
- don't use a goroutine and return channel for `importWorker`, when what's happening is really just a synchronous call
- only `PinLock()` when we are going to pin
- use `cid.Set` instead of an explicit map
- fail the request early if any pinning fail, no need to try to pin more if the request failed already
  • Loading branch information
MichaelMure committed Mar 14, 2023
1 parent 5b9442c commit 2ec59ae
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 128 deletions.
7 changes: 0 additions & 7 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ var DagResolveCmd = &cmds.Command{
Type: ResolveOutput{},
}

type importResult struct {
blockCount uint64
blockBytesCount uint64
roots map[cid.Cid]struct{}
err error
}

// DagImportCmd is a command for importing a car to ipfs
var DagImportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Expand Down
178 changes: 57 additions & 121 deletions core/commands/dag/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ package dagcmd

import (
"errors"
"fmt"
"io"

cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/go-libipfs/files"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
gocarv2 "github.com/ipld/go-car/v2"

"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"

cmds "github.com/ipfs/go-ipfs-cmds"
gocarv2 "github.com/ipld/go-car/v2"
)

func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

node, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -38,127 +36,42 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

retCh := make(chan importResult, 1)
go importWorker(req, res, api, retCh)

done := <-retCh
if done.err != nil {
return done.err
}

// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
roots := done.roots

// opportunistic pinning: try whatever sticks
if doPinRoots {

var failedPins int
for c := range roots {

// We need to re-retrieve a block, convert it to ipld, and feed it
// to the Pinning interface, sigh...
//
// If we didn't have the problem of inability to take multiple pinlocks,
// we could use the api directly like so (though internally it does the same):
//
// // not ideal, but the pinning api takes only paths :(
// rp := path.NewResolvedPath(
// ipfspath.FromCid(c),
// c,
// c,
// "",
// )
//
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {

ret := RootMeta{Cid: c}

if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipldlegacy.DecodeNode(req.Context, block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}

if ret.PinErrorMsg != "" {
failedPins++
}

if err := res.Emit(&CarImportOutput{Root: &ret}); err != nil {
return err
}
}

if failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
len(roots),
)
}
}

stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: done.blockCount,
BlockBytesCount: done.blockBytesCount,
},
})
if err != nil {
return err
}
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)
}

return nil
}

func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {

// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())

roots := make(map[cid.Cid]struct{})
roots := cid.NewSet()
var blockCount, blockBytesCount uint64

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
ret <- importResult{err: errors.New("expected a file handle")}
return
return errors.New("expected a file handle")
}

// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
err := func() error {
// import blocks
err = func() error {
// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
defer file.Close()

car, err := gocarv2.NewBlockReader(file)
Expand All @@ -167,7 +80,7 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
}

for _, c := range car.Roots {
roots[c] = struct{}{}
roots.Add(c)
}

for {
Expand All @@ -193,28 +106,51 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
blockCount++
blockBytesCount += uint64(len(block.RawData()))
}

return nil
}()
}

if err != nil {
ret <- importResult{err: err}
return
}
if err := batch.Commit(); err != nil {
return err
}

if err := it.Err(); err != nil {
ret <- importResult{err: err}
return
// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too.
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well-formed.

// opportunistic pinning: try whatever sticks
if doPinRoots {
err = roots.ForEach(func(c cid.Cid) error {
ret := RootMeta{Cid: c}

// This will trigger a full read of the DAG in the pinner, to make sure we have all blocks.
// Ideally we would have a lighter merkledag.Walk() instead of the underlying merkledag.FetchDag,
// then pinner.PinWithMode().
err = api.Pin().Add(req.Context, path.IpldPath(c), options.Pin.Recursive(true))
if err != nil {
return err
}

return res.Emit(&CarImportOutput{Root: &ret})
})
if err != nil {
return err
}
}

if err := batch.Commit(); err != nil {
ret <- importResult{err: err}
return
stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: blockCount,
BlockBytesCount: blockBytesCount,
},
})
if err != nil {
return err
}
}

ret <- importResult{
blockCount: blockCount,
blockBytesCount: blockBytesCount,
roots: roots}
return nil
}

0 comments on commit 2ec59ae

Please sign in to comment.