Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom ProviderManager that brokers AddrInfos #751

Merged
merged 8 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type IpfsDHT struct {
datastore ds.Datastore // Local data

routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager
// providerStore stores & manages the provider records for this Dht peer.
providerStore providers.ProviderStore

// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager
Expand Down Expand Up @@ -221,7 +221,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}
dht.proc.Go(sn.subscribe)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok {
dht.proc.AddChild(mgr.Process())
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
Expand Down Expand Up @@ -338,11 +340,14 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, err
if cfg.ProviderStore != nil {
dht.providerStore = cfg.ProviderStore
} else {
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
if err != nil {
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
}
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

Expand Down Expand Up @@ -413,6 +418,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
return rt, err
}

// ProviderStore returns the provider storage object for storing and retrieving provider records.
func (dht *IpfsDHT) ProviderStore() providers.ProviderStore {
return dht.providerStore
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
Expand Down
20 changes: 8 additions & 12 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ const DefaultPrefix protocol.ID = "/ipfs"

type Option = dhtcfg.Option

// ProviderStore sets the provider storage manager.
func ProviderStore(ps providers.ProviderStore) Option {
return func(c *dhtcfg.Config) error {
c.ProviderStore = ps
return nil
}
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Expand Down Expand Up @@ -236,18 +244,6 @@ func DisableValues() Option {
}
}

// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore, cahing
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
func ProvidersOptions(opts []providers.Option) Option {
return func(c *dhtcfg.Config) error {
c.ProvidersOptions = opts
return nil
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *dhtcfg.Config) error {
Expand Down
8 changes: 4 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestLocalProvides(t *testing.T) {

for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].ProviderManager.GetProviders(ctx, c.Hash())
provs, _ := dhts[i].ProviderStore().GetProviders(ctx, c.Hash())
if len(provs) > 0 {
t.Fatal("shouldnt know this")
}
Expand Down Expand Up @@ -1285,7 +1285,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderManager.AddProvider(ctx, c.Hash(), p)
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
Expand Down Expand Up @@ -1548,7 +1548,7 @@ func TestProvideDisabled(t *testing.T) {
if err != routing.ErrNotSupported {
t.Fatal("get should have failed on node B")
}
provs := dhtB.ProviderManager.GetProviders(ctx, kHash)
provs, _ := dhtB.ProviderStore().GetProviders(ctx, kHash)
if len(provs) != 0 {
t.Fatal("node B should not have found local providers")
}
Expand All @@ -1564,7 +1564,7 @@ func TestProvideDisabled(t *testing.T) {
t.Fatal("node A should not have found providers")
}
}
provAddrs := dhtA.ProviderManager.GetProviders(ctx, kHash)
provAddrs, _ := dhtA.ProviderStore().GetProviders(ctx, kHash)
if len(provAddrs) != 0 {
t.Fatal("node A should not have found local providers")
}
Expand Down
14 changes: 8 additions & 6 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

ctx, cancel := context.WithCancel(context.Background())

pm, err := providers.NewProviderManager(ctx, h.ID(), dhtcfg.Datastore)
pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dhtcfg.Datastore)
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -762,7 +762,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))

// add self locally
dht.ProviderManager.AddProvider(ctx, keyMH, dht.h.ID())
dht.ProviderManager.AddProvider(ctx, keyMH, peer.AddrInfo{ID: dht.h.ID()})
if !brdcst {
return nil
}
Expand Down Expand Up @@ -1209,13 +1209,15 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ps = peer.NewLimitedSet(count)
}

provs := dht.ProviderManager.GetProviders(ctx, key)
provs, err := dht.ProviderManager.GetProviders(ctx, key)
if err != nil {
return
}
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.h.Peerstore().PeerInfo(p)
if ps.TryAdd(p.ID) {
select {
case peerOut <- pi:
case peerOut <- p:
case <-ctx.Done():
return
}
Expand Down
17 changes: 5 additions & 12 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -318,13 +317,11 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

// setup providers
providers := dht.ProviderManager.GetProviders(ctx, key)

if len(providers) > 0 {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
providers, err := dht.providerStore.GetProviders(ctx, key)
if err != nil {
return nil, err
}
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), providers)

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Expand Down Expand Up @@ -362,11 +359,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.ProviderManager.AddProvider(ctx, key, p)
dht.providerStore.AddProvider(ctx, key, peer.AddrInfo{ID: p})
}

return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Config struct {
MaxRecordAge time.Duration
EnableProviders bool
EnableValues bool
ProvidersOptions []providers.Option
ProviderStore providers.ProviderStore
QueryPeerFilter QueryFilterFunc

RoutingTable struct {
Expand Down
33 changes: 26 additions & 7 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
peerstore_legacy "github.com/libp2p/go-libp2p-peerstore"
petar marked this conversation as resolved.
Show resolved Hide resolved

lru "github.com/hashicorp/golang-lru/simplelru"
ds "github.com/ipfs/go-datastore"
Expand All @@ -30,12 +32,20 @@ var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")

// ProviderStore represents a store that associates peers and their addresses to keys.
type ProviderStore interface {
AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error
GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error)
}

// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
type ProviderManager struct {
self peer.ID
// all non channel fields are meant to be accessed only within
// the run method
cache lru.LRUCache
pstore peerstore.Peerstore
dstore *autobatch.Datastore

newprovs chan *addProv
Expand All @@ -45,6 +55,8 @@ type ProviderManager struct {
cleanupInterval time.Duration
}

var _ ProviderStore = (*ProviderManager)(nil)

// Option is a function that sets a provider manager option.
type Option func(*ProviderManager) error

Expand Down Expand Up @@ -86,10 +98,12 @@ type getProv struct {
}

// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
pm := new(ProviderManager)
pm.self = local
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.pstore = ps
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
Expand Down Expand Up @@ -214,14 +228,19 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
}

// AddProvider adds a provider
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
if provInfo.ID != pm.self { // don't add own addrs.
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
}
prov := &addProv{
key: k,
val: val,
val: provInfo.ID,
}
select {
case pm.newprovs <- prov:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down Expand Up @@ -255,21 +274,21 @@ func mkProvKey(k []byte) string {

// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
gp := &getProv{
key: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
}
select {
case <-ctx.Done():
return nil
return nil, ctx.Err()
case pm.getprovs <- gp:
}
select {
case <-ctx.Done():
return nil
return nil, ctx.Err()
case peers := <-gp.resp:
return peers
return peerstore_legacy.PeerInfos(pm.pstore, peers), nil
}
}

Expand Down
Loading