Skip to content

Commit

Permalink
providers: optimize GC
Browse files Browse the repository at this point in the history
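1. Don't be O(n^2): make a single pass over the datastore entries instead of loading the provider set for the key of every entry.
2. Don't bother walking the cache to expire entries; just drop (purge) it.
3. Walk the datastore in a separate goroutine (a periodic child process) rather than inside the main run loop.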
  • Loading branch information
Stebalien committed Apr 13, 2019
1 parent ff6374a commit 84b9fac
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 92 deletions.
127 changes: 39 additions & 88 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/goprocess/periodic"
peer "github.com/libp2p/go-libp2p-peer"
base32 "github.com/whyrusleeping/base32"
)
Expand Down Expand Up @@ -68,7 +69,8 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)

pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval
pm.proc.Go(func(p goprocess.Process) { pm.run() })
pm.proc.Go(pm.run)
pm.proc.AddChild(periodicproc.Tick(pm.cleanupInterval, pm.gc))

return pm
}
Expand Down Expand Up @@ -188,69 +190,49 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)
return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func (pm *ProviderManager) deleteProvSet(k cid.Cid) error {
pm.providers.Remove(k)

func (pm *ProviderManager) gc(proc goprocess.Process) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: mkProvKey(k),
Prefix: providersKeyPrefix,
})
if err != nil {
return err
}

entries, err := res.Rest()
if err != nil {
return err
log.Error("error garbage collecting provider records: ", err)
return
}
defer res.Close()

for _, e := range entries {
err := pm.dstore.Delete(ds.NewKey(e.Key))
if err != nil {
log.Error("deleting provider set: ", err)
now := time.Now()
for {
e, ok := res.NextSync()
if !ok {
return
}
}
return nil
}

func (pm *ProviderManager) getProvKeys() (func() (cid.Cid, bool), error) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: providersKeyPrefix,
})
if err != nil {
return nil, err
}

iter := func() (cid.Cid, bool) {
for e := range res.Next() {
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
log.Warningf("incorrectly formatted provider entry in datastore: %s", e.Key)
continue
}
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
if err != nil {
log.Warning("error decoding base32 provider key: %s: %s", parts[2], err)
continue
}
if e.Error != nil {
log.Error("got an error: ", e.Error)
continue
}

c, err := cid.Cast(decoded)
// check expiration time
t, err := readTimeValue(e.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = pm.dstore.Delete(ds.RawKey(e.Key))
if err != nil {
log.Warning("error casting key to cid from datastore key: %s", err)
continue
log.Warning("failed to remove provider record from disk: ", err)
}

return c, true
}
return cid.Cid{}, false
}

return iter, nil
}

func (pm *ProviderManager) run() {
func (pm *ProviderManager) run(proc goprocess.Process) {
tick := time.NewTicker(pm.cleanupInterval)
defer tick.Stop()
for {
select {
case np := <-pm.newprovs:
Expand All @@ -262,49 +244,18 @@ func (pm *ProviderManager) run() {
provs, err := pm.providersForKey(gp.k)
if err != nil && err != ds.ErrNotFound {
log.Error("error reading providers: ", err)
continue
}

gp.resp <- provs
// set the cap so the user can't append to this.
gp.resp <- provs[0:len(provs):len(provs)]
case <-tick.C:
keys, err := pm.getProvKeys()
if err != nil {
log.Error("Error loading provider keys: ", err)
continue
}
now := time.Now()
for {
k, ok := keys()
if !ok {
break
}

provs, err := pm.getProvSet(k)
if err != nil {
log.Error("error loading known provset: ", err)
continue
}
for p, t := range provs.set {
if now.Sub(t) > ProvideValidity {
delete(provs.set, p)
}
}
// have we run out of providers?
if len(provs.set) == 0 {
provs.providers = nil
err := pm.deleteProvSet(k)
if err != nil {
log.Error("error deleting provider set: ", err)
}
} else if len(provs.set) < len(provs.providers) {
// We must have modified the providers set, recompute.
provs.providers = make([]peer.ID, 0, len(provs.set))
for p := range provs.set {
provs.providers = append(provs.providers, p)
}
}
}
case <-pm.proc.Closing():
tick.Stop()
// You know the wonderful thing about caches? You can
// drop them.
//
// Much faster than GCing.
pm.providers.Purge()
case <-proc.Closing():
return
}
}
Expand Down
11 changes: 7 additions & 4 deletions providers/providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
u "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
//
Expand Down Expand Up @@ -150,13 +151,15 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

proviter, err := p.getProvKeys()
res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix})
if err != nil {
t.Fatal(err)
}

_, ok := proviter()
if ok {
rest, err := res.Rest()
if err != nil {
t.Fatal(err)
}
if len(rest) > 0 {
t.Fatal("expected everything to be cleaned out of the datastore")
}
}
Expand Down

0 comments on commit 84b9fac

Please sign in to comment.