Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

change the Pinner interface to have async pin listing #24

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 40 additions & 45 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"sync"

"github.com/Jorropo/channel"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -665,61 +666,55 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()
var e error
err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}

return cidSet.Keys(), nil
func (p *pinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *pinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
return p.streamIndex(ctx, p.cidRIndex)
}

cidSet := cid.NewSet()
var e error
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) channel.ReadOnly[cid.Cid] {
out := channel.New[cid.Cid]()

go func() {
defer out.Close()

p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out.SetError(err)
return false
}
if !cidSet.Has(c) {
err = out.WriteContext(ctx, c)
if err != nil {
return false
}
cidSet.Add(c)
}
return true
})
if err != nil {
out.SetError(err)
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}
}()

return cidSet.Keys(), nil
return out.ReadOnly()
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
return nil, nil
func (p *pinner) InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
c.Close()
return c.ReadOnly()
}

// Update updates a recursive pin from one cid to another. This is equivalent
Expand Down
25 changes: 17 additions & 8 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/Jorropo/channel"
bs "github.com/ipfs/go-blockservice"
mdag "github.com/ipfs/go-merkledag"

Expand Down Expand Up @@ -199,10 +200,21 @@ func TestPinnerBasic(t *testing.T) {
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

cids, err := p.RecursiveKeys(ctx)
if err != nil {
t.Fatal(err)
allCids := func(ch channel.ReadOnly[cid.Cid]) (cids []cid.Cid) {
for {
c, err := ch.Read()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
cids = append(cids, c)
}
return cids
}

cids := allCids(p.RecursiveKeys(ctx))
if len(cids) != 2 {
t.Error("expected 2 recursive pins")
}
Expand Down Expand Up @@ -244,18 +256,15 @@ func TestPinnerBasic(t *testing.T) {
}
}

cids, err = p.DirectKeys(ctx)
if err != nil {
t.Fatal(err)
}
cids = allCids(p.DirectKeys(ctx))
if len(cids) != 1 {
t.Error("expected 1 direct pin")
}
if cids[0] != ak {
t.Error("wrong direct pin")
}

cids, _ = p.InternalPins(ctx)
cids = allCids(p.InternalPins(ctx))
if len(cids) != 0 {
t.Error("shound not have internal keys")
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-ipfs-pinner
go 1.19

require (
github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not add this dependency?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to return a union of "val or err" from a chan, then it's perfectly idiomatic to return a struct { Pin coreiface.Pin, Err error}, I'd prefer not to add another module to our rat's nest of dependencies for something that already has an established idiom.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has an established idiom

I don't belive that true, most code in the std is synchronous or uses methods of interface who secretly manage synchronisation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a dependency for this. Please don't do it

github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIo
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff h1:esIsTO+B4ChRmGN1F2NQXe3y1zV6+VtW5q2NfiKKptc=
github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down
9 changes: 5 additions & 4 deletions pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"

"github.com/Jorropo/channel"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
Expand Down Expand Up @@ -80,7 +81,7 @@ var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")

// A Pinner provides the necessary methods to keep track of Nodes which are
// to be kept locally, according to a pin mode. In practice, a Pinner is in
// in charge of keeping the list of items from the local storage that should
// charge of keeping the list of items from the local storage that should
// not be garbage-collected.
type Pinner interface {
// IsPinned returns whether or not the given cid is pinned
Expand Down Expand Up @@ -119,14 +120,14 @@ type Pinner interface {
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
type StreamedCid struct {
C cid.Cid
Err error
}
DirectKeys(ctx context.Context) chan StreamedCid


// RecursiveKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid]

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) ([]cid.Cid, error)
InternalPins(ctx context.Context) channel.ReadOnly[cid.Cid]
}

// Pinned represents CID which has been pinned with a pinning strategy.
Expand Down