diff --git a/v2/config.go b/v2/config.go index b7314ecf..a74964e0 100644 --- a/v2/config.go +++ b/v2/config.go @@ -395,16 +395,21 @@ type QueryConfig struct { // operation. A DefaultQuorum of 0 means that we search the network until // we have exhausted the keyspace. DefaultQuorum int + + // SkipConnectivityCheck defines whether we do a connectivity check before + // we add peers to the routing table. + SkipConnectivityCheck bool } // DefaultQueryConfig returns the default query configuration options for a DHT. func DefaultQueryConfig() *QueryConfig { return &QueryConfig{ - Concurrency: 3, // MAGIC - Timeout: 5 * time.Minute, // MAGIC - RequestConcurrency: 3, // MAGIC - RequestTimeout: time.Minute, // MAGIC - DefaultQuorum: 0, // MAGIC + Concurrency: 3, // MAGIC + Timeout: 5 * time.Minute, // MAGIC + RequestConcurrency: 3, // MAGIC + RequestTimeout: time.Minute, // MAGIC + DefaultQuorum: 0, // MAGIC + SkipConnectivityCheck: false, } } diff --git a/v2/dht.go b/v2/dht.go index 86e3b97f..96dea3a8 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -128,6 +128,7 @@ func New(h host.Host, cfg *Config) (*DHT, error) { coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing") coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName) + coordCfg.Routing.IncludeSkipCheck = cfg.Query.SkipConnectivityCheck rtr := &router{ host: h, diff --git a/v2/internal/coord/routing.go b/v2/internal/coord/routing.go index bf019752..6d85e29f 100644 --- a/v2/internal/coord/routing.go +++ b/v2/internal/coord/routing.go @@ -62,6 +62,9 @@ type RoutingConfig struct { // ProbeCheckInterval is the time interval the behaviour should use between connectivity checks for the same node in the routing table. ProbeCheckInterval time.Duration + // IncludeSkipCheck indicates whether we perform connectivity checks before we add a peer to the routing table. + IncludeSkipCheck bool + // IncludeQueueCapacity is the maximum number of nodes the behaviour should keep queued as candidates for inclusion in the routing table. IncludeQueueCapacity int @@ -268,6 +271,7 @@ func DefaultRoutingConfig() *RoutingConfig { ProbeRequestConcurrency: 3, // MAGIC ProbeCheckInterval: 6 * time.Hour, // MAGIC + IncludeSkipCheck: false, IncludeRequestConcurrency: 3, // MAGIC IncludeQueueCapacity: 128, // MAGIC @@ -307,6 +311,22 @@ type RoutingBehaviour struct { ready chan struct{} } +type Recording2SM[E any, S any] struct { + State S + Received E +} + +func NewRecording2SM[E any, S any](response S) *Recording2SM[E, S] { + return &Recording2SM[E, S]{ + State: response, + } +} + +func (r *Recording2SM[E, S]) Advance(ctx context.Context, e E) S { + r.Received = e + return r.State +} + func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *RoutingConfig) (*RoutingBehaviour, error) { if cfg == nil { cfg = DefaultRoutingConfig() @@ -331,6 +351,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, includeCfg.Clock = cfg.Clock includeCfg.Tracer = cfg.Tracer includeCfg.Meter = cfg.Meter + includeCfg.SkipCheck = cfg.IncludeSkipCheck includeCfg.Timeout = cfg.ConnectivityCheckTimeout includeCfg.QueueCapacity = cfg.IncludeQueueCapacity includeCfg.Concurrency = cfg.IncludeRequestConcurrency diff --git a/v2/internal/coord/routing/include.go b/v2/internal/coord/routing/include.go index 4aad5383..2fdca7de 100644 --- a/v2/internal/coord/routing/include.go +++ b/v2/internal/coord/routing/include.go @@ -61,12 +61,9 @@ type IncludeConfig struct { Concurrency int // the maximum number of include checks that may be in progress at any one time Timeout time.Duration // the time to wait before terminating a check that is not making progress Clock clock.Clock // a clock that may replaced by a mock when testing - - // Tracer is the tracer that should be used to trace execution. - Tracer trace.Tracer - - // Meter is the meter that should be used to record metrics. - Meter metric.Meter + SkipCheck bool // whether to skip connectivity checks and add any node passed to this state machine + Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution. + Meter metric.Meter // Meter is the meter that should be used to record metrics. } // Validate checks the configuration options and returns an error if any have invalid values. @@ -124,6 +121,7 @@ func DefaultIncludeConfig() *IncludeConfig { Tracer: tele.NoopTracer(), Meter: tele.NoopMeter(), + SkipCheck: false, Concurrency: 3, Timeout: time.Minute, QueueCapacity: 128, @@ -209,6 +207,15 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl switch tev := ev.(type) { case *EventIncludeAddCandidate[K, N]: + + if in.cfg.SkipCheck { + if in.rt.AddNode(tev.NodeID) { + return &StateIncludeRoutingUpdated[K, N]{NodeID: tev.NodeID} + } else { + return &StateIncludeIdle{} + } + } + // Ignore if already running a check _, checking := in.checks[key.HexString(tev.NodeID.Key())] if checking { diff --git a/v2/routing_test.go b/v2/routing_test.go index 50d77895..1576a9d0 100644 --- a/v2/routing_test.go +++ b/v2/routing_test.go @@ -682,6 +682,8 @@ func (suite *SearchValueQuorumTestSuite) SetupTest() { cfg := DefaultConfig() cfg.Clock = clk + cfg.Query.SkipConnectivityCheck = true + top := NewTopology(t) // init privileged DHT server