From b95bba8ddd70f68c6eb9df4faaff09695098870b Mon Sep 17 00:00:00 2001
From: "Masih H. Derkani"
Date: Mon, 6 Mar 2023 11:02:22 +0000
Subject: [PATCH] Allow DHT crawler to be swappable

The accelerated DHT client uses a fixed implementation of the DHT crawler
with a parallelism hardcoded to 200. This offers a reasonable general-case
default behaviour but limits experimentation with alternative crawling
strategies that use a varied or dynamic degree of parallelism.

To avoid this, the changes here:
 * refactor the existing `Crawler` to `DefaultCrawler`,
 * introduce `Crawler` as an interface type, and
 * add an option to set the `Crawler` in the accelerated DHT client, with
   fallback to `DefaultCrawler` at a parallelism of 200.

As a result, the net functionality of the code remains unchanged, while the
crawling logic can be swapped out entirely for an alternative implementation.

Additionally, the changes parameterize the dial address extend duration,
with a fallback to the previously hardcoded value of 30 minutes.
---
 crawler/crawler.go | 55 +++++++++++++++++++++++++++-------------------
 crawler/options.go | 20 +++++++++++++----
 fullrt/dht.go      | 12 +++++-----
 fullrt/options.go  | 11 ++++++++++
 4 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/crawler/crawler.go b/crawler/crawler.go
index 451b21543..965f1df2f 100644
--- a/crawler/crawler.go
+++ b/crawler/crawler.go
@@ -18,18 +18,30 @@ import (
 	kbucket "github.com/libp2p/go-libp2p-kbucket"
 )
 
-var logger = logging.Logger("dht-crawler")
-
-// Crawler connects to hosts in the DHT to track routing tables of peers.
-type Crawler struct {
-	parallelism    int
-	connectTimeout time.Duration
-	host           host.Host
-	dhtRPC         *pb.ProtocolMessenger
-}
+var (
+	logger = logging.Logger("dht-crawler")
+
+	_ Crawler = (*DefaultCrawler)(nil)
+)
+
+type (
+	// Crawler connects to hosts in the DHT to track routing tables of peers.
+	Crawler interface {
+		// Run crawls the DHT starting from the startingPeers, and calls either handleSuccess or handleFail depending on whether a peer was successfully contacted or not.
+		Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail)
+	}
+	// DefaultCrawler provides a default implementation of Crawler.
+	DefaultCrawler struct {
+		parallelism          int
+		connectTimeout       time.Duration
+		host                 host.Host
+		dhtRPC               *pb.ProtocolMessenger
+		dialAddressExtendDur time.Duration
+	}
+)
 
-// New creates a new Crawler
-func New(host host.Host, opts ...Option) (*Crawler, error) {
+// NewDefaultCrawler creates a new DefaultCrawler
+func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error) {
 	o := new(options)
 	if err := defaults(o); err != nil {
 		return nil, err
@@ -45,11 +57,12 @@ func New(host host.Host, opts ...Option) (*Crawler, error) {
 		return nil, err
 	}
 
-	return &Crawler{
-		parallelism:    o.parallelism,
-		connectTimeout: o.connectTimeout,
-		host:           host,
-		dhtRPC:         pm,
+	return &DefaultCrawler{
+		parallelism:          o.parallelism,
+		connectTimeout:       o.connectTimeout,
+		host:                 host,
+		dhtRPC:               pm,
+		dialAddressExtendDur: o.dialAddressExtendDur,
 	}, nil
 }
 
@@ -120,10 +133,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
 // HandleQueryFail is a callback on failed peer query
 type HandleQueryFail func(p peer.ID, err error)
 
-const dialAddressExtendDur time.Duration = time.Minute * 30
-
 // Run crawls dht peers from an initial seed of `startingPeers`
-func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
+func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
 	jobs := make(chan peer.ID, 1)
 	results := make(chan *queryResult, 1)
 
@@ -151,7 +162,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
 		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
 		if len(ai.Addrs) > 0 {
 			extendAddrs = append(extendAddrs, ai.Addrs...)
-			c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
+			c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
 		}
 		if len(extendAddrs) == 0 {
 			numSkipped++
@@ -183,7 +194,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
 			logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
 			rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
 			for p, ai := range res.data {
-				c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
+				c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
 				if _, ok := peersSeen[p]; !ok {
 					peersSeen[p] = struct{}{}
 					toDial = append(toDial, ai)
@@ -212,7 +223,7 @@ type queryResult struct {
 	err  error
 }
 
-func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
+func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
 	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
 	if err != nil {
 		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
diff --git a/crawler/options.go b/crawler/options.go
index ee26f1c03..8c7602fb0 100644
--- a/crawler/options.go
+++ b/crawler/options.go
@@ -10,10 +10,11 @@ import (
 type Option func(*options) error
 
 type options struct {
-	protocols      []protocol.ID
-	parallelism    int
-	connectTimeout time.Duration
-	perMsgTimeout  time.Duration
+	protocols            []protocol.ID
+	parallelism          int
+	connectTimeout       time.Duration
+	perMsgTimeout        time.Duration
+	dialAddressExtendDur time.Duration
 }
 
 // defaults are the default crawler options. This option will be automatically
@@ -23,6 +24,7 @@ var defaults = func(o *options) error {
 	o.parallelism = 1000
 	o.connectTimeout = time.Second * 5
 	o.perMsgTimeout = time.Second * 5
+	o.dialAddressExtendDur = time.Minute * 30
 
 	return nil
 }
@@ -58,3 +60,13 @@ func WithConnectTimeout(timeout time.Duration) Option {
 		return nil
 	}
 }
+
+// WithDialAddrExtendDuration sets the duration by which the TTL of dialed peer
+// addresses in the peer store is extended.
+// Defaults to 30 minutes if unset.
+func WithDialAddrExtendDuration(ext time.Duration) Option {
+	return func(o *options) error {
+		o.dialAddressExtendDur = ext
+		return nil
+	}
+}
diff --git a/fullrt/dht.go b/fullrt/dht.go
index b0fbb2cf4..003fdd1d2 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -71,7 +71,7 @@ type FullRT struct {
 	crawlerInterval time.Duration
 	lastCrawlTime   time.Time
 
-	crawler        *crawler.Crawler
+	crawler        crawler.Crawler
 	protoMessenger *dht_pb.ProtocolMessenger
 	messageSender  dht_pb.MessageSender
 
@@ -141,9 +141,11 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
 		return nil, err
 	}
 
-	c, err := crawler.New(h, crawler.WithParallelism(200))
-	if err != nil {
-		return nil, err
+	if fullrtcfg.crawler == nil {
+		fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200))
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -171,7 +173,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
 		ProviderManager: pm,
 		datastore:       dhtcfg.Datastore,
 		h:               h,
-		crawler:         c,
+		crawler:         fullrtcfg.crawler,
 		messageSender:   ms,
 		protoMessenger:  protoMessenger,
 		filterFromTable: kaddht.PublicQueryFilter,
diff --git a/fullrt/options.go b/fullrt/options.go
index 5bcc247ee..a469548a5 100644
--- a/fullrt/options.go
+++ b/fullrt/options.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	kaddht "github.com/libp2p/go-libp2p-kad-dht"
+	"github.com/libp2p/go-libp2p-kad-dht/crawler"
 )
 
 type config struct {
@@ -14,6 +15,7 @@ type config struct {
 	waitFrac            float64
 	bulkSendParallelism int
 	timeoutPerOp        time.Duration
+	crawler             crawler.Crawler
 }
 
 func (cfg *config) apply(opts ...Option) error {
@@ -34,6 +36,15 @@ func DHTOption(opts ...kaddht.Option) Option {
 	}
 }
 
+// WithCrawler sets the crawler.Crawler to use in order to crawl the DHT network.
+// Defaults to crawler.DefaultCrawler with a parallelism of 200.
+func WithCrawler(c crawler.Crawler) Option {
+	return func(opt *config) error {
+		opt.crawler = c
+		return nil
+	}
+}
+
 // WithCrawlInterval sets the interval at which the DHT is crawled to refresh peer store.
 // Defaults to 1 hour if unspecified.
 func WithCrawlInterval(i time.Duration) Option {
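
Usage sketch (not part of the patch): the example below illustrates how a consumer could inject a custom crawling strategy through the new fullrt.WithCrawler option, using only the APIs introduced above. The loggingCrawler wrapper, the parallelism of 500, the one-hour dial-address TTL extension and the "/ipfs" protocol prefix are illustrative assumptions only; a real setup would typically also pass fullrt.DHTOption(...) with a datastore, validator and bootstrap peers.

package main

import (
	"context"
	"log"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-kad-dht/crawler"
	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
	"github.com/libp2p/go-libp2p/core/peer"
)

// loggingCrawler is a hypothetical crawler.Crawler that wraps another
// implementation and logs each crawl. It only demonstrates that any type
// satisfying the new Crawler interface can be injected.
type loggingCrawler struct {
	inner crawler.Crawler
}

func (l *loggingCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo,
	handleSuccess crawler.HandleQueryResult, handleFail crawler.HandleQueryFail) {
	log.Printf("starting crawl from %d peers", len(startingPeers))
	l.inner.Run(ctx, startingPeers, handleSuccess, handleFail)
}

func main() {
	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}

	// Tune the default crawler via the existing and newly added options
	// (both values below are illustrative, not recommendations).
	base, err := crawler.NewDefaultCrawler(h,
		crawler.WithParallelism(500),
		crawler.WithDialAddrExtendDuration(time.Hour),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Inject the wrapped crawler (or any other crawler.Crawler) into the
	// accelerated client; omitting WithCrawler keeps the previous behaviour
	// of a DefaultCrawler with a parallelism of 200.
	rt, err := fullrt.NewFullRT(h, "/ipfs",
		fullrt.WithCrawler(&loggingCrawler{inner: base}),
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = rt // use rt for routing and providing from here on
}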