Skip to content

Commit

Permalink
feat: warn users who are falling behind reprovides
Browse files Browse the repository at this point in the history
Fixes: ipfs#9704
Fixes: ipfs#9702
Fixes: ipfs#9703
Fixes: ipfs#9419

Depends on: ipfs/boxo#273
  • Loading branch information
Jorropo committed Jun 1, 2023
1 parent 6eef0b4 commit d0a7e03
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 191 deletions.
3 changes: 3 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
case routingOptionNoneKwd:
ncfg.Routing = libp2p.NilRouterOption
case routingOptionCustomKwd:
if cfg.Routing.AcceleratedDHTClient {
return fmt.Errorf("Routing.AcceleratedDHTClient option is set even tho Routing.Type is custom, using custom .AcceleratedDHTClient needs to be set on DHT routers individually")
}
ncfg.Routing = libp2p.ConstructDelegatedRouting(
cfg.Routing.Routers,
cfg.Routing.Methods,
Expand Down
2 changes: 1 addition & 1 deletion config/experiments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Experiments struct {
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
AcceleratedDHTClient experimentalAcceleratedDHTClient `json:",omitempty"`
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}
2 changes: 2 additions & 0 deletions config/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Routing struct {
// When "custom" is set, user-provided Routing.Routers is used.
Type *OptionalString `json:",omitempty"`

AcceleratedDHTClient bool

Routers Routers

Methods Methods
Expand Down
24 changes: 24 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,27 @@ func (swarmLimits) UnmarshalJSON(b []byte) error {
}
}
}

type experimentalAcceleratedDHTClient struct{}

var _ json.Unmarshaler = experimentalAcceleratedDHTClient{}

func (experimentalAcceleratedDHTClient) UnmarshalJSON(b []byte) error {
d := json.NewDecoder(bytes.NewReader(b))
for {
switch tok, err := d.Token(); err {
case io.EOF:
return nil
case nil:
switch tok {
case json.Delim('{'), json.Delim('}'):
// accept empty objects
continue
}
//nolint
return fmt.Errorf("The Experimental.AcceleratedDHTClient key has been moved to Routing.AcceleratedDHTClient in Kubo 0.21, please use this new key and remove the old one.")
default:
return err
}
}
}
17 changes: 6 additions & 11 deletions core/commands/stat_provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/ipfs/boxo/provider"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"

"github.com/ipfs/boxo/provider/batched"
"golang.org/x/exp/constraints"
)

var statProvideCmd = &cmds.Command{
Expand All @@ -34,12 +34,7 @@ This interface is not stable and may change from release to release.
return ErrNotOnline
}

sys, ok := nd.Provider.(*batched.BatchProvidingSystem)
if !ok {
return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled")
}

stats, err := sys.Stat(req.Context)
stats, err := nd.Provider.Stat()
if err != nil {
return err
}
Expand All @@ -51,7 +46,7 @@ This interface is not stable and may change from release to release.
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s *batched.BatchedProviderStats) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s *provider.ReproviderStats) error {
wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
defer wtr.Flush()

Expand All @@ -62,14 +57,14 @@ This interface is not stable and may change from release to release.
return nil
}),
},
Type: batched.BatchedProviderStats{},
Type: provider.ReproviderStats{},
}

func humanDuration(val time.Duration) string {
return val.Truncate(time.Microsecond).String()
}

func humanNumber(n int) string {
func humanNumber[T constraints.Float | constraints.Integer](n T) string {
nf := float64(n)
str := humanSI(nf, 0)
fullStr := humanFull(nf, 0)
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
return nil, fmt.Errorf("error constructing namesys: %w", err)
}

subAPI.provider = provider.NewOfflineProvider()
subAPI.provider = provider.NewNoopProvider()

subAPI.peerstore = nil
subAPI.peerHost = nil
Expand Down
8 changes: 1 addition & 7 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
LibP2P(bcfg, cfg, userResourceOverrides),
OnlineProviders(
cfg.Experimental.StrategicProviding,
cfg.Experimental.AcceleratedDHTClient,
cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy),
cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval),
),
Expand All @@ -320,12 +319,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),
fx.Provide(libp2p.OfflineRouting),
OfflineProviders(
cfg.Experimental.StrategicProviding,
cfg.Experimental.AcceleratedDHTClient,
cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy),
cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval),
),
OfflineProviders(),
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func BaseRouting(cfg *config.Config) interface{} {
}
}

if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient {
if dualDHT != nil && cfg.Routing.AcceleratedDHTClient {
cfg, err := in.Repo.Config()
if err != nil {
return out, err
Expand Down
204 changes: 105 additions & 99 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,145 +5,151 @@ import (
"fmt"
"time"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/fetcher"
pin "github.com/ipfs/boxo/pinning/pinner"
provider "github.com/ipfs/boxo/provider"
"github.com/ipfs/boxo/provider/batched"
q "github.com/ipfs/boxo/provider/queue"
"github.com/ipfs/boxo/provider/simple"
"go.uber.org/fx"

"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
"go.uber.org/fx"
)

// SIMPLE

// ProviderQueue creates new datastore backed provider queue
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) {
return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
}

// SimpleProvider creates new record provider
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.ProvideManyRouter) provider.Provider {
return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
}

// SimpleReprovider creates new reprovider
func SimpleReprovider(reproviderInterval time.Duration) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.ProvideManyRouter, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
}

