Skip to content

Commit

Permalink
Update to pinner having async pin listing
Browse files Browse the repository at this point in the history
Original change: ipfs/go-ipfs-pinner#24
  • Loading branch information
MichaelMure committed Mar 3, 2023
1 parent 408f15c commit 093cba1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 23 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
retract [v1.0.0, v1.0.1]

require (
github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
Expand All @@ -24,6 +25,8 @@ require (
github.com/multiformats/go-multihash v0.2.1
)

replace github.com/Jorropo/channel => github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4

require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOv
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4 h1:sdBlAQ0I35hcQ8eqA5O48wNoJ7e99z4DSTrc5PDrSRI=
github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down
43 changes: 24 additions & 19 deletions simple/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/Jorropo/channel"
"github.com/cenkalti/backoff"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
Expand Down Expand Up @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
// Pinner interface defines how the simple.Reprovider wants to interact
// with a Pinning service
type Pinner interface {
DirectKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
}

// NewPinnedProvider returns provider supplying pinned keys
Expand Down Expand Up @@ -217,37 +219,40 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on
defer cancel()
defer close(set.New)

dkeys, err := pinning.DirectKeys(ctx)
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
for _, key := range dkeys {
dkeys := pinning.DirectKeys(ctx)
for {
key, err := dkeys.ReadContext(ctx)
if err == io.EOF {
break
}
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
set.Visitor(ctx)(key)
}

rkeys, err := pinning.RecursiveKeys(ctx)
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}

session := fetchConfig.NewSession(ctx)
for _, key := range rkeys {
set.Visitor(ctx)(key)

err := pinning.RecursiveKeys(ctx).RangeContext(ctx, func(c cid.Cid) error {
set.Visitor(ctx)(c)
if !onlyRoots {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: c}, func(res fetcher.FetchResult) error {
clink, ok := res.LastBlockLink.(cidlink.Link)
if ok {
set.Visitor(ctx)(clink.Cid)
}
return nil
})
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
return err
}
}
return nil
})
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}
}()

Expand Down
29 changes: 25 additions & 4 deletions simple/reprovide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/Jorropo/channel"
blocks "github.com/ipfs/go-block-format"
bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -220,12 +221,32 @@ type mockPinner struct {
direct []cid.Cid
}

func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.direct, nil
func (mp *mockPinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.direct {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.recursive, nil
func (mp *mockPinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.recursive {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func TestReprovidePinned(t *testing.T) {
Expand Down

0 comments on commit 093cba1

Please sign in to comment.