Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Update streamed pin listing with Jorropo/channel
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Mar 2, 2023
1 parent 9da23c3 commit 02fd0c3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 30 deletions.
37 changes: 15 additions & 22 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"sync"

"github.com/Jorropo/channel"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -665,20 +666,20 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
return p.streamIndex(ctx, p.cidRIndex)
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) channel.ReadOnly[cid.Cid] {
out := channel.New[cid.Cid]()

go func() {
defer close(out)
defer out.Close()

p.lock.RLock()
defer p.lock.RUnlock()
Expand All @@ -688,40 +689,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan
err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
select {
case <-ctx.Done():
case out <- ipfspinner.StreamedCid{Err: err}:
}
out.SetError(err)
return false
}
if !cidSet.Has(c) {
select {
case <-ctx.Done():
err = out.WriteContext(ctx, c)
if err != nil {
return false
case out <- ipfspinner.StreamedCid{Cid: c}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
select {
case <-ctx.Done():
case out <- ipfspinner.StreamedCid{Err: err}:
}
return
out.SetError(err)
}
}()

return out
return out.ReadOnly()
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)
close(out)
return out
func (p *pinner) InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
c.Close()
return c.ReadOnly()
}

// Update updates a recursive pin from one cid to another. This is equivalent
Expand Down
15 changes: 10 additions & 5 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/Jorropo/channel"
bs "github.com/ipfs/go-blockservice"
mdag "github.com/ipfs/go-merkledag"

Expand Down Expand Up @@ -199,12 +200,16 @@ func TestPinnerBasic(t *testing.T) {
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

allCids := func(c <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
for streamedCid := range c {
if streamedCid.Err != nil {
t.Fatal(streamedCid.Err)
allCids := func(ch channel.ReadOnly[cid.Cid]) (cids []cid.Cid) {
for {
c, err := ch.Read()
if err == io.EOF {
break
}
cids = append(cids, streamedCid.Cid)
if err != nil {
t.Fatal(err)
}
cids = append(cids, c)
}
return cids
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-ipfs-pinner
go 1.19

require (
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIo
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60 h1:/PGSDln05TokWwDCltOgcf/uYfuVPK8JV2hcQ3Nz2zo=
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down
7 changes: 4 additions & 3 deletions pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"

"github.com/Jorropo/channel"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
Expand Down Expand Up @@ -119,14 +120,14 @@ type Pinner interface {
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) <-chan StreamedCid
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]

// RecursiveKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) <-chan StreamedCid
RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid]

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) <-chan StreamedCid
InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid]
}

// StreamedCid is a cid.Cid that carries an error, to be sent through a channel.
Expand Down

0 comments on commit 02fd0c3

Please sign in to comment.