// SimpleProviderSys creates new provider system
func SimpleProviderSys(isOnline bool) interface{} {
return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System {
sys := provider.NewSystem(p, r)

if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}

return sys
}
}

// BatchedProviderSys creates new provider system
func BatchedProviderSys(isOnline bool, reprovideInterval time.Duration) interface{} {
return func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
sys, err := batched.New(cr, q,
batched.ReproviderInterval(reprovideInterval),
batched.Datastore(repo.Datastore()),
batched.KeyProvider(keyProvider))
func ProviderSys(reprovideInterval time.Duration) fx.Option {
const magicThroughputReportCount = 128
return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) {
sys, err := provider.New(repo.Datastore(),
provider.Online(cr),
provider.ReproviderInterval(reprovideInterval),
provider.KeyProvider(keyProvider),
provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
avgProvideSpeed := duration / time.Duration(keysProvided)
count := uint64(keysProvided)

if !reprovide || !complete {
// We don't know how many CIDs we have to provide, try to fetch it from the blockstore.
// But don't try for too long as this might be very expensive if you have a huge datastore.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

// FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup.
ch, err := bs.AllKeysChan(ctx)
if err != nil {
logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err)
return false
}
count = 0
countLoop:
for {
select {
case _, ok := <-ch:
if !ok {
break countLoop
}
count++
case <-ctx.Done():
// really big blockstore mode

// how many blocks would be in a 10TiB blockstore with 128KiB blocks.
const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval / probableBigBlockstore
if avgProvideSpeed > expectedProvideSpeed {
logger.Errorf(`
🔔🔔🔔 YOU MAY BE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔
⚠️ Your system might be struggling to keep up with DHT reprovides!
Meaning your content being partially or completely inacessible on the network.
We observed that you recently provided %d keys at an average rate of %v per key.
🕑 An attempt to estimate your blockstore size timed out after 5 minutes,
implying your blockstore might be exceedingly large. Assuming a considerable
size of 10TiB, it would take %v to provide the complete set.
⏰ The total provide time needs to stay under 24 hours to prevent falling behind!
💡 Consider enabling the Accelerated DHT to enhance your system performance. See:
https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`,
keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore)
return false
}
}
}
}

// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval / time.Duration(count)
if avgProvideSpeed > expectedProvideSpeed {
// FIXME(@Jorropo): add link to the accelerated DHT client docs once this isn't experimental anymore.
logger.Errorf(`
🔔🔔🔔 YOU ARE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔
⚠️ Your system is struggling to keep up with DHT reprovides!
Meaning your content being partially or completely inacessible on the network.
We observed that you recently provided %d keys at an average rate of %v per key.
💾 Your total CID count is ~%d which would total at %v reprovide process.
⏰ The total provide time needs to stay under 24 hours to prevent falling behind!
💡 Consider enabling the Accelerated DHT to enhance your reprovide throughput. See:
https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`,
keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count))
}
return false
}, magicThroughputReportCount),
)
if err != nil {
return nil, err
}

if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})

return sys, nil
}
})
}

// ONLINE/OFFLINE

// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
maybeProvide(SimpleProviderSys(true), !useBatchedProviding),
maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding),
)
}

// OfflineProviders groups units managing provider routing records offline
func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option {
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
return OfflineProviders()
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
maybeProvide(SimpleProviderSys(false), true),
//maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding),
)
}

// SimpleProviders creates the simple provider/reprovider dependencies
func SimpleProviders(reprovideStrategy string, reproviderInterval time.Duration) fx.Option {
var keyProvider fx.Option
switch reprovideStrategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(simple.NewBlockstoreProvider)
case "all", "":
keyProvider = fx.Provide(provider.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(pinnedProviderStrategy(true))
case "pinned":
keyProvider = fx.Provide(pinnedProviderStrategy(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy))
return fx.Error(fmt.Errorf("unknown reprovider strategy %q", reprovideStrategy))
}

return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(SimpleProvider),
keyProvider,
fx.Provide(SimpleReprovider(reproviderInterval)),
ProviderSys(reprovideInterval),
)
}

// OfflineProviders groups units managing provider routing records offline
func OfflineProviders() fx.Option {
return fx.Provide(provider.NewNoopProvider)
}

func pinnedProviderStrategy(onlyRoots bool) interface{} {
type input struct {
fx.In
Pinner pin.Pinner
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
}
return func(in input) simple.KeyChanFunc {
return simple.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher)
return func(in input) provider.KeyChanFunc {
return provider.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher)
}
}
Loading

0 comments on commit d0a7e03

Please sign in to comment.