Skip to content

Commit

Permalink
Merge pull request #2196 from ipfs/feat/mfs-flush-cmd
Browse files Browse the repository at this point in the history
Add flush command to ipfs files
  • Loading branch information
whyrusleeping committed Feb 9, 2016
2 parents 4c02c98 + c47e12e commit 37258a2
Show file tree
Hide file tree
Showing 18 changed files with 933 additions and 257 deletions.
25 changes: 19 additions & 6 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ type GCBlockstore interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should ocurr simultaneously.
// Reading during GC is safe, and requires no lock.
GCLock() func()
GCLock() Unlocker

// PinLock locks the blockstore for sequences of puts expected to finish
// with a pin (before GC). Multiple put->pin sequences can write through
// at the same time, but no GC should not happen simulatenously.
// Reading during Pinning is safe, and requires no lock.
PinLock() func()
PinLock() Unlocker

// GcRequested returns true if GCLock has been called and is waiting to
// take the lock
Expand Down Expand Up @@ -198,16 +198,29 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return output, nil
}

func (bs *blockstore) GCLock() func() {
type Unlocker interface {
Unlock()
}

type unlocker struct {
unlock func()
}

func (u *unlocker) Unlock() {
u.unlock()
u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return bs.lk.Unlock
return &unlocker{bs.lk.Unlock}
}

func (bs *blockstore) PinLock() func() {
func (bs *blockstore) PinLock() Unlocker {
bs.lk.RLock()
return bs.lk.RUnlock
return &unlocker{bs.lk.RUnlock}
}

func (bs *blockstore) GCRequested() bool {
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/write_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return w.blockstore.AllKeysChan(ctx)
}

func (w *writecache) GCLock() func() {
func (w *writecache) GCLock() Unlocker {
return w.blockstore.(GCBlockstore).GCLock()
}

func (w *writecache) PinLock() func() {
func (w *writecache) PinLock() Unlocker {
return w.blockstore.(GCBlockstore).PinLock()
}

Expand Down
117 changes: 93 additions & 24 deletions core/commands/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ var FilesCmd = &cmds.Command{
Tagline: "Manipulate unixfs files.",
ShortDescription: `
Files is an API for manipulating ipfs objects as if they were a unix filesystem.
Note:
Most of the subcommands of 'ipfs files' accept the '--flush' flag. It defaults to
true. Use caution when setting this flag to false, It will improve performance
for large numbers of file operations, but it does so at the cost of consistency
guarantees. If the daemon is unexpectedly killed before running 'ipfs files flush'
on the files in question, then data may be lost. This also applies to running
'ipfs repo gc' concurrently with '--flush=false' operations.
`,
},
Options: []cmds.Option{
Expand All @@ -41,6 +49,7 @@ Files is an API for manipulating ipfs objects as if they were a unix filesystem.
"mkdir": FilesMkdirCmd,
"stat": FilesStatCmd,
"rm": FilesRmCmd,
"flush": FilesFlushCmd,
},
}

Expand Down Expand Up @@ -100,8 +109,7 @@ func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
return nil, err
}

// add to dagserv to ensure its available
k, err := ds.Add(nd)
k, err := nd.Key()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,6 +158,11 @@ var FilesCpCmd = &cmds.Command{
return
}

flush, found, _ := req.Option("flush").Bool()
if !found {
flush = true
}

src, err := checkPath(req.Arguments()[0])
if err != nil {
res.SetError(err, cmds.ErrNormal)
Expand All @@ -172,6 +185,14 @@ var FilesCpCmd = &cmds.Command{
res.SetError(err, cmds.ErrNormal)
return
}

if flush {
err := mfs.FlushPath(node.FilesRoot, dst)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}

Expand Down Expand Up @@ -257,17 +278,10 @@ Examples:
switch fsn := fsn.(type) {
case *mfs.Directory:
if !long {
mdnd, err := fsn.GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

var output []mfs.NodeListing
for _, lnk := range mdnd.Links {
for _, name := range fsn.ListNames() {
output = append(output, mfs.NodeListing{
Name: lnk.Name,
Hash: lnk.Hash.B58String(),
Name: name,
})
}
res.SetOutput(&FilesLsOutput{output})
Expand Down Expand Up @@ -354,6 +368,14 @@ Examples:
return
}

rfd, err := fi.Open(mfs.OpenReadOnly, false)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

defer rfd.Close()

offset, _, err := req.Option("offset").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
Expand All @@ -364,7 +386,7 @@ Examples:
return
}

filen, err := fi.Size()
filen, err := rfd.Size()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
Expand All @@ -375,12 +397,13 @@ Examples:
return
}

_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = rfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
var r io.Reader = fi

var r io.Reader = &contextReaderWrapper{R: rfd, ctx: req.Context()}
count, found, err := req.Option("count").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
Expand All @@ -391,13 +414,26 @@ Examples:
res.SetError(fmt.Errorf("cannot specify negative 'count'"), cmds.ErrNormal)
return
}
r = io.LimitReader(fi, int64(count))
r = io.LimitReader(r, int64(count))
}

res.SetOutput(r)
},
}

