From 9da23c3181da29452034a8a56381ae0f05cc42e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Fri, 20 Jan 2023 12:29:00 +0100 Subject: [PATCH 1/2] change the Pinner interface to have async Direct, Recursive and Internal pin listing --- dspinner/pin.go | 92 ++++++++++++++++++++++---------------------- dspinner/pin_test.go | 20 ++++++---- pin.go | 15 ++++++-- 3 files changed, 70 insertions(+), 57 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index efe36df..91cadbc 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -665,61 +665,63 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - cidSet := cid.NewSet() - var e error - err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false - } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } - - return cidSet.Keys(), nil +func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidRIndex) +} - cidSet := cid.NewSet() - var e error - err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { + out := make(chan ipfspinner.StreamedCid) + + go func() { + defer close(out) + + p.lock.RLock() + defer p.lock.RUnlock() + + cidSet := cid.NewSet() + + err := index.ForEach(ctx, "", func(key, value string) bool { + c, err := cid.Cast([]byte(key)) + if err != nil { + select { + case <-ctx.Done(): + case out <- ipfspinner.StreamedCid{Err: err}: + } + return false + } + if !cidSet.Has(c) { + select { + case <-ctx.Done(): + return false + case out <- ipfspinner.StreamedCid{Cid: c}: + } + cidSet.Add(c) + } + return true + }) + if err != nil { + select { + case <-ctx.Done(): + case out <- ipfspinner.StreamedCid{Err: err}: + } + return } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } + }() - return cidSet.Keys(), nil + return out } // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) { - return nil, nil +func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { + out := make(chan ipfspinner.StreamedCid) + close(out) + return out } // Update updates a recursive pin from one cid to another. This is equivalent diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index 11c7ade..4bd896d 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -199,10 +199,17 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - cids, err := p.RecursiveKeys(ctx) - if err != nil { - t.Fatal(err) + allCids := func(c <-chan ipfspin.StreamedCid) (cids []cid.Cid) { + for streamedCid := range c { + if streamedCid.Err != nil { + t.Fatal(streamedCid.Err) + } + cids = append(cids, streamedCid.Cid) + } + return cids } + + cids := allCids(p.RecursiveKeys(ctx)) if len(cids) != 2 { t.Error("expected 2 recursive pins") } @@ -244,10 +251,7 @@ func TestPinnerBasic(t *testing.T) { } } - cids, err = p.DirectKeys(ctx) - if err != nil { - t.Fatal(err) - } + cids = allCids(p.DirectKeys(ctx)) if len(cids) != 1 { t.Error("expected 1 direct pin") } @@ -255,7 +259,7 @@ func TestPinnerBasic(t *testing.T) { t.Error("wrong direct pin") } - cids, _ = p.InternalPins(ctx) + cids = allCids(p.InternalPins(ctx)) if len(cids) != 0 { t.Error("shound not have internal keys") } diff --git a/pin.go b/pin.go index fcf7d76..16b910b 100644 --- a/pin.go +++ b/pin.go @@ -80,7 +80,7 @@ var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly") // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. In practice, a Pinner is in -// in charge of keeping the list of items from the local storage that should +// charge of keeping the list of items from the local storage that should // not be garbage-collected. type Pinner interface { // IsPinned returns whether or not the given cid is pinned @@ -119,14 +119,21 @@ type Pinner interface { Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) <-chan StreamedCid // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) <-chan StreamedCid // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) ([]cid.Cid, error) + InternalPins(ctx context.Context) <-chan StreamedCid +} + +// StreamedCid is a cid.Cid that carries an error, to be sent through a channel. +type StreamedCid struct { + // if not nil, an error happened. Everything else should be ignored. + Err error + Cid cid.Cid } // Pinned represents CID which has been pinned with a pinning strategy. From e9bb7525f460bfea91e0a141c81f7b82092f3cdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 2 Mar 2023 20:00:08 +0100 Subject: [PATCH 2/2] Update streamed pin listing with Jorropo/channel --- dspinner/pin.go | 37 +++++++++++++++---------------------- dspinner/pin_test.go | 15 ++++++++++----- go.mod | 1 + go.sum | 2 ++ pin.go | 14 ++++---------- 5 files changed, 32 insertions(+), 37 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 91cadbc..df4ba0c 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -10,6 +10,7 @@ import ( "path" "sync" + "github.com/Jorropo/channel" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -665,20 +666,20 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { return p.streamIndex(ctx, p.cidRIndex) } -func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { - out := make(chan ipfspinner.StreamedCid) +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) channel.ReadOnly[cid.Cid] { + out := channel.New[cid.Cid]() go func() { - defer close(out) + defer out.Close() p.lock.RLock() defer p.lock.RUnlock() @@ -688,40 +689,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan err := index.ForEach(ctx, "", func(key, value string) bool { c, err := cid.Cast([]byte(key)) if err != nil { - select { - case <-ctx.Done(): - case out <- ipfspinner.StreamedCid{Err: err}: - } + out.SetError(err) return false } if !cidSet.Has(c) { - select { - case <-ctx.Done(): + err = out.WriteContext(ctx, c) + if err != nil { return false - case out <- ipfspinner.StreamedCid{Cid: c}: } cidSet.Add(c) } return true }) if err != nil { - select { - case <-ctx.Done(): - case out <- ipfspinner.StreamedCid{Err: err}: - } - return + out.SetError(err) } }() - return out + return out.ReadOnly() } // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { - out := make(chan ipfspinner.StreamedCid) - close(out) - return out +func (p *pinner) InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid] { + c := channel.New[cid.Cid]() + c.Close() + return c.ReadOnly() } // Update updates a recursive pin from one cid to another. This is equivalent diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index 4bd896d..0fc4d1c 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/Jorropo/channel" bs "github.com/ipfs/go-blockservice" mdag "github.com/ipfs/go-merkledag" @@ -199,12 +200,16 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - allCids := func(c <-chan ipfspin.StreamedCid) (cids []cid.Cid) { - for streamedCid := range c { - if streamedCid.Err != nil { - t.Fatal(streamedCid.Err) + allCids := func(ch channel.ReadOnly[cid.Cid]) (cids []cid.Cid) { + for { + c, err := ch.Read() + if err == io.EOF { + break } - cids = append(cids, streamedCid.Cid) + if err != nil { + t.Fatal(err) + } + cids = append(cids, c) } return cids } diff --git a/go.mod b/go.mod index 21dee2e..5aca8a2 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/ipfs/go-ipfs-pinner go 1.19 require ( + github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff github.com/ipfs/go-blockservice v0.2.1 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.5.0 diff --git a/go.sum b/go.sum index 20a1819..cbb7dcc 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIo github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff h1:esIsTO+B4ChRmGN1F2NQXe3y1zV6+VtW5q2NfiKKptc= +github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= diff --git a/pin.go b/pin.go index 16b910b..b129026 100644 --- a/pin.go +++ b/pin.go @@ -6,6 +6,7 @@ import ( "context" "fmt" + "github.com/Jorropo/channel" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" ) @@ -119,21 +120,14 @@ type Pinner interface { Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) <-chan StreamedCid + DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) <-chan StreamedCid + RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) <-chan StreamedCid -} - -// StreamedCid is a cid.Cid that carries an error, to be sent through a channel. -type StreamedCid struct { - // if not nil, an error happened. Everything else should be ignored. - Err error - Cid cid.Cid + InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid] } // Pinned represents CID which has been pinned with a pinning strategy.