Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

feat: plumb through datastore contexts #176

Merged
merged 1 commit into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.
require (
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-core v0.8.6
Expand Down
17 changes: 8 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,21 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.5 h1:cwOUcGMLdLPWgu3SlrCckCMznaGADbPqE0r8h768/Dg=
github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w3fyfrmmJs=
github.com/ipfs/go-ds-badger v0.2.7 h1:ju5REfIm+v+wgVnQ19xGLYPHYHbYLR6qJfmMbCDSK1I=
github.com/ipfs/go-ds-badger v0.2.7/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-leveldb v0.4.2 h1:QmQoAJ9WkPMUfBLnu1sBVy0xWWlJPg0m4kRAiJL9iaw=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-datastore v0.5.0 h1:rQicVCEacWyk4JZ6G5bD9TKR7lZEG1MWcG7UdWYrFAU=
github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo=
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down
8 changes: 4 additions & 4 deletions pstoreds/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))

if len(r.Addrs) == 0 {
if err = write.Delete(key); err == nil {
if err = write.Delete(context.TODO(), key); err == nil {
r.dirty = false
}
return err
Expand All @@ -60,7 +60,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
if err != nil {
return err
}
if err = write.Put(key, data); err != nil {
if err = write.Put(context.TODO(), key, data); err != nil {
return err
}
// write succeeded; record is no longer dirty.
Expand Down Expand Up @@ -223,7 +223,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs

pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := ab.ds.Get(key)
data, err := ab.ds.Get(context.TODO(), key)

switch err {
case ds.ErrNotFound:
Expand Down Expand Up @@ -446,7 +446,7 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p)

key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
if err := ab.ds.Delete(key); err != nil {
if err := ab.ds.Delete(context.TODO(), key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err)
}
}
Expand Down
26 changes: 13 additions & 13 deletions pstoreds/addr_book_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,28 +152,28 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if err != nil {
log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
}
if err = batch.Delete(key); err != nil {
if err = batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
}
}

// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(key); err != nil {
if err := batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err)
}

// re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(gcKey, []byte{}); err != nil {
if err := batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err)
}
}
}

results, err := gc.ab.ds.Query(purgeLookaheadQuery)
results, err := gc.ab.ds.Query(context.TODO(), purgeLookaheadQuery)
if err != nil {
log.Warnf("failed while fetching entries to purge: %v", err)
return
Expand Down Expand Up @@ -228,7 +228,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {

// otherwise, fetch it from the store, clean it and flush it.
entryKey := addrBookBase.ChildString(gcKey.Name())
val, err := gc.ab.ds.Get(entryKey)
val, err := gc.ab.ds.Get(context.TODO(), entryKey)
if err != nil {
// captures all errors, including ErrNotFound.
dropInError(gcKey, err, "fetching entry")
Expand All @@ -248,7 +248,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
dropOrReschedule(gcKey, record)
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
Expand All @@ -268,7 +268,7 @@ func (gc *dsAddrBookGc) purgeStore() {
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}

results, err := gc.ab.ds.Query(purgeStoreQuery)
results, err := gc.ab.ds.Query(context.TODO(), purgeStoreQuery)
if err != nil {
log.Warnf("failed while opening iterator: %v", err)
return
Expand All @@ -294,7 +294,7 @@ func (gc *dsAddrBookGc) purgeStore() {
gc.ab.cache.Remove(id)
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
Expand All @@ -321,7 +321,7 @@ func (gc *dsAddrBookGc) populateLookahead() {

var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(populateLookaheadQuery)
results, err := gc.ab.ds.Query(context.TODO(), populateLookaheadQuery)
if err != nil {
log.Warnf("failed while querying to populate lookahead GC window: %v", err)
return
Expand Down Expand Up @@ -354,7 +354,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
continue
}
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
cached.RUnlock()
Expand All @@ -363,7 +363,7 @@ func (gc *dsAddrBookGc) populateLookahead() {

record.Reset()

val, err := gc.ab.ds.Get(ds.RawKey(result.Key))
val, err := gc.ab.ds.Get(context.TODO(), ds.RawKey(result.Key))
if err != nil {
log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue
Expand All @@ -374,13 +374,13 @@ func (gc *dsAddrBookGc) populateLookahead() {
}
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
}
}

if err = batch.Commit(); err != nil {
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC lookahead batch: %v", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pstoreds/addr_book_gc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pstoreds

import (
"context"
"testing"
"time"

Expand All @@ -18,7 +19,7 @@ type testProbe struct {
}

func (tp *testProbe) countLookaheadEntries() (i int) {
results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery)
results, err := tp.ab.(*dsAddrBook).ds.Query(context.Background(), lookaheadQuery)
if err != nil {
tp.t.Fatal(err)
}
Expand Down
19 changes: 10 additions & 9 deletions pstoreds/cyclic_batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pstoreds

import (
"context"
"errors"
"fmt"

Expand All @@ -23,7 +24,7 @@ type cyclicBatch struct {
}

func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch()
batch, err := ds.Batch(context.TODO())
if err != nil {
return nil, err
}
Expand All @@ -39,36 +40,36 @@ func (cb *cyclicBatch) cycle() (err error) {
return nil
}
// commit and renew the batch.
if err = cb.Batch.Commit(); err != nil {
if err = cb.Batch.Commit(context.TODO()); err != nil {
return fmt.Errorf("failed while committing cyclic batch: %w", err)
}
if cb.Batch, err = cb.ds.Batch(); err != nil {
if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil {
return fmt.Errorf("failed while renewing cyclic batch: %w", err)
}
return nil
}

func (cb *cyclicBatch) Put(key ds.Key, val []byte) error {
func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Put(key, val)
return cb.Batch.Put(ctx, key, val)
}

func (cb *cyclicBatch) Delete(key ds.Key) error {
func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Delete(key)
return cb.Batch.Delete(ctx, key)
}

func (cb *cyclicBatch) Commit() error {
func (cb *cyclicBatch) Commit(ctx context.Context) error {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if err := cb.Batch.Commit(); err != nil {
if err := cb.Batch.Commit(ctx); err != nil {
return err
}
cb.pending = 0
Expand Down
10 changes: 5 additions & 5 deletions pstoreds/keybook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(pubSuffix)

var pk ic.PubKey
if value, err := kb.ds.Get(key); err == nil {
if value, err := kb.ds.Get(context.TODO(), key); err == nil {
pk, err = ic.UnmarshalPublicKey(value)
if err != nil {
log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
Expand All @@ -56,7 +56,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err)
return nil
}
err = kb.ds.Put(key, pkb)
err = kb.ds.Put(context.TODO(), key, pkb)
if err != nil {
log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err)
return nil
Expand All @@ -80,7 +80,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
err = kb.ds.Put(context.TODO(), key, val)
if err != nil {
log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
Expand All @@ -89,7 +89,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {

func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(privSuffix)
value, err := kb.ds.Get(key)
value, err := kb.ds.Get(context.TODO(), key)
if err != nil {
log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err)
return nil
Expand All @@ -116,7 +116,7 @@ func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
err = kb.ds.Put(context.TODO(), key, val)
if err != nil {
log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions pstoreds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
return nil, err
}
k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key)
value, err := pm.ds.Get(k)
value, err := pm.ds.Get(context.TODO(), k)
if err != nil {
if err == ds.ErrNotFound {
err = pstore.ErrNotFound
Expand All @@ -69,5 +69,5 @@ func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
if err := gob.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
return pm.ds.Put(k, buf.Bytes())
return pm.ds.Put(context.TODO(), k, buf.Bytes())
}
2 changes: 1 addition & 1 deletion pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R
err error
)

if results, err = ds.Query(q); err != nil {
if results, err = ds.Query(context.TODO(), q); err != nil {
log.Error(err)
return nil, err
}
Expand Down