Skip to content

Commit

Permalink
Allow DHT crawler to be swappable
Browse files Browse the repository at this point in the history
The accelerated DHT client uses a fixed implementation of DHT crawler
with a parallelism hardcoded to 200. This offers a reasonable general
case default behaviour but limits experimentation with alternative
crawling strategies with varied or dynamic degree of parallelism.

To avoid this the change here:
 * refactor the existing `Crawler` to `DefaultCrawler`
 * introduce `Crawler` as interface type, and
 * add option to set the `Crawler` in the accelerated DHT client with
  fallback to `DefaultCrawler` at parallelism of 200.

As a result the net functionality of the code remains unchanged while it
allows the crawling logic to be swapped entirely with an alternative
implementation.

Additionally, the changes here parameterize the dial addr extend
duration with fallback to the previously hardcoded value of 30 minutes.
  • Loading branch information
masih committed Mar 6, 2023
1 parent 26f6e00 commit b95bba8
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 31 deletions.
55 changes: 33 additions & 22 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to hosts in the DHT to track routing tables of peers.
type Crawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
}
var (
logger = logging.Logger("dht-crawler")

_ Crawler = (*DefaultCrawler)(nil)
)

type (
// Crawler connects to hosts in the DHT to track routing tables of peers.
Crawler interface {
// Run crawls the DHT starting from the startingPeers, and calls either handleSuccess or handleFail depending on whether a peer was successfully contacted or not.
Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail)
}
// DefaultCrawler provides a default implementation of Crawler.
DefaultCrawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
dialAddressExtendDur time.Duration
}
)

// New creates a new Crawler
func New(host host.Host, opts ...Option) (*Crawler, error) {
// NewDefaultCrawler creates a new DefaultCrawler
func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error) {
o := new(options)
if err := defaults(o); err != nil {
return nil, err
Expand All @@ -45,11 +57,12 @@ func New(host host.Host, opts ...Option) (*Crawler, error) {
return nil, err
}

return &Crawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
return &DefaultCrawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
dialAddressExtendDur: o.dialAddressExtendDur,
}, nil
}

Expand Down Expand Up @@ -120,10 +133,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

const dialAddressExtendDur time.Duration = time.Minute * 30

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
results := make(chan *queryResult, 1)

Expand Down Expand Up @@ -151,7 +162,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
Expand Down Expand Up @@ -183,7 +194,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
Expand Down Expand Up @@ -212,7 +223,7 @@ type queryResult struct {
err error
}

func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
if err != nil {
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
Expand Down
20 changes: 16 additions & 4 deletions crawler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
type Option func(*options) error

type options struct {
protocols []protocol.ID
parallelism int
connectTimeout time.Duration
perMsgTimeout time.Duration
protocols []protocol.ID
parallelism int
connectTimeout time.Duration
perMsgTimeout time.Duration
dialAddressExtendDur time.Duration
}

// defaults are the default crawler options. This option will be automatically
Expand All @@ -23,6 +24,7 @@ var defaults = func(o *options) error {
o.parallelism = 1000
o.connectTimeout = time.Second * 5
o.perMsgTimeout = time.Second * 5
o.dialAddressExtendDur = time.Minute * 30

return nil
}
Expand Down Expand Up @@ -58,3 +60,13 @@ func WithConnectTimeout(timeout time.Duration) Option {
return nil
}
}

// WithDialAddrExtendDuration sets the duration by which the TTL of dialed address in peer store are
// extended.
// Defaults to 30 minutes if unset.
func WithDialAddrExtendDuration(ext time.Duration) Option {
return func(o *options) error {
o.dialAddressExtendDur = ext
return nil
}
}
12 changes: 7 additions & 5 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type FullRT struct {
crawlerInterval time.Duration
lastCrawlTime time.Time

crawler *crawler.Crawler
crawler crawler.Crawler
protoMessenger *dht_pb.ProtocolMessenger
messageSender dht_pb.MessageSender

Expand Down Expand Up @@ -141,9 +141,11 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
return nil, err
}

c, err := crawler.New(h, crawler.WithParallelism(200))
if err != nil {
return nil, err
if fullrtcfg.crawler == nil {
fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200))
if err != nil {
return nil, err
}
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -171,7 +173,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
ProviderManager: pm,
datastore: dhtcfg.Datastore,
h: h,
crawler: c,
crawler: fullrtcfg.crawler,
messageSender: ms,
protoMessenger: protoMessenger,
filterFromTable: kaddht.PublicQueryFilter,
Expand Down
11 changes: 11 additions & 0 deletions fullrt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/crawler"
)

type config struct {
Expand All @@ -14,6 +15,7 @@ type config struct {
waitFrac float64
bulkSendParallelism int
timeoutPerOp time.Duration
crawler crawler.Crawler
}

func (cfg *config) apply(opts ...Option) error {
Expand All @@ -34,6 +36,15 @@ func DHTOption(opts ...kaddht.Option) Option {
}
}

// WithCrawler sets the crawler.Crawler to use in order to crawl the DHT network.
// Defaults to crawler.DefaultCrawler with parallelism of 200.
func WithCrawler(c crawler.Crawler) Option {
return func(opt *config) error {
opt.crawler = c
return nil
}
}

// WithCrawlInterval sets the interval at which the DHT is crawled to refresh peer store.
// Defaults to 1 hour if unspecified.
func WithCrawlInterval(i time.Duration) Option {
Expand Down

0 comments on commit b95bba8

Please sign in to comment.