More Robust GC #3712

Merged
merged 10 commits on Mar 22, 2017
3 changes: 3 additions & 0 deletions commands/channelmarshaler.go
@@ -25,6 +25,9 @@ func (cr *ChannelMarshaler) Read(p []byte) (int, error) {
if err != nil {
return 0, err
}
if r == nil {
return 0, nil
}
cr.reader = r
}

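For context on why the nil check is needed: a marshal callback can now legitimately return a nil reader together with a nil error (the repo gc text marshaler below does exactly that after writing an error to stderr), and the surrounding read loop would otherwise try to read from that nil reader. The following is a simplified sketch of the read path with illustrative names, not the real cmds API:

package main

import (
	"io"
	"strings"
)

// readOnce mimics the shape of ChannelMarshaler.Read after the fix: next
// stands in for the per-value marshal callback.
func readOnce(next func() (io.Reader, error), p []byte) (int, error) {
	r, err := next()
	if err != nil {
		return 0, err
	}
	if r == nil {
		// The marshaler produced no stdout output for this value (for
		// example, it reported an error on stderr instead); skip it.
		return 0, nil
	}
	return r.Read(p)
}

func main() {
	buf := make([]byte, 16)

	// Previously a nil reader here would have led to a nil dereference;
	// now the call simply yields (0, nil).
	readOnce(func() (io.Reader, error) { return nil, nil }, buf)

	// A marshaler that does produce output behaves as before.
	readOnce(func() (io.Reader, error) { return strings.NewReader("ok\n"), nil }, buf)
}
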
55 changes: 41 additions & 14 deletions core/commands/repo.go
@@ -15,6 +15,7 @@ import (
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
lockfile "github.com/ipfs/go-ipfs/repo/fsrepo/lock"

cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
u "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
)

@@ -39,6 +40,12 @@ var RepoCmd = &cmds.Command{
},
}

// GcResult is the result returned by the "repo gc" command.
type GcResult struct {
Key *cid.Cid
Error string `json:",omitempty"`
}

var repoGcCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Perform a garbage collection sweep on the repo.",
@@ -50,6 +57,7 @@ order to reclaim hard disk space.
},
Options: []cmds.Option{
cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false),
cmds.BoolOption("stream-errors", "Stream errors.").Default(false),
Member: 👍 I also like this. It is a good option.

},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@@ -58,23 +66,39 @@ order to reclaim hard disk space.
return
}

gcOutChan, err := corerepo.GarbageCollectAsync(n, req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()

gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())

outChan := make(chan interface{})
outChan := make(chan interface{}, cap(gcOutChan))
res.SetOutput((<-chan interface{})(outChan))

go func() {
defer close(outChan)
for k := range gcOutChan {
outChan <- k
if streamErrors {
errs := false
for res := range gcOutChan {
if res.Error != nil {
outChan <- &GcResult{Error: res.Error.Error()}
errs = true
} else {
outChan <- &GcResult{Key: res.KeyRemoved}
}
}
if errs {
res.SetError(fmt.Errorf("encountered errors during gc run"), cmds.ErrNormal)
}
} else {
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
outChan <- &GcResult{Key: k}
})
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
}
}()
},
Type: corerepo.KeyRemoved{},
Type: GcResult{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
@@ -88,18 +112,21 @@ order to reclaim hard disk space.
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*corerepo.KeyRemoved)
obj, ok := v.(*GcResult)
if !ok {
return nil, u.ErrCast()
}

buf := new(bytes.Buffer)
if obj.Error != "" {
fmt.Fprintf(res.Stderr(), "Error: %s\n", obj.Error)
Contributor: Why use fmt.Fprintf and not a standard ipfs logger?

Contributor (author): The intent was for the output to go to the client's stderr. The logger won't do this as far as I know.

Member: Yes, this part of the code runs on the client CLI (IIRC), not on the daemon, so we are handling logging/errors manually here.

return nil, nil
}

if quiet {
buf = bytes.NewBufferString(obj.Key.String() + "\n")
return bytes.NewBufferString(obj.Key.String() + "\n"), nil
} else {
buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key))
return bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key)), nil
}
return buf, nil
}

return &cmds.ChannelMarshaler{
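To make the output behaviour above concrete: errors only reach the marshaler as GcResult values when --stream-errors is set (otherwise CollectResult aggregates them and SetError reports them once at the end), and the text marshaler then splits its output between stdout and stderr, as discussed in the review thread above. Below is a minimal, self-contained sketch of that rendering logic; it uses a plain string key as a stand-in for *cid.Cid and made-up values:

package main

import (
	"fmt"
	"os"
)

// gcResult is a simplified stand-in for the GcResult type in the diff above;
// the real Key field is a *cid.Cid.
type gcResult struct {
	Key   string
	Error string
}

// render mirrors the text marshaler's behaviour: errors go to stderr,
// removed keys go to stdout, and quiet (-q) drops the "removed " prefix.
func render(r gcResult, quiet bool) {
	if r.Error != "" {
		fmt.Fprintf(os.Stderr, "Error: %s\n", r.Error)
		return
	}
	if quiet {
		fmt.Println(r.Key)
		return
	}
	fmt.Printf("removed %s\n", r.Key)
}

func main() {
	render(gcResult{Key: "QmExampleKey"}, false)      // "removed QmExampleKey" on stdout
	render(gcResult{Key: "QmExampleKey"}, true)       // "QmExampleKey" on stdout
	render(gcResult{Error: "block not found"}, false) // "Error: block not found" on stderr
}
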
84 changes: 55 additions & 29 deletions core/corerepo/gc.go
@@ -1,6 +1,7 @@
package corerepo

import (
"bytes"
"context"
"errors"
"time"
@@ -19,10 +20,6 @@ var log = logging.Logger("corerepo")

var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")

type KeyRemoved struct {
Key *cid.Cid
}

type GC struct {
Node *core.IpfsNode
Repo repo.Repo
@@ -89,46 +86,75 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return err
}
rmed := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)

return CollectResult(ctx, rmed, nil)
}

// CollectResult collects the output of a garbage collection run and calls the
// given callback for each object removed. It also collects all errors into a
// MultiError which is returned after the gc is completed.
func CollectResult(ctx context.Context, gcOut <-chan gc.Result, cb func(*cid.Cid)) error {
var errors []error
loop:
for {
select {
case _, ok := <-rmed:
case res, ok := <-gcOut:
if !ok {
return nil
break loop
}
if res.Error != nil {
errors = append(errors, res.Error)
} else if res.KeyRemoved != nil && cb != nil {
cb(res.KeyRemoved)
}
case <-ctx.Done():
return ctx.Err()
errors = append(errors, ctx.Err())
break loop
}
}

switch len(errors) {
case 0:
return nil
case 1:
return errors[0]
default:
return NewMultiError(errors...)
}
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
// NewMultiError creates a new MultiError object from a given slice of errors.
func NewMultiError(errs ...error) *MultiError {
return &MultiError{errs[:len(errs)-1], errs[len(errs)-1]}
}

// MultiError contains the results of multiple errors.
type MultiError struct {
Errors []error
Summary error
}

func (e *MultiError) Error() string {
var buf bytes.Buffer
for _, err := range e.Errors {
buf.WriteString(err.Error())
buf.WriteString("; ")
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
buf.WriteString(e.Summary.Error())
return buf.String()
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
out := make(chan gc.Result, 1)
out <- gc.Result{Error: err}
close(out)
return out
}

out := make(chan *KeyRemoved)
go func() {
defer close(out)
for k := range rmed {
select {
case out <- &KeyRemoved{k}:
case <-ctx.Done():
return
}
}
}()
return out, nil
return gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
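A sketch of how a caller outside the command layer might use the reworked API, based on the signatures introduced above; runGC and its surrounding wiring are hypothetical, and the import paths follow the ones shown in this diff:

package gcexample

import (
	"context"
	"fmt"

	core "github.com/ipfs/go-ipfs/core"
	corerepo "github.com/ipfs/go-ipfs/core/corerepo"

	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)

// runGC drains the gc result channel with CollectResult, printing each
// removed key and surfacing any accumulated errors at the end.
func runGC(ctx context.Context, node *core.IpfsNode) error {
	// Errors are now reported on the channel itself rather than through a
	// second return value, so there is no early-return error path here.
	results := corerepo.GarbageCollectAsync(node, ctx)

	err := corerepo.CollectResult(ctx, results, func(k *cid.Cid) {
		fmt.Println("removed", k)
	})

	// CollectResult returns nil, a single error, or a *MultiError whose
	// Summary field holds the last error and Errors holds the rest.
	if merr, ok := err.(*corerepo.MultiError); ok {
		fmt.Printf("gc finished with %d errors\n", len(merr.Errors)+1)
	}
	return err
}
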
25 changes: 7 additions & 18 deletions core/coreunix/add_test.go
@@ -59,7 +59,6 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}

errs := make(chan error)
out := make(chan interface{})
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
if err != nil {
@@ -99,18 +98,11 @@ func TestAddGCLive(t *testing.T) {
t.Fatal("add shouldnt complete yet")
}

var gcout <-chan *cid.Cid
var gcout <-chan gc.Result
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
return
}

gcout = gcchan
gcout = gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
}()

// gc shouldnt start until we let the add finish its current file.
@@ -119,8 +111,6 @@
select {
case <-gcstarted:
t.Fatal("gc shouldnt have started yet")
case err := <-errs:
t.Fatal(err)
default:
}

@@ -133,18 +123,17 @@
select {
case o := <-out:
addedHashes[o.(*AddedObject).Hash] = struct{}{}
case err := <-errs:
t.Fatal(err)
}

select {
case <-gcstarted:
case err := <-errs:
t.Fatal(err)
}

for k := range gcout {
if _, ok := addedHashes[k.String()]; ok {
for r := range gcout {
if r.Error != nil {
t.Fatal(r.Error)
}
if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
t.Fatal("gc'ed a hash we just added")
}
}