Skip to content

Commit

Permalink
feat: plumb through datastore contexts (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored Oct 28, 2021
1 parent 6a7533e commit b7a856c
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 35 deletions.
8 changes: 4 additions & 4 deletions p2p/host/peerstore/pstoreds/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))

if len(r.Addrs) == 0 {
if err = write.Delete(key); err == nil {
if err = write.Delete(context.TODO(), key); err == nil {
r.dirty = false
}
return err
Expand All @@ -60,7 +60,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
if err != nil {
return err
}
if err = write.Put(key, data); err != nil {
if err = write.Put(context.TODO(), key, data); err != nil {
return err
}
// write succeeded; record is no longer dirty.
Expand Down Expand Up @@ -223,7 +223,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs

pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := ab.ds.Get(key)
data, err := ab.ds.Get(context.TODO(), key)

switch err {
case ds.ErrNotFound:
Expand Down Expand Up @@ -446,7 +446,7 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p)

key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
if err := ab.ds.Delete(key); err != nil {
if err := ab.ds.Delete(context.TODO(), key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err)
}
}
Expand Down
26 changes: 13 additions & 13 deletions p2p/host/peerstore/pstoreds/addr_book_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,28 +152,28 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if err != nil {
log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
}
if err = batch.Delete(key); err != nil {
if err = batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
}
}

// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(key); err != nil {
if err := batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err)
}

// re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(gcKey, []byte{}); err != nil {
if err := batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err)
}
}
}

results, err := gc.ab.ds.Query(purgeLookaheadQuery)
results, err := gc.ab.ds.Query(context.TODO(), purgeLookaheadQuery)
if err != nil {
log.Warnf("failed while fetching entries to purge: %v", err)
return
Expand Down Expand Up @@ -228,7 +228,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {

// otherwise, fetch it from the store, clean it and flush it.
entryKey := addrBookBase.ChildString(gcKey.Name())
val, err := gc.ab.ds.Get(entryKey)
val, err := gc.ab.ds.Get(context.TODO(), entryKey)
if err != nil {
// captures all errors, including ErrNotFound.
dropInError(gcKey, err, "fetching entry")
Expand All @@ -248,7 +248,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
dropOrReschedule(gcKey, record)
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
Expand All @@ -268,7 +268,7 @@ func (gc *dsAddrBookGc) purgeStore() {
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}

results, err := gc.ab.ds.Query(purgeStoreQuery)
results, err := gc.ab.ds.Query(context.TODO(), purgeStoreQuery)
if err != nil {
log.Warnf("failed while opening iterator: %v", err)
return
Expand All @@ -294,7 +294,7 @@ func (gc *dsAddrBookGc) purgeStore() {
gc.ab.cache.Remove(id)
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
Expand All @@ -321,7 +321,7 @@ func (gc *dsAddrBookGc) populateLookahead() {

var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(populateLookaheadQuery)
results, err := gc.ab.ds.Query(context.TODO(), populateLookaheadQuery)
if err != nil {
log.Warnf("failed while querying to populate lookahead GC window: %v", err)
return
Expand Down Expand Up @@ -354,7 +354,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
continue
}
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
cached.RUnlock()
Expand All @@ -363,7 +363,7 @@ func (gc *dsAddrBookGc) populateLookahead() {

record.Reset()

val, err := gc.ab.ds.Get(ds.RawKey(result.Key))
val, err := gc.ab.ds.Get(context.TODO(), ds.RawKey(result.Key))
if err != nil {
log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue
Expand All @@ -374,13 +374,13 @@ func (gc *dsAddrBookGc) populateLookahead() {
}
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
}
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC lookahead batch: %v", err)
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/host/peerstore/pstoreds/addr_book_gc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pstoreds

import (
"context"
"testing"
"time"

Expand All @@ -18,7 +19,7 @@ type testProbe struct {
}

func (tp *testProbe) countLookaheadEntries() (i int) {
results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery)
results, err := tp.ab.(*dsAddrBook).ds.Query(context.Background(), lookaheadQuery)
if err != nil {
tp.t.Fatal(err)
}
Expand Down
19 changes: 10 additions & 9 deletions p2p/host/peerstore/pstoreds/cyclic_batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pstoreds

import (
"context"
"errors"
"fmt"

Expand All @@ -23,7 +24,7 @@ type cyclicBatch struct {
}

func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch()
batch, err := ds.Batch(context.TODO())
if err != nil {
return nil, err
}
Expand All @@ -39,36 +40,36 @@ func (cb *cyclicBatch) cycle() (err error) {
return nil
}
// commit and renew the batch.
if err = cb.Batch.Commit(); err != nil {
if err = cb.Batch.Commit(context.TODO()); err != nil {
return fmt.Errorf("failed while committing cyclic batch: %w", err)
}
if cb.Batch, err = cb.ds.Batch(); err != nil {
if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil {
return fmt.Errorf("failed while renewing cyclic batch: %w", err)
}
return nil
}

func (cb *cyclicBatch) Put(key ds.Key, val []byte) error {
func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Put(key, val)
return cb.Batch.Put(ctx, key, val)
}

func (cb *cyclicBatch) Delete(key ds.Key) error {
func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Delete(key)
return cb.Batch.Delete(ctx, key)
}

func (cb *cyclicBatch) Commit() error {
func (cb *cyclicBatch) Commit(ctx context.Context) error {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if err := cb.Batch.Commit(); err != nil {
if err := cb.Batch.Commit(ctx); err != nil {
return err
}
cb.pending = 0
Expand Down
10 changes: 5 additions & 5 deletions p2p/host/peerstore/pstoreds/keybook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(pubSuffix)

var pk ic.PubKey
if value, err := kb.ds.Get(key); err == nil {
if value, err := kb.ds.Get(context.TODO(), key); err == nil {
pk, err = ic.UnmarshalPublicKey(value)
if err != nil {
log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
Expand All @@ -56,7 +56,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err)
return nil
}
err = kb.ds.Put(key, pkb)
err = kb.ds.Put(context.TODO(), key, pkb)
if err != nil {
log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err)
return nil
Expand All @@ -80,7 +80,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
err = kb.ds.Put(context.TODO(), key, val)
if err != nil {
log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
Expand All @@ -89,7 +89,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {

func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(privSuffix)
value, err := kb.ds.Get(key)
value, err := kb.ds.Get(context.TODO(), key)
if err != nil {
log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err)
return nil
Expand All @@ -116,7 +116,7 @@ func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
err = kb.ds.Put(context.TODO(), key, val)
if err != nil {
log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/peerstore/pstoreds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
return nil, err
}
k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key)
value, err := pm.ds.Get(k)
value, err := pm.ds.Get(context.TODO(), k)
if err != nil {
if err == ds.ErrNotFound {
err = pstore.ErrNotFound
Expand All @@ -69,5 +69,5 @@ func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
if err := gob.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
return pm.ds.Put(k, buf.Bytes())
return pm.ds.Put(context.TODO(), k, buf.Bytes())
}
2 changes: 1 addition & 1 deletion p2p/host/peerstore/pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R
err error
)

if results, err = ds.Query(q); err != nil {
if results, err = ds.Query(context.TODO(), q); err != nil {
log.Error(err)
return nil, err
}
Expand Down

0 comments on commit b7a856c

Please sign in to comment.