Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
Update to pinner having async pin listing
Browse files Browse the repository at this point in the history
Original change: ipfs/go-ipfs-pinner#24
  • Loading branch information
MichaelMure committed Mar 3, 2023
1 parent 408f15c commit cf378be
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
retract [v1.0.0, v1.0.1]

require (
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60 h1:/PGSDln05TokWwDCltOgcf/uYfuVPK8JV2hcQ3Nz2zo=
github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
Expand Down
38 changes: 24 additions & 14 deletions simple/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/Jorropo/channel"
"github.com/cenkalti/backoff"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
Expand Down Expand Up @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
// Pinner interface defines how the simple.Reprovider wants to interact
// with a Pinning service
type Pinner interface {
DirectKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
}

// NewPinnedProvider returns provider supplying pinned keys
Expand Down Expand Up @@ -217,23 +219,31 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on
defer cancel()
defer close(set.New)

dkeys, err := pinning.DirectKeys(ctx)
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
for _, key := range dkeys {
dkeys := pinning.DirectKeys(ctx)
for {
key, err := dkeys.ReadContext(ctx)
if err == io.EOF {
break
}
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
set.Visitor(ctx)(key)
}

rkeys, err := pinning.RecursiveKeys(ctx)
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}
rkeys := pinning.RecursiveKeys(ctx)

session := fetchConfig.NewSession(ctx)
for _, key := range rkeys {
for {
key, err := rkeys.ReadContext(ctx)
if err == io.EOF {
break
}
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}
set.Visitor(ctx)(key)
if !onlyRoots {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error {
Expand Down
29 changes: 25 additions & 4 deletions simple/reprovide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/Jorropo/channel"
blocks "github.com/ipfs/go-block-format"
bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -220,12 +221,32 @@ type mockPinner struct {
direct []cid.Cid
}

func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.direct, nil
func (mp *mockPinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.direct {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.recursive, nil
func (mp *mockPinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.recursive {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func TestReprovidePinned(t *testing.T) {
Expand Down

0 comments on commit cf378be

Please sign in to comment.