custom ProviderManager that brokers AddrInfos #751

Merged: 8 commits, Oct 26, 2021
Changes from 1 commit
dht.go: 17 additions & 7 deletions
@@ -84,8 +84,8 @@ type IpfsDHT struct {
datastore ds.Datastore // Local data

routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager
// providerStore stores & manages the provider records for this Dht peer.
providerStore providers.ProviderStore

// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager
@@ -221,7 +221,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}
dht.proc.Go(sn.subscribe)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok {
dht.proc.AddChild(mgr.Process())
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
@@ -338,11 +340,14 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, err
if cfg.ProviderStore != nil {
dht.providerStore = cfg.ProviderStore
} else {
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
}
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

@@ -413,6 +418,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
return rt, err
}

// ProviderStore returns the provider storage object for storing and retrieving provider records.
func (dht *IpfsDHT) ProviderStore() providers.ProviderStore {
return dht.providerStore
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
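
A note on the Process() type assertion in New above: only provider stores that expose a goprocess are attached to the DHT's process tree, so the default ProviderManager keeps its background lifecycle while simpler stores need nothing extra. A custom store could opt into that shutdown handling roughly as sketched here (illustrative only, not code from this PR; closableStore and newClosableStore are hypothetical names):

	import (
		goprocess "github.com/jbenet/goprocess"

		"github.com/libp2p/go-libp2p-kad-dht/providers"
	)

	// closableStore wraps any ProviderStore and exposes a goprocess, so the
	// dht.proc.AddChild call above tears it down when the DHT shuts down.
	type closableStore struct {
		providers.ProviderStore
		proc goprocess.Process
	}

	// newClosableStore runs cleanup when the DHT's process tree closes.
	func newClosableStore(inner providers.ProviderStore, cleanup func() error) *closableStore {
		return &closableStore{ProviderStore: inner, proc: goprocess.WithTeardown(cleanup)}
	}

	func (s *closableStore) Process() goprocess.Process { return s.proc }
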
dht_options.go: 8 additions & 0 deletions
@@ -36,6 +36,14 @@ const DefaultPrefix protocol.ID = "/ipfs"

type Option = dhtcfg.Option

// ProviderStore sets the provider storage manager.
func ProviderStore(ps providers.ProviderStore) Option {
return func(c *dhtcfg.Config) error {
c.ProviderStore = ps
return nil
}
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
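
With the new option and the ProviderStore() accessor added in dht.go, wiring a caller-supplied store into a node could look roughly like the following (a sketch under the assumption that myStore implements providers.ProviderStore; newDHTWithCustomStore is a hypothetical helper, not part of this PR):

	import (
		"context"

		"github.com/libp2p/go-libp2p-core/host"

		dht "github.com/libp2p/go-libp2p-kad-dht"
		"github.com/libp2p/go-libp2p-kad-dht/providers"
	)

	// newDHTWithCustomStore passes a caller-supplied ProviderStore through the
	// new dht.ProviderStore option instead of letting New build the default
	// ProviderManager.
	func newDHTWithCustomStore(ctx context.Context, h host.Host, myStore providers.ProviderStore) (*dht.IpfsDHT, error) {
		d, err := dht.New(ctx, h, dht.ProviderStore(myStore))
		if err != nil {
			return nil, err
		}
		_ = d.ProviderStore() // the accessor returns the same store that was configured
		return d, nil
	}
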
handlers.go: 3 additions & 13 deletions
@@ -8,7 +8,6 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
@@ -318,13 +317,8 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

// setup providers
providers := dht.ProviderManager.GetProviders(ctx, key)

if len(providers) > 0 {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
}
providers := dht.providerStore.GetProviders(ctx, key)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), providers)

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
@@ -362,11 +356,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.ProviderManager.AddProvider(ctx, key, p)
dht.providerStore.AddProvider(ctx, key, pstore.PeerInfo{ID: p})
}

return nil, nil
internal/config/config.go: 2 additions & 1 deletion
@@ -44,7 +44,8 @@ type Config struct {
MaxRecordAge time.Duration
EnableProviders bool
EnableValues bool
ProvidersOptions []providers.Option
ProvidersOptions []providers.Option //XXX: get rid?
ProviderStore providers.ProviderStore
QueryPeerFilter QueryFilterFunc

RoutingTable struct {
providers/providers_manager.go: 19 additions & 5 deletions
@@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"

lru "github.com/hashicorp/golang-lru/simplelru"
ds "github.com/ipfs/go-datastore"
@@ -30,12 +31,20 @@ var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")

// ProviderStore represents a store that associates peers and their addresses to keys.
type ProviderStore interface {
AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo)
GetProviders(ctx context.Context, key []byte) []peer.AddrInfo
}

// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
type ProviderManager struct {
self peer.ID
// all non channel fields are meant to be accessed only within
// the run method
cache lru.LRUCache
pstore peerstore.Peerstore
dstore *autobatch.Datastore

newprovs chan *addProv
@@ -86,10 +95,12 @@ type getProv struct {
}

// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager)
pm.self = local
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.pstore = ps
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
@@ -214,10 +225,13 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
}

// AddProvider adds a provider
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) {
if provInfo.ID != pm.self { // don't add own addrs.
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
}
prov := &addProv{
key: k,
val: val,
val: provInfo.ID,
}
select {
case pm.newprovs <- prov:
@@ -255,7 +269,7 @@ func mkProvKey(k []byte) string {

// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.AddrInfo {
gp := &getProv{
key: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
@@ -269,7 +283,7 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
case <-ctx.Done():
return nil
case peers := <-gp.resp:
return peers
return peerstore.PeerInfos(pm.pstore, peers)
}
}

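
For reference, the smallest shape an alternative implementation of the new ProviderStore interface could take at this commit is an in-memory map like the sketch below (illustrative only, not part of the PR; it skips the TTLs, LRU caching, and datastore persistence that ProviderManager provides, and does not deduplicate providers):

	import (
		"context"
		"sync"

		"github.com/libp2p/go-libp2p-core/peer"
	)

	// memoryProviderStore is a hypothetical, minimal ProviderStore: provider
	// AddrInfos are held in memory, keyed by the raw multihash key.
	type memoryProviderStore struct {
		mu    sync.Mutex
		provs map[string][]peer.AddrInfo
	}

	func newMemoryProviderStore() *memoryProviderStore {
		return &memoryProviderStore{provs: make(map[string][]peer.AddrInfo)}
	}

	func (m *memoryProviderStore) AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.provs[string(key)] = append(m.provs[string(key)], prov)
	}

	func (m *memoryProviderStore) GetProviders(ctx context.Context, key []byte) []peer.AddrInfo {
		m.mu.Lock()
		defer m.mu.Unlock()
		// Copy so callers cannot mutate the stored slice.
		out := make([]peer.AddrInfo, len(m.provs[string(key)]))
		copy(out, m.provs[string(key)])
		return out
	}
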
providers/providers_manager_test.go: 4 additions & 0 deletions
@@ -21,6 +21,10 @@ import (
// lds "github.com/ipfs/go-ds-leveldb"
)

func TestProviderManagerImplementsProviderStore(t *testing.T) {
var _ ProviderStore = (*ProviderManager)(nil)
}

func TestProviderManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
routing.go: 5 additions & 5 deletions
@@ -380,7 +380,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))

// add self locally
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
dht.providerStore.AddProvider(ctx, keyMH, peer.AddrInfo{ID: dht.self})
if !brdcst {
return nil
}
@@ -492,13 +492,13 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
ps = peer.NewLimitedSet(count)
}

provs := dht.ProviderManager.GetProviders(ctx, key)
provs := dht.providerStore.GetProviders(ctx, key)
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.peerstore.PeerInfo(p)
if ps.TryAdd(p.ID) {
pi := dht.peerstore.PeerInfo(p.ID)
select {
case peerOut <- pi:
case peerOut <- peer.AddrInfo{ID: p.ID, Addrs: append(pi.Addrs, p.Addrs...)}: // XXX: dedup addresses?
case <-ctx.Done():
return
}
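
On the XXX comment in findProvidersAsyncRoutine above: appending the provider record's addresses to the ones already in the peerstore can emit duplicate multiaddrs for the same peer. One possible deduplication, keyed on the multiaddr's string form, is sketched here (dedupAddrs is a hypothetical helper, not part of this PR):

	import (
		ma "github.com/multiformats/go-multiaddr"
	)

	// dedupAddrs merges two address lists, dropping exact duplicates by
	// comparing the multiaddrs' string encodings.
	func dedupAddrs(a, b []ma.Multiaddr) []ma.Multiaddr {
		seen := make(map[string]struct{}, len(a)+len(b))
		out := make([]ma.Multiaddr, 0, len(a)+len(b))
		for _, addr := range append(append([]ma.Multiaddr{}, a...), b...) {
			if _, ok := seen[addr.String()]; ok {
				continue
			}
			seen[addr.String()] = struct{}{}
			out = append(out, addr)
		}
		return out
	}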