diff --git a/dht.go b/dht.go
index 5ac1a3666..5417ce2ba 100644
--- a/dht.go
+++ b/dht.go
@@ -84,8 +84,8 @@ type IpfsDHT struct {
 	datastore ds.Datastore // Local data
 
 	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
-	// ProviderManager stores & manages the provider records for this Dht peer.
-	ProviderManager *providers.ProviderManager
+	// providerStore stores & manages the provider records for this Dht peer.
+	providerStore providers.ProviderStore
 
 	// manages Routing Table refresh
 	rtRefreshManager *rtrefresh.RtRefreshManager
@@ -221,7 +221,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
 	}
 	dht.proc.Go(sn.subscribe)
 	// handle providers
-	dht.proc.AddChild(dht.ProviderManager.Process())
+	if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok {
+		dht.proc.AddChild(mgr.Process())
+	}
 
 	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
 	// since RT membership is decoupled from connectivity
@@ -338,11 +340,14 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
 	// the DHT context should be done when the process is closed
 	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
 
-	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
-	if err != nil {
-		return nil, err
+	if cfg.ProviderStore != nil {
+		dht.providerStore = cfg.ProviderStore
+	} else {
+		dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
+		if err != nil {
+			return nil, fmt.Errorf("initializing default provider manager (%v)", err)
+		}
 	}
-	dht.ProviderManager = pm
 
 	dht.rtFreezeTimeout = rtFreezeTimeout
 
@@ -413,6 +418,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
 	return rt, err
 }
 
+// ProviderStore returns the provider storage object for storing and retrieving provider records.
+func (dht *IpfsDHT) ProviderStore() providers.ProviderStore {
+	return dht.providerStore
+}
+
 // GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
 func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
 	return dht.routingTable.GetDiversityStats()
diff --git a/dht_options.go b/dht_options.go
index 0ef777913..a87f320b6 100644
--- a/dht_options.go
+++ b/dht_options.go
@@ -36,6 +36,14 @@ const DefaultPrefix protocol.ID = "/ipfs"
 
 type Option = dhtcfg.Option
 
+// ProviderStore sets the provider storage manager.
+func ProviderStore(ps providers.ProviderStore) Option {
+	return func(c *dhtcfg.Config) error {
+		c.ProviderStore = ps
+		return nil
+	}
+}
+
 // RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
 // in the routing table's cluster.
 func RoutingTableLatencyTolerance(latency time.Duration) Option {
@@ -236,18 +244,6 @@ func DisableValues() Option {
 	}
 }
 
-// ProvidersOptions are options passed directly to the provider manager.
-//
-// The provider manager adds and gets provider records from the datastore, cahing
-// them in between. These options are passed to the provider manager allowing
-// customisation of things like the GC interval and cache implementation.
-func ProvidersOptions(opts []providers.Option) Option {
-	return func(c *dhtcfg.Config) error {
-		c.ProvidersOptions = opts
-		return nil
-	}
-}
-
 // QueryFilter sets a function that approves which peers may be dialed in a query
 func QueryFilter(filter QueryFilterFunc) Option {
 	return func(c *dhtcfg.Config) error {
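(Example, not part of the diff.) The removed ProvidersOptions option is effectively superseded by the new ProviderStore option: a caller that previously tuned the default provider manager can now construct it directly and hand it to the DHT. A minimal sketch, assuming the caller already has a libp2p host and a batching datastore; the helper name and its parameters are illustrative only.

package example

import (
	"context"

	ds "github.com/ipfs/go-datastore"
	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/providers"
)

// newDHTWithTunedProviderManager sketches the migration path for former
// ProvidersOptions users: build the default ProviderManager yourself (passing any
// providers.Option values you previously supplied) and hand it to the DHT as its
// provider store.
func newDHTWithTunedProviderManager(ctx context.Context, h host.Host, dstore ds.Batching) (*dht.IpfsDHT, error) {
	pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dstore /*, custom providers.Option values */)
	if err != nil {
		return nil, err
	}
	return dht.New(ctx, h, dht.ProviderStore(pm))
}

When no ProviderStore option is supplied, makeDHT falls back to providers.NewProviderManager, as the dht.go hunk above shows.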
diff --git a/dht_test.go b/dht_test.go
index f43b8f6aa..9962c3754 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -592,7 +592,7 @@ func TestLocalProvides(t *testing.T) {
 
 	for _, c := range testCaseCids {
 		for i := 0; i < 3; i++ {
-			provs := dhts[i].ProviderManager.GetProviders(ctx, c.Hash())
+			provs, _ := dhts[i].ProviderStore().GetProviders(ctx, c.Hash())
 			if len(provs) > 0 {
 				t.Fatal("shouldnt know this")
 			}
@@ -1285,7 +1285,7 @@ func TestClientModeConnect(t *testing.T) {
 	c := testCaseCids[0]
 	p := peer.ID("TestPeer")
 
-	a.ProviderManager.AddProvider(ctx, c.Hash(), p)
+	a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
 	time.Sleep(time.Millisecond * 5) // just in case...
 
 	provs, err := b.FindProviders(ctx, c)
@@ -1548,7 +1548,7 @@ func TestProvideDisabled(t *testing.T) {
 	if err != routing.ErrNotSupported {
 		t.Fatal("get should have failed on node B")
 	}
-	provs := dhtB.ProviderManager.GetProviders(ctx, kHash)
+	provs, _ := dhtB.ProviderStore().GetProviders(ctx, kHash)
 	if len(provs) != 0 {
 		t.Fatal("node B should not have found local providers")
 	}
@@ -1564,7 +1564,7 @@ func TestProvideDisabled(t *testing.T) {
 			t.Fatal("node A should not have found providers")
 		}
 	}
-	provAddrs := dhtA.ProviderManager.GetProviders(ctx, kHash)
+	provAddrs, _ := dhtA.ProviderStore().GetProviders(ctx, kHash)
 	if len(provAddrs) != 0 {
 		t.Fatal("node A should not have found local providers")
 	}
diff --git a/fullrt/dht.go b/fullrt/dht.go
index 825eabfab..45747400e 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -139,7 +139,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
 
 	ctx, cancel := context.WithCancel(context.Background())
 
-	pm, err := providers.NewProviderManager(ctx, h.ID(), dhtcfg.Datastore)
+	pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dhtcfg.Datastore)
 	if err != nil {
 		cancel()
 		return nil, err
@@ -762,7 +762,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 	logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
 
 	// add self locally
-	dht.ProviderManager.AddProvider(ctx, keyMH, dht.h.ID())
+	dht.ProviderManager.AddProvider(ctx, keyMH, peer.AddrInfo{ID: dht.h.ID()})
 	if !brdcst {
 		return nil
 	}
@@ -1209,13 +1209,15 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
 		ps = peer.NewLimitedSet(count)
 	}
 
-	provs := dht.ProviderManager.GetProviders(ctx, key)
+	provs, err := dht.ProviderManager.GetProviders(ctx, key)
+	if err != nil {
+		return
+	}
 	for _, p := range provs {
 		// NOTE: Assuming that this list of peers is unique
-		if ps.TryAdd(p) {
-			pi := dht.h.Peerstore().PeerInfo(p)
+		if ps.TryAdd(p.ID) {
 			select {
-			case peerOut <- pi:
+			case peerOut <- p:
 			case <-ctx.Done():
 				return
 			}
diff --git a/handlers.go b/handlers.go
index 5160232c0..929f679c0 100644
--- a/handlers.go
+++ b/handlers.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/peerstore"
 	pstore "github.com/libp2p/go-libp2p-peerstore"
 
 	"github.com/gogo/protobuf/proto"
@@ -318,13 +317,11 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
 
 	// setup providers
-	providers := dht.ProviderManager.GetProviders(ctx, key)
-
-	if len(providers) > 0 {
-		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-		infos := pstore.PeerInfos(dht.peerstore, providers)
-		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
+	providers, err := dht.providerStore.GetProviders(ctx, key)
+	if err != nil {
+		return nil, err
 	}
+	resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), providers)
 
 	// Also send closer peers.
 	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
@@ -362,11 +359,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 			continue
 		}
 
-		if pi.ID != dht.self { // don't add own addrs.
-			// add the received addresses to our peerstore.
-			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
-		}
-		dht.ProviderManager.AddProvider(ctx, key, p)
+		dht.providerStore.AddProvider(ctx, key, peer.AddrInfo{ID: p})
 	}
 
 	return nil, nil
diff --git a/internal/config/config.go b/internal/config/config.go
index 75b980e3f..405115d12 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -44,7 +44,7 @@ type Config struct {
 	MaxRecordAge     time.Duration
 	EnableProviders  bool
 	EnableValues     bool
-	ProvidersOptions []providers.Option
+	ProviderStore    providers.ProviderStore
 	QueryPeerFilter  QueryFilterFunc
 
 	RoutingTable struct {
diff --git a/providers/providers_manager.go b/providers/providers_manager.go
index 20927d2e8..3bf4b8bc7 100644
--- a/providers/providers_manager.go
+++ b/providers/providers_manager.go
@@ -8,6 +8,8 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p-core/peer"
+	peerstore "github.com/libp2p/go-libp2p-core/peerstore"
+	peerstoreImpl "github.com/libp2p/go-libp2p-peerstore"
 
 	lru "github.com/hashicorp/golang-lru/simplelru"
 	ds "github.com/ipfs/go-datastore"
@@ -30,12 +32,20 @@ var lruCacheSize = 256
 var batchBufferSize = 256
 var log = logging.Logger("providers")
 
+// ProviderStore represents a store that associates peers and their addresses to keys.
+type ProviderStore interface {
+	AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error
+	GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error)
+}
+
 // ProviderManager adds and pulls providers out of the datastore,
 // caching them in between
 type ProviderManager struct {
+	self peer.ID
 	// all non channel fields are meant to be accessed only within
 	// the run method
 	cache  lru.LRUCache
+	pstore peerstore.Peerstore
 	dstore *autobatch.Datastore
 
 	newprovs chan *addProv
@@ -45,6 +55,8 @@ type ProviderManager struct {
 	cleanupInterval time.Duration
 }
 
+var _ ProviderStore = (*ProviderManager)(nil)
+
 // Option is a function that sets a provider manager option.
 type Option func(*ProviderManager) error
 
@@ -86,10 +98,12 @@ type getProv struct {
 }
 
 // NewProviderManager constructor
-func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
+func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
 	pm := new(ProviderManager)
+	pm.self = local
 	pm.getprovs = make(chan *getProv)
 	pm.newprovs = make(chan *addProv)
+	pm.pstore = ps
 	pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
 	cache, err := lru.NewLRU(lruCacheSize, nil)
 	if err != nil {
@@ -214,14 +228,19 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 }
 
 // AddProvider adds a provider
-func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
+func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
+	if provInfo.ID != pm.self { // don't add own addrs.
+		pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
+	}
 	prov := &addProv{
 		key: k,
-		val: val,
+		val: provInfo.ID,
 	}
 	select {
 	case pm.newprovs <- prov:
+		return nil
 	case <-ctx.Done():
+		return ctx.Err()
 	}
 }
 
@@ -255,21 +274,21 @@ func mkProvKey(k []byte) string {
 
 // GetProviders returns the set of providers for the given key.
 // This method _does not_ copy the set. Do not modify it.
-func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
+func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
 	gp := &getProv{
 		key:  k,
 		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
 	}
 	select {
 	case <-ctx.Done():
-		return nil
+		return nil, ctx.Err()
 	case pm.getprovs <- gp:
 	}
 	select {
 	case <-ctx.Done():
-		return nil
+		return nil, ctx.Err()
 	case peers := <-gp.resp:
-		return peers
+		return peerstoreImpl.PeerInfos(pm.pstore, peers), nil
 	}
 }
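(Example, not part of the diff.) For illustration, a minimal in-memory implementation of the ProviderStore interface defined above could look like the sketch below. The type and its names are hypothetical; unlike ProviderManager it does no persistence, LRU caching, peerstore address bookkeeping, or record expiry.

package providersx

import (
	"context"
	"sync"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-kad-dht/providers"
)

// memoryProviderStore keeps provider records in a map, keyed by the raw multihash bytes.
type memoryProviderStore struct {
	mu    sync.RWMutex
	provs map[string][]peer.AddrInfo
}

var _ providers.ProviderStore = (*memoryProviderStore)(nil)

func newMemoryProviderStore() *memoryProviderStore {
	return &memoryProviderStore{provs: make(map[string][]peer.AddrInfo)}
}

// AddProvider records prov as a provider for key, ignoring duplicates by peer ID.
func (s *memoryProviderStore) AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, known := range s.provs[string(key)] {
		if known.ID == prov.ID {
			return nil
		}
	}
	s.provs[string(key)] = append(s.provs[string(key)], prov)
	return nil
}

// GetProviders returns the providers recorded for key.
func (s *memoryProviderStore) GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.provs[string(key)], nil
}

Such a store can then be handed to dht.New through the ProviderStore option shown earlier.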
diff --git a/providers/providers_manager_test.go b/providers/providers_manager_test.go
index 11a9a0e5b..43fcb26aa 100644
--- a/providers/providers_manager_test.go
+++ b/providers/providers_manager_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-peerstore/pstoremem"
 
 	mh "github.com/multiformats/go-multihash"
 
@@ -26,31 +27,31 @@ func TestProviderManager(t *testing.T) {
 	defer cancel()
 
 	mid := peer.ID("testing")
-	p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
+	p, err := NewProviderManager(ctx, mid, pstoremem.NewPeerstore(), dssync.MutexWrap(ds.NewMapDatastore()))
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	a := u.Hash([]byte("test"))
-	p.AddProvider(ctx, a, peer.ID("testingprovider"))
+	p.AddProvider(ctx, a, peer.AddrInfo{ID: peer.ID("testingprovider")})
 	// Not cached
 	// TODO verify that cache is empty
-	resp := p.GetProviders(ctx, a)
+	resp, _ := p.GetProviders(ctx, a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
 	}
 
 	// Cached
 	// TODO verify that cache is populated
-	resp = p.GetProviders(ctx, a)
+	resp, _ = p.GetProviders(ctx, a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
 	}
 
-	p.AddProvider(ctx, a, peer.ID("testingprovider2"))
-	p.AddProvider(ctx, a, peer.ID("testingprovider3"))
+	p.AddProvider(ctx, a, peer.AddrInfo{ID: peer.ID("testingprovider2")})
+	p.AddProvider(ctx, a, peer.AddrInfo{ID: peer.ID("testingprovider3")})
 	// TODO verify that cache is already up to date
-	resp = p.GetProviders(ctx, a)
+	resp, _ = p.GetProviders(ctx, a)
 	if len(resp) != 3 {
 		t.Fatalf("Should have got 3 providers, got %d", len(resp))
 	}
@@ -67,7 +68,7 @@ func TestProvidersDatastore(t *testing.T) {
 	defer cancel()
 
 	mid := peer.ID("testing")
-	p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
+	p, err := NewProviderManager(ctx, mid, pstoremem.NewPeerstore(), dssync.MutexWrap(ds.NewMapDatastore()))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -78,15 +79,15 @@ func TestProvidersDatastore(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		h := u.Hash([]byte(fmt.Sprint(i)))
 		mhs = append(mhs, h)
-		p.AddProvider(ctx, h, friend)
+		p.AddProvider(ctx, h, peer.AddrInfo{ID: friend})
 	}
 
 	for _, c := range mhs {
-		resp := p.GetProviders(ctx, c)
+		resp, _ := p.GetProviders(ctx, c)
 		if len(resp) != 1 {
 			t.Fatal("Could not retrieve provider.")
 		}
-		if resp[0] != friend {
+		if resp[0].ID != friend {
 			t.Fatal("expected provider to be 'friend'")
 		}
 	}
@@ -152,7 +153,7 @@ func TestProvidesExpire(t *testing.T) {
 	ds := dssync.MutexWrap(ds.NewMapDatastore())
 
 	mid := peer.ID("testing")
-	p, err := NewProviderManager(ctx, mid, ds)
+	p, err := NewProviderManager(ctx, mid, pstoremem.NewPeerstore(), ds)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -165,19 +166,19 @@ func TestProvidesExpire(t *testing.T) {
 	}
 
 	for _, h := range mhs[:5] {
-		p.AddProvider(ctx, h, peers[0])
-		p.AddProvider(ctx, h, peers[1])
+		p.AddProvider(ctx, h, peer.AddrInfo{ID: peers[0]})
+		p.AddProvider(ctx, h, peer.AddrInfo{ID: peers[1]})
 	}
 
 	time.Sleep(time.Second / 4)
 
 	for _, h := range mhs[5:] {
-		p.AddProvider(ctx, h, peers[0])
-		p.AddProvider(ctx, h, peers[1])
+		p.AddProvider(ctx, h, peer.AddrInfo{ID: peers[0]})
+		p.AddProvider(ctx, h, peer.AddrInfo{ID: peers[1]})
 	}
 
 	for _, h := range mhs {
-		out := p.GetProviders(ctx, h)
+		out, _ := p.GetProviders(ctx, h)
 		if len(out) != 2 {
 			t.Fatal("expected providers to still be there")
 		}
@@ -186,14 +187,14 @@ func TestProvidesExpire(t *testing.T) {
 	time.Sleep(3 * time.Second / 8)
 
 	for _, h := range mhs[:5] {
-		out := p.GetProviders(ctx, h)
+		out, _ := p.GetProviders(ctx, h)
 		if len(out) > 0 {
 			t.Fatal("expected providers to be cleaned up, got: ", out)
 		}
 	}
 
 	for _, h := range mhs[5:] {
-		out := p.GetProviders(ctx, h)
+		out, _ := p.GetProviders(ctx, h)
 		if len(out) != 2 {
 			t.Fatal("expected providers to still be there")
 		}
@@ -260,7 +261,7 @@ func TestLargeProvidersSet(t *testing.T) {
 	}
 
 	mid := peer.ID("myself")
-	p, err := NewProviderManager(ctx, mid, dstore)
+	p, err := NewProviderManager(ctx, mid, pstoremem.NewPeerstore(), dstore)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -271,14 +272,14 @@ func TestLargeProvidersSet(t *testing.T) {
 		h := u.Hash([]byte(fmt.Sprint(i)))
 		mhs = append(mhs, h)
 		for _, pid := range peers {
-			p.AddProvider(ctx, h, pid)
+			p.AddProvider(ctx, h, peer.AddrInfo{ID: pid})
 		}
 	}
 
 	for i := 0; i < 5; i++ {
 		start := time.Now()
 		for _, h := range mhs {
-			_ = p.GetProviders(ctx, h)
+			_, _ = p.GetProviders(ctx, h)
 		}
 		elapsed := time.Since(start)
 		fmt.Printf("query %f ms\n", elapsed.Seconds()*1000)
@@ -295,19 +296,19 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
 	p1, p2 := peer.ID("a"), peer.ID("b")
 	h1 := u.Hash([]byte("1"))
 	h2 := u.Hash([]byte("2"))
-	pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
+	pm, err := NewProviderManager(ctx, p1, pstoremem.NewPeerstore(), dssync.MutexWrap(ds.NewMapDatastore()))
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// add provider
-	pm.AddProvider(ctx, h1, p1)
+	pm.AddProvider(ctx, h1, peer.AddrInfo{ID: p1})
 	// make the cached provider for h1 go to datastore
-	pm.AddProvider(ctx, h2, p1)
+	pm.AddProvider(ctx, h2, peer.AddrInfo{ID: p1})
 	// now just offloaded record should be brought back and joined with p2
-	pm.AddProvider(ctx, h1, p2)
+	pm.AddProvider(ctx, h1, peer.AddrInfo{ID: p2})
 
-	h1Provs := pm.GetProviders(ctx, h1)
+	h1Provs, _ := pm.GetProviders(ctx, h1)
 	if len(h1Provs) != 2 {
 		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(h1Provs))
 	}
@@ -319,19 +320,19 @@ func TestWriteUpdatesCache(t *testing.T) {
 
 	p1, p2 := peer.ID("a"), peer.ID("b")
 	h1 := u.Hash([]byte("1"))
-	pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
+	pm, err := NewProviderManager(ctx, p1, pstoremem.NewPeerstore(), dssync.MutexWrap(ds.NewMapDatastore()))
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// add provider
-	pm.AddProvider(ctx, h1, p1)
+	pm.AddProvider(ctx, h1, peer.AddrInfo{ID: p1})
 	// force into the cache
 	pm.GetProviders(ctx, h1)
 	// add a second provider
-	pm.AddProvider(ctx, h1, p2)
+	pm.AddProvider(ctx, h1, peer.AddrInfo{ID: p2})
 
-	c1Provs := pm.GetProviders(ctx, h1)
+	c1Provs, _ := pm.GetProviders(ctx, h1)
 	if len(c1Provs) != 2 {
 		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(c1Provs))
 	}
diff --git a/routing.go b/routing.go
index 2ec5bbcdc..a56fc4624 100644
--- a/routing.go
+++ b/routing.go
@@ -380,7 +380,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
 	logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
 
 	// add self locally
-	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
+	dht.providerStore.AddProvider(ctx, keyMH, peer.AddrInfo{ID: dht.self})
 	if !brdcst {
 		return nil
 	}
@@ -492,13 +492,15 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
 		ps = peer.NewLimitedSet(count)
 	}
 
-	provs := dht.ProviderManager.GetProviders(ctx, key)
+	provs, err := dht.providerStore.GetProviders(ctx, key)
+	if err != nil {
+		return
+	}
 	for _, p := range provs {
 		// NOTE: Assuming that this list of peers is unique
-		if ps.TryAdd(p) {
-			pi := dht.peerstore.PeerInfo(p)
+		if ps.TryAdd(p.ID) {
 			select {
-			case peerOut <- pi:
+			case peerOut <- p:
 			case <-ctx.Done():
 				return
 			}