Skip to content

Commit

Permalink
bitswap/server: remove provide
Browse files Browse the repository at this point in the history
We always had a very weird relationship between bitswap and providing.
Bitswap took care of doing the initial provide and then reprovider did it later.
The Bitswap server had a complicated providing workflow where it slurped thing into memory.

Reprovide accepts provides and is able to queue them in a database, such as on disk, this is much better.

I'll add options to hook initial provide logic from the blockservice to the reprovider queue so consumers don't have to do this themselves.
  • Loading branch information
Jorropo committed Feb 14, 2024
1 parent bb8be38 commit e0d5341
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 178 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes:

- 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended
- 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`.
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

### Security

Expand Down
12 changes: 2 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand Down Expand Up @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024
Expand Down
4 changes: 0 additions & 4 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}

func ProvideEnabled(enabled bool) Option {
return Option{server.ProvideEnabled(enabled)}
}

func SetSendDontHaves(send bool) Option {
return Option{server.SetSendDontHaves(send)}
}
Expand Down
166 changes: 8 additions & 158 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)

var provideKeysBufferSize = 2048

var (
log = logging.Logger("bitswap-server")
sflog = log.Desugar()
)

const provideWorkerMax = 6

type Option func(*Server)

type Server struct {
Expand All @@ -59,20 +54,8 @@ type Server struct {

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

// Extra options to pass to the decision manager
engineOptions []decision.Option

// the size of channel buffer to use
hasBlockBufferSize int
// whether or not to make provide announcements
provideEnabled bool
}

func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
Expand All @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}()

s := &Server{
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
provideEnabled: true,
hasBlockBufferSize: defaults.HasBlockBufferSize,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
}
s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)

for _, o := range options {
o(s)
Expand Down Expand Up @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
}
}

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Server) {
bs.provideEnabled = enabled
}
}

func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
o := decision.WithPeerBlockRequestFilter(pbrf)
return func(bs *Server) {
Expand Down Expand Up @@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option {
}
}

// HasBlockBufferSize configure how big the new blocks buffer should be.
func HasBlockBufferSize(count int) Option {
if count < 0 {
panic("cannot have negative buffer size")
}
return func(bs *Server) {
bs.hasBlockBufferSize = count
}
}

// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
Expand All @@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
bs.taskWorker(ctx, i)
})
}

if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}

func (bs *Server) taskWorker(ctx context.Context, id int) {
Expand Down Expand Up @@ -382,18 +332,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
}

type Stat struct {
Peers []string
ProvideBufLen int
BlocksSent uint64
DataSent uint64
Peers []string
BlocksSent uint64
DataSent uint64
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Server) Stat() (Stat, error) {
bs.counterLk.Lock()
s := bs.counters
bs.counterLk.Unlock()
s.ProvideBufLen = len(bs.newBlocks)

peers := bs.engine.Peers()
peersStr := make([]string, len(peers))
Expand All @@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

return nil
}

func (bs *Server) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []cid.Cid
var nextKey cid.Cid
var keysOut chan cid.Cid

for {
select {
case blkey, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}

if keysOut == nil {
nextKey = blkey
keysOut = bs.provideKeys
} else {
toProvide = append(toProvide, blkey)
}
case keysOut <- nextKey:
if len(toProvide) > 0 {
nextKey = toProvide[0]
toProvide = toProvide[1:]
} else {
keysOut = nil
}
case <-ctx.Done():
return
}
}
}

func (bs *Server) provideWorker(px process.Process) {
// FIXME: OnClosingContext returns a _custom_ context type.
// Unfortunately, deriving a new cancelable context from this custom
// type fires off a goroutine. To work around this, we create a single
// cancelable context up-front and derive all sub-contexts from that.
//
// See: https://github.com/ipfs/go-ipfs/issues/5810
ctx := procctx.OnClosingContext(px)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

limit := make(chan struct{}, provideWorkerMax)

limitedGoProvide := func(k cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
}()

log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}
}

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
for wid := 2; ; wid++ {
log.Debug("Bitswap.ProvideWorker.Loop")

select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
select {
case <-px.Closing():
return
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
}
}
}

func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
Expand Down

0 comments on commit e0d5341

Please sign in to comment.