type contextReader interface {
CtxReadFull(context.Context, []byte) (int, error)
}

type contextReaderWrapper struct {
R contextReader
ctx context.Context
}

func (crw *contextReaderWrapper) Read(b []byte) (int, error) {
return crw.R.CtxReadFull(crw.ctx, b)
}

var FilesMvCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Move files.",
Expand Down Expand Up @@ -486,8 +522,8 @@ Warning:

create, _, _ := req.Option("create").Bool()
trunc, _, _ := req.Option("truncate").Bool()
flush, set, _ := req.Option("flush").Bool()
if !set {
flush, fset, _ := req.Option("flush").Bool()
if !fset {
flush = true
}

Expand All @@ -513,14 +549,16 @@ Warning:
return
}

if flush {
defer fi.Close()
} else {
defer fi.Sync()
wfd, err := fi.Open(mfs.OpenWriteOnly, flush)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

defer wfd.Close()

if trunc {
if err := fi.Truncate(0); err != nil {
if err := wfd.Truncate(0); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
Expand All @@ -536,7 +574,7 @@ Warning:
return
}

_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = wfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
log.Error("seekfail: ", err)
res.SetError(err, cmds.ErrNormal)
Expand All @@ -554,7 +592,7 @@ Warning:
r = io.LimitReader(r, int64(count))
}

n, err := io.Copy(fi, input)
n, err := io.Copy(wfd, input)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
Expand Down Expand Up @@ -613,6 +651,37 @@ Examples:
},
}

var FilesFlushCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "flush a given path's data to disk",
ShortDescription: `
flush a given path to disk. This is only useful when other commands
are run with the '--flush=false'.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("path", false, false, "path to flush (default '/')"),
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

path := "/"
if len(req.Arguments()) > 0 {
path = req.Arguments()[0]
}

err = mfs.FlushPath(nd.FilesRoot, path)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}

var FilesRmCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Remove a file.",
Expand Down
3 changes: 1 addition & 2 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ on disk.
return
}

unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

// set recursive flag
recursive, found, err := req.Option("recursive").Bool()
Expand Down
22 changes: 11 additions & 11 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ipfs/go-ipfs/pin"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"

bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
Expand Down Expand Up @@ -100,7 +101,7 @@ type Adder struct {
Chunker string
root *dag.Node
mr *mfs.Root
unlock func()
unlocker bs.Unlocker
tempRoot key.Key
}

Expand Down Expand Up @@ -225,8 +226,7 @@ func (adder *Adder) outputDirs(path string, nd *dag.Node) error {
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

fileAdder, err := NewAdder(n.Context(), n, nil)
if err != nil {
Expand All @@ -247,8 +247,7 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
unlock := n.Blockstore.PinLock()
defer unlock()
n.Blockstore.PinLock().Unlock()

stat, err := os.Lstat(root)
if err != nil {
Expand Down Expand Up @@ -296,8 +295,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
}
fileAdder.Wrap = true

unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

err = fileAdder.addFile(file)
if err != nil {
Expand Down Expand Up @@ -347,8 +345,10 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {

// Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
adder.unlock = adder.node.Blockstore.PinLock()
defer adder.unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
defer func() {
adder.unlocker.Unlock()
}()

return adder.addFile(file)
}
Expand Down Expand Up @@ -434,8 +434,8 @@ func (adder *Adder) maybePauseForGC() error {
return err
}

adder.unlock()
adder.unlock = adder.node.Blockstore.PinLock()
adder.unlocker.Unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
}
return nil
}
Expand Down
Loading

0 comments on commit 37258a2

Please sign in to comment.