From e0d53412970e54ff5dbee9a16aa35b8aec9e2ecf Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 27 Dec 2023 20:03:08 +0100 Subject: [PATCH] bitswap/server: remove provide We always had a very weird relationship between bitswap and providing. Bitswap took care of doing the initial provide and then reprovider did it later. The Bitswap server had a complicated providing workflow where it slurped thing into memory. Reprovide accepts provides and is able to queue them in a database, such as on disk, this is much better. I'll add options to hook initial provide logic from the blockservice to the reprovider queue so consumers don't have to do this themselves. --- CHANGELOG.md | 1 + bitswap/bitswap.go | 12 +- bitswap/bitswap_test.go | 2 +- bitswap/client/bitswap_with_sessions_test.go | 4 + bitswap/internal/defaults/defaults.go | 5 - bitswap/options.go | 4 - bitswap/server/server.go | 166 +------------------ 7 files changed, 16 insertions(+), 178 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c9e39536..ad9097666 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes: - 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended - 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`. +- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany. ### Security diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 393ab96ad..90c8690b7 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ipfs/boxo/bitswap/client" - "github.com/ipfs/boxo/bitswap/internal/defaults" "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/server" @@ -45,9 +44,8 @@ type bitswap interface { } var ( - _ exchange.SessionExchange = (*Bitswap)(nil) - _ bitswap = (*Bitswap)(nil) - HasBlockBufferSize = defaults.HasBlockBufferSize + _ exchange.SessionExchange = (*Bitswap)(nil) + _ bitswap = (*Bitswap)(nil) ) type Bitswap struct { @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.WithTracer(tracer)) } - if HasBlockBufferSize != defaults.HasBlockBufferSize { - serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize)) - } - ctx = metrics.CtxSubScope(ctx, "bitswap") bs.Server = server.New(ctx, net, bstore, serverOptions...) @@ -115,7 +109,6 @@ type Stat struct { MessagesReceived uint64 BlocksSent uint64 DataSent uint64 - ProvideBufLen int } func (bs *Bitswap) Stat() (*Stat, error) { @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) { Peers: ss.Peers, BlocksSent: ss.BlocksSent, DataSent: ss.DataSent, - ProvideBufLen: ss.ProvideBufLen, }, nil } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 505871d6e..7d2b1f924 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)} + bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index 0baede658..a3174d0a4 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk if err != nil { t.Fatal(err) } + err = inst.Adapter.Provide(ctx, blk.Cid()) + if err != nil { + t.Fatal(err) + } } func TestBasicSessions(t *testing.T) { diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index f5511cc7a..a06f5c8b9 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -20,11 +20,6 @@ const ( BitswapMaxOutstandingBytesPerPeer = 1 << 20 // the number of bytes we attempt to make each outgoing bitswap message BitswapEngineTargetMessageSize = 16 * 1024 - // HasBlockBufferSize is the buffer size of the channel for new blocks - // that need to be provided. They should get pulled over by the - // provideCollector even before they are actually provided. - // TODO: Does this need to be this large givent that? - HasBlockBufferSize = 256 // Maximum size of the wantlist we are willing to keep in memory. MaxQueuedWantlistEntiresPerPeer = 1024 diff --git a/bitswap/options.go b/bitswap/options.go index da759dfe2..9bea0b637 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option { return Option{server.TaskWorkerCount(count)} } -func ProvideEnabled(enabled bool) Option { - return Option{server.ProvideEnabled(enabled)} -} - func SetSendDontHaves(send bool) Option { return Option{server.SetSendDontHaves(send)} } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 7feffd093..913fb83fb 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -21,20 +21,15 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" ) -var provideKeysBufferSize = 2048 - var ( log = logging.Logger("bitswap-server") sflog = log.Desugar() ) -const provideWorkerMax = 6 - type Option func(*Server) type Server struct { @@ -59,20 +54,8 @@ type Server struct { process process.Process - // newBlocks is a channel for newly added blocks to be provided to the - // network. blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan cid.Cid - // Extra options to pass to the decision manager engineOptions []decision.Option - - // the size of channel buffer to use - hasBlockBufferSize int - // whether or not to make provide announcements - provideEnabled bool } func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: bmetrics.SentHist(ctx), - sendTimeHistogram: bmetrics.SendTimeHist(ctx), - taskWorkerCount: defaults.BitswapTaskWorkerCount, - network: network, - process: px, - provideEnabled: true, - hasBlockBufferSize: defaults.HasBlockBufferSize, - provideKeys: make(chan cid.Cid, provideKeysBufferSize), + sentHistogram: bmetrics.SentHist(ctx), + sendTimeHistogram: bmetrics.SendTimeHist(ctx), + taskWorkerCount: defaults.BitswapTaskWorkerCount, + network: network, + process: px, } - s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize) for _, o := range options { o(s) @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option { } } -// ProvideEnabled is an option for enabling/disabling provide announcements -func ProvideEnabled(enabled bool) Option { - return func(bs *Server) { - bs.provideEnabled = enabled - } -} - func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option { o := decision.WithPeerBlockRequestFilter(pbrf) return func(bs *Server) { @@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option { } } -// HasBlockBufferSize configure how big the new blocks buffer should be. -func HasBlockBufferSize(count int) Option { - if count < 0 { - panic("cannot have negative buffer size") - } - return func(bs *Server) { - bs.hasBlockBufferSize = count - } -} - // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { @@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) { bs.taskWorker(ctx, i) }) } - - if bs.provideEnabled { - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) - } } func (bs *Server) taskWorker(ctx context.Context, id int) { @@ -382,10 +332,9 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) { } type Stat struct { - Peers []string - ProvideBufLen int - BlocksSent uint64 - DataSent uint64 + Peers []string + BlocksSent uint64 + DataSent uint64 } // Stat returns aggregated statistics about bitswap operations @@ -393,7 +342,6 @@ func (bs *Server) Stat() (Stat, error) { bs.counterLk.Lock() s := bs.counters bs.counterLk.Unlock() - s.ProvideBufLen = len(bs.newBlocks) peers := bs.engine.Peers() peersStr := make([]string, len(peers)) @@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Send wanted blocks to decision engine bs.engine.NotifyNewBlocks(blks) - // If the reprovider is enabled, send block to reprovider - if bs.provideEnabled { - for _, blk := range blks { - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } - } - } - return nil } -func (bs *Server) provideCollector(ctx context.Context) { - defer close(bs.provideKeys) - var toProvide []cid.Cid - var nextKey cid.Cid - var keysOut chan cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - -func (bs *Server) provideWorker(px process.Process) { - // FIXME: OnClosingContext returns a _custom_ context type. - // Unfortunately, deriving a new cancelable context from this custom - // type fires off a goroutine. To work around this, we create a single - // cancelable context up-front and derive all sub-contexts from that. - // - // See: https://github.com/ipfs/go-ipfs/issues/5810 - ctx := procctx.OnClosingContext(px) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - limit := make(chan struct{}, provideWorkerMax) - - limitedGoProvide := func(k cid.Cid, wid int) { - defer func() { - // replace token when done - <-limit - }() - - log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k) - defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k) - - ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx - defer cancel() - - if err := bs.network.Provide(ctx, k); err != nil { - log.Warn(err) - } - } - - // worker spawner, reads from bs.provideKeys until it closes, spawning a - // _ratelimited_ number of workers to handle each key. - for wid := 2; ; wid++ { - log.Debug("Bitswap.ProvideWorker.Loop") - - select { - case <-px.Closing(): - return - case k, ok := <-bs.provideKeys: - if !ok { - log.Debug("provideKeys channel closed") - return - } - select { - case <-px.Closing(): - return - case limit <- struct{}{}: - go limitedGoProvide(k, wid) - } - } - } -} - func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered.