Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wire a context in most of the ipfs data pipeline, connect it #683

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,10 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *IpfsDHT) getLocal(ctx context.Context, key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
rec, err := dht.getRecordFromDatastore(ctx, mkDsKey(key))
if err != nil {
logger.Warnw("get local failed", "key", key, "error", err)
return nil, err
Expand All @@ -544,14 +544,14 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *IpfsDHT) putLocal(ctx context.Context, key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
return err
}

return dht.datastore.Put(mkDsKey(key), data)
return dht.datastore.Put(ctx, mkDsKey(key), data)
}

// peerFound signals the routingTable that we've found a peer that
Expand Down
4 changes: 2 additions & 2 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,7 @@ func TestProtocolUpgrade(t *testing.T) {
// Add record into local DHT only
rec := record.MakePutRecord("/v/crow", []byte("caw"))
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtC.putLocal(string(rec.Key), rec)
err = dhtC.putLocal(ctx, string(rec.Key), rec)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1908,7 +1908,7 @@ func TestProtocolUpgrade(t *testing.T) {
// Add record into local DHT only
rec = record.MakePutRecord("/v/bee", []byte("buzz"))
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtB.putLocal(string(rec.Key), rec)
err = dhtB.putLocal(ctx, string(rec.Key), rec)
if err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// setup response
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

rec, err := dht.checkLocalDatastore(k)
rec, err := dht.checkLocalDatastore(ctx, k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,10 +89,10 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil
}

func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
func (dht *IpfsDHT) checkLocalDatastore(ctx context.Context, k []byte) (*recpb.Record, error) {
logger.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := convertToDsKey(k)
buf, err := dht.datastore.Get(dskey)
buf, err := dht.datastore.Get(ctx, dskey)
logger.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, buf)

if err == ds.ErrNotFound {
Expand Down Expand Up @@ -131,7 +131,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
// may be computationally expensive

if recordIsBad {
err := dht.datastore.Delete(dskey)
err := dht.datastore.Delete(ctx, dskey)
if err != nil {
logger.Error("Failed to delete bad record from datastore: ", err)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Make sure the new record is "better" than the record we have locally.
// This prevents a record with for example a lower sequence number from
// overwriting a record with a higher sequence number.
existing, err := dht.getRecordFromDatastore(dskey)
existing, err := dht.getRecordFromDatastore(ctx, dskey)
if err != nil {
return nil, err
}
Expand All @@ -213,14 +213,14 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, err
}

err = dht.datastore.Put(dskey, data)
err = dht.datastore.Put(ctx, dskey, data)
return pmes, err
}

// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(dskey)
func (dht *IpfsDHT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(ctx, dskey)
if err == ds.ErrNotFound {
return nil, nil
}
Expand Down
36 changes: 19 additions & 17 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var log = logging.Logger("providers")
// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
type ProviderManager struct {
ctx context.Context
// all non channel fields are meant to be accessed only within
// the run method
cache lru.LRUCache
Expand Down Expand Up @@ -88,6 +89,7 @@ type getProv struct {
// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager)
pm.ctx = ctx
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
Expand Down Expand Up @@ -125,15 +127,15 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
// don't really care if this fails.
_ = gcQuery.Close()
}
if err := pm.dstore.Flush(); err != nil {
if err := pm.dstore.Flush(context.Background()); err != nil {
log.Error("failed to flush datastore: ", err)
}
}()

for {
select {
case np := <-pm.newprovs:
err := pm.addProv(np.key, np.val)
err := pm.addProv(pm.ctx, np.key, np.val)
if err != nil {
log.Error("error adding new providers: ", err)
continue
Expand All @@ -144,7 +146,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
}
case gp := <-pm.getprovs:
provs, err := pm.getProvidersForKey(gp.key)
provs, err := pm.getProvidersForKey(pm.ctx, gp.key)
if err != nil && err != ds.ErrNotFound {
log.Error("error reading providers: ", err)
}
Expand Down Expand Up @@ -183,7 +185,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
fallthrough
case gcTime.Sub(t) > ProvideValidity:
// or expired
err = pm.dstore.Delete(ds.RawKey(res.Key))
err = pm.dstore.Delete(pm.ctx, ds.RawKey(res.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand All @@ -197,7 +199,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
pm.cache.Purge()

// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
q, err := pm.dstore.Query(pm.ctx, dsq.Query{
Prefix: ProvidersKeyPrefix,
})
if err != nil {
Expand Down Expand Up @@ -226,23 +228,23 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.I
}

// addProv updates the cache if needed
func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
func (pm *ProviderManager) addProv(ctx context.Context, k []byte, p peer.ID) error {
now := time.Now()
if provs, ok := pm.cache.Get(string(k)); ok {
provs.(*providerSet).setVal(p, now)
} // else not cached, just write through

return writeProviderEntry(pm.dstore, k, p, now)
return writeProviderEntry(ctx, pm.dstore, k, p, now)
}

// writeProviderEntry writes the provider into the datastore
func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
func writeProviderEntry(ctx context.Context, dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())

return dstore.Put(ds.NewKey(dsk), buf[:n])
return dstore.Put(ctx, ds.NewKey(dsk), buf[:n])
}

func mkProvKeyFor(k []byte, p peer.ID) string {
Expand Down Expand Up @@ -273,22 +275,22 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
}
}

func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
pset, err := pm.getProviderSetForKey(k)
func (pm *ProviderManager) getProvidersForKey(ctx context.Context, k []byte) ([]peer.ID, error) {
pset, err := pm.getProviderSetForKey(ctx, k)
if err != nil {
return nil, err
}
return pset.providers, nil
}

// returns the ProviderSet if it already exists in the cache, otherwise loads it from the datastore
func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
func (pm *ProviderManager) getProviderSetForKey(ctx context.Context, k []byte) (*providerSet, error) {
cached, ok := pm.cache.Get(string(k))
if ok {
return cached.(*providerSet), nil
}

pset, err := loadProviderSet(pm.dstore, k)
pset, err := loadProviderSet(ctx, pm.dstore, k)
if err != nil {
return nil, err
}
Expand All @@ -301,8 +303,8 @@ func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error)
}

// loads the ProviderSet out of the datastore
func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
func loadProviderSet(ctx context.Context, dstore ds.Datastore, k []byte) (*providerSet, error) {
res, err := dstore.Query(ctx, dsq.Query{Prefix: mkProvKey(k)})
if err != nil {
return nil, err
}
Expand All @@ -329,7 +331,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = dstore.Delete(ds.RawKey(e.Key))
err = dstore.Delete(ctx, ds.RawKey(e.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand All @@ -341,7 +343,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
if err != nil {
log.Error("base32 decoding error: ", err)
err = dstore.Delete(ds.RawKey(e.Key))
err = dstore.Delete(ctx, ds.RawKey(e.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand Down
9 changes: 5 additions & 4 deletions providers/providers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestProvidersDatastore(t *testing.T) {
}

func TestProvidersSerialization(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())

k := u.Hash(([]byte("my key!")))
Expand All @@ -101,17 +102,17 @@ func TestProvidersSerialization(t *testing.T) {
pt1 := time.Now()
pt2 := pt1.Add(time.Hour)

err := writeProviderEntry(dstore, k, p1, pt1)
err := writeProviderEntry(ctx, dstore, k, p1, pt1)
if err != nil {
t.Fatal(err)
}

err = writeProviderEntry(dstore, k, p2, pt2)
err = writeProviderEntry(ctx, dstore, k, p2, pt2)
if err != nil {
t.Fatal(err)
}

pset, err := loadProviderSet(dstore, k)
pset, err := loadProviderSet(ctx, dstore, k)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -206,7 +207,7 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
res, err := ds.Query(ctx, dsq.Query{Prefix: ProvidersKeyPrefix})
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) {
// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtB.putLocal(pkkey, rec)
err = dhtB.putLocal(ctx, pkkey, rec)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) {
// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtB.putLocal(pkkey, rec)
err = dhtB.putLocal(ctx, pkkey, rec)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestValuesDisabled(t *testing.T) {
if err != routing.ErrNotSupported {
t.Fatal("get should have failed on node B")
}
rec, _ := dhtB.getLocal(pkkey)
rec, _ := dhtB.getLocal(ctx, pkkey)
if rec != nil {
t.Fatal("node B should not have found the value locally")
}
Expand All @@ -374,7 +374,7 @@ func TestValuesDisabled(t *testing.T) {
t.Fatal("node A should not have found the value")
}
}
rec, _ := dhtA.getLocal(pkkey)
rec, _ := dhtA.getLocal(ctx, pkkey)
if rec != nil {
t.Fatal("node A should not have found the value locally")
}
Expand Down
8 changes: 4 additions & 4 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return err
}

old, err := dht.getLocal(key)
old, err := dht.getLocal(ctx, key)
if err != nil {
// Means something is wrong with the datastore.
return err
Expand All @@ -59,7 +59,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts

rec := record.MakePutRecord(key, value)
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dht.putLocal(key, rec)
err = dht.putLocal(ctx, key, rec)
if err != nil {
return err
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
go func(p peer.ID) {
//TODO: Is this possible?
if p == dht.self {
err := dht.putLocal(key, fixupRec)
err := dht.putLocal(ctx, key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
Expand All @@ -295,7 +295,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st

logger.Debugw("finding value", "key", loggableKeyString(key))

if rec, err := dht.getLocal(key); rec != nil && err == nil {
if rec, err := dht.getLocal(ctx, key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
Val: rec.GetValue(),
Expand Down