diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index 6420e69e6..568d44543 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -286,19 +286,29 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { // that 'fetchNodes' will start at a time const defaultConcurrentFetch = 32 -// WalkOptions represent the parameters of a graph walking algorithm -type WalkOptions struct { +// walkOptions represent the parameters of a graph walking algorithm +type walkOptions struct { WithRoot bool - IgnoreErrors bool Concurrency int + ErrorHandler func(c cid.Cid, err error) error } -// WalkOption is a setter for WalkOptions -type WalkOption func(*WalkOptions) +// WalkOption is a setter for walkOptions +type WalkOption func(*walkOptions) + +func (wo *walkOptions) addHandler(handler func(c cid.Cid, err error) error) { + if wo.ErrorHandler != nil { + wo.ErrorHandler = func(c cid.Cid, err error) error { + return handler(c, wo.ErrorHandler(c, err)) + } + } else { + wo.ErrorHandler = handler + } +} // WithRoot is a WalkOption indicating that the root node should be visited func WithRoot() WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.WithRoot = true } } @@ -308,7 +318,7 @@ func WithRoot() WalkOption { // NOTE: When using that option, the walk order is *not* guarantee. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func Concurrent() WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.Concurrency = defaultConcurrentFetch } } @@ -318,7 +328,7 @@ func Concurrent() WalkOption { // NOTE: When using that option, the walk order is *not* guarantee. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func Concurrency(worker int) WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.Concurrency = worker } } @@ -326,8 +336,44 @@ func Concurrency(worker int) WalkOption { // IgnoreErrors is a WalkOption indicating that the walk should attempt to // continue even when an error occur. func IgnoreErrors() WalkOption { - return func(walkOptions *WalkOptions) { - walkOptions.IgnoreErrors = true + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + return nil + }) + } +} + +// IgnoreMissing is a WalkOption indicating that the walk should continue when +// a node is missing. +func IgnoreMissing() WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + if err == ipld.ErrNotFound { + return nil + } + return err + }) + } +} + +// OnMissing is a WalkOption adding a callback that will be triggered on a missing +// node. +func OnMissing(callback func(c cid.Cid)) WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + if err == ipld.ErrNotFound { + callback(c) + } + return err + }) + } +} + +// OnError is a WalkOption adding a custom error handler. +// If this handler return a nil error, the walk will continue. +func OnError(handler func(c cid.Cid, err error) error) WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(handler) } } @@ -344,7 +390,7 @@ func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) // depth to a given visit function. The visit function can be used to limit DAG // exploration. func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error { - opts := &WalkOptions{} + opts := &walkOptions{} for _, opt := range options { opt(opts) } @@ -356,7 +402,7 @@ func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid } } -func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error { +func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error { if depth != 0 || options.WithRoot { if !visit(root, depth) { return nil @@ -364,7 +410,10 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d } links, err := getLinks(ctx, root) - if err != nil && !options.IgnoreErrors { + if err != nil && options.ErrorHandler != nil { + err = options.ErrorHandler(root, err) + } + if err != nil { return err } @@ -402,7 +451,7 @@ func (p *ProgressTracker) Value() int { return p.Total } -func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error { +func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error { type cidDepth struct { cid cid.Cid depth int @@ -445,7 +494,10 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis if shouldVisit { links, err := getLinks(ctx, ci) - if err != nil && !options.IgnoreErrors { + if err != nil && options.ErrorHandler != nil { + err = options.ErrorHandler(root, err) + } + if err != nil { select { case errChan <- err: case <-fetchersCtx.Done():