Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Robust GC #3712

Merged
merged 10 commits into from
Mar 22, 2017
3 changes: 3 additions & 0 deletions commands/channelmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (cr *ChannelMarshaler) Read(p []byte) (int, error) {
if err != nil {
return 0, err
}
if r == nil {
return 0, nil
}
cr.reader = r
}

Expand Down
54 changes: 40 additions & 14 deletions core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
lockfile "github.com/ipfs/go-ipfs/repo/fsrepo/lock"

cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
u "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
)

Expand All @@ -39,6 +40,11 @@ var RepoCmd = &cmds.Command{
},
}

type GcResult struct {
Copy link
Contributor

@hoenirvili hoenirvili Mar 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every exported type and every exported field should have a descriptive doc comment above it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually remove this whole type in favor of the one in the gc package (just change the field names so the json marshaling matches up correctly)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is we can't pass the error type through the API. I get:

02:40:33.946 ERROR commands/h: json: cannot unmarshal object into Go value of type error client.go:247
Error: json: cannot unmarshal object into Go value of type error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, let's figure that out later then.

Key *cid.Cid
Error string `json:",omitempty"`
}

var repoGcCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Perform a garbage collection sweep on the repo.",
Expand All @@ -50,6 +56,7 @@ order to reclaim hard disk space.
},
Options: []cmds.Option{
cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false),
cmds.BoolOption("stream-errors", "Stream errors.").Default(false),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I also like this. It is a good option.

},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Expand All @@ -58,23 +65,39 @@ order to reclaim hard disk space.
return
}

gcOutChan, err := corerepo.GarbageCollectAsync(n, req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()

gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())

outChan := make(chan interface{})
outChan := make(chan interface{}, cap(gcOutChan))
res.SetOutput((<-chan interface{})(outChan))

go func() {
defer close(outChan)
for k := range gcOutChan {
outChan <- k
if streamErrors {
errs := false
for res := range gcOutChan {
if res.Error != nil {
outChan <- &GcResult{Error: res.Error.Error()}
errs = true
} else {
outChan <- &GcResult{Key: res.KeyRemoved}
}
}
if errs {
res.SetError(fmt.Errorf("encountered errors during gc run"), cmds.ErrNormal)
}
} else {
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
outChan <- &GcResult{Key: k}
})
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
}
}()
},
Type: corerepo.KeyRemoved{},
Type: GcResult{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
Expand All @@ -88,18 +111,21 @@ order to reclaim hard disk space.
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*corerepo.KeyRemoved)
obj, ok := v.(*GcResult)
if !ok {
return nil, u.ErrCast()
}

buf := new(bytes.Buffer)
if obj.Error != "" {
fmt.Fprintf(res.Stderr(), "Error: %s\n", obj.Error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use fmt.Fprintf and not use a standard ipfs logger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent was for the output to go to the client's stderr. The logger won't do this, as far as I know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this part of the code runs in the client CLI (IIRC), not on the daemon, so we are handling logging/errors manually here.

return nil, nil
}

if quiet {
buf = bytes.NewBufferString(obj.Key.String() + "\n")
return bytes.NewBufferString(obj.Key.String() + "\n"), nil
} else {
buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key))
return bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key)), nil
}
return buf, nil
}

return &cmds.ChannelMarshaler{
Expand Down
79 changes: 50 additions & 29 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package corerepo

import (
"bytes"
"context"
"errors"
"time"
Expand All @@ -19,10 +20,6 @@ var log = logging.Logger("corerepo")

var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")

type KeyRemoved struct {
Key *cid.Cid
}

type GC struct {
Node *core.IpfsNode
Repo repo.Repo
Expand Down Expand Up @@ -89,46 +86,70 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return err
}
rmed := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)

return CollectResult(ctx, rmed, nil)
}

func CollectResult(ctx context.Context, gcOut <-chan gc.Result, cb func(*cid.Cid)) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a doc comment above the function name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// CollectResult collects the output of a garbage collection run and calls the given callback for each object removed. It also collects all errors into a MultiError which is returned after the gc is completed

var errors []error
loop:
for {
select {
case _, ok := <-rmed:
case res, ok := <-gcOut:
if !ok {
return nil
break loop
}
if res.Error != nil {
errors = append(errors, res.Error)
} else if res.KeyRemoved != nil && cb != nil {
cb(res.KeyRemoved)
}
case <-ctx.Done():
return ctx.Err()
errors = append(errors, ctx.Err())
break loop
}
}

switch len(errors) {
case 0:
return nil
case 1:
return errors[0]
default:
return NewMultiError(errors...)
}
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
func NewMultiError(errs ...error) *MultiError {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// NewMultiError creates a new MultiError object from a given slice of errors

return &MultiError{errs[:len(errs)-1], errs[len(errs)-1]}
}

type MultiError struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you could tell that the type implements the Error interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioning that it implements the error interface is pretty redundant and not standard Go convention.

Errors []error
Summary error
}

func (e *MultiError) Error() string {
var buf bytes.Buffer
for _, err := range e.Errors {
buf.WriteString(err.Error())
buf.WriteString("; ")
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
buf.WriteString(e.Summary.Error())
return buf.String()
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
out := make(chan gc.Result)
out <- gc.Result{Error: err}
close(out)
return out
}

out := make(chan *KeyRemoved)
go func() {
defer close(out)
for k := range rmed {
select {
case out <- &KeyRemoved{k}:
case <-ctx.Done():
return
}
}
}()
return out, nil
return gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
Expand Down
25 changes: 7 additions & 18 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}

errs := make(chan error)
out := make(chan interface{})
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
if err != nil {
Expand Down Expand Up @@ -99,18 +98,11 @@ func TestAddGCLive(t *testing.T) {
t.Fatal("add shouldnt complete yet")
}

var gcout <-chan *cid.Cid
var gcout <-chan gc.Result
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
return
}

gcout = gcchan
gcout = gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
}()

// gc shouldnt start until we let the add finish its current file.
Expand All @@ -119,8 +111,6 @@ func TestAddGCLive(t *testing.T) {
select {
case <-gcstarted:
t.Fatal("gc shouldnt have started yet")
case err := <-errs:
t.Fatal(err)
default:
}

Expand All @@ -133,18 +123,17 @@ func TestAddGCLive(t *testing.T) {
select {
case o := <-out:
addedHashes[o.(*AddedObject).Hash] = struct{}{}
case err := <-errs:
t.Fatal(err)
}

select {
case <-gcstarted:
case err := <-errs:
t.Fatal(err)
}

for k := range gcout {
if _, ok := addedHashes[k.String()]; ok {
for r := range gcout {
if r.Error != nil {
t.Fatal(err)
}
if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
t.Fatal("gc'ed a hash we just added")
}
}
Expand Down
Loading