diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index e80a407aae..4794ded77b 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -96,6 +96,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc return bs } +func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + return multierr.Combine( + bs.Client.NotifyNewBlock(ctx, blk), + bs.Server.NotifyNewBlock(ctx, blk), + ) +} + func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error { return multierr.Combine( bs.Client.NotifyNewBlocks(ctx, blks...), diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 46e3a0ecc5..51645ed144 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -259,6 +259,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. return session.GetBlocks(ctx, keys) } +// NotifyNewBlock announces the existence of blocks to this bitswap service. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure // that those blocks are available in the blockstore before calling this function. diff --git a/bitswap/server/server.go b/bitswap/server/server.go index a3378d6c44..924bcee382 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -404,6 +404,17 @@ func (bs *Server) Stat() (Stat, error) { return s, nil } +// NotifyNewBlock announces the existence of block to this bitswap service. The +// service will potentially notify its peers. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. The // service will potentially notify its peers. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 773fb53037..3691640dfb 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -157,8 +157,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Debugf("BlockService.BlockAdded %s", c) if s.exchange != nil { - if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil { - logger.Errorf("NotifyNewBlocks: %s", err.Error()) + if err := s.exchange.NotifyNewBlock(ctx, o); err != nil { + logger.Errorf("NotifyNewBlock: %s", err.Error()) } } @@ -254,7 +254,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun if err != nil { return nil, err } - err = f.NotifyNewBlocks(ctx, blk) + err = f.NotifyNewBlock(ctx, blk) if err != nil { return nil, err } @@ -334,7 +334,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget return } - var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block select { @@ -355,13 +354,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget } // inform the exchange that the blocks are available - cache[0] = b - err = f.NotifyNewBlocks(ctx, cache[:]...) + err = f.NotifyNewBlock(ctx, b) if err != nil { logger.Errorf("could not tell the exchange about new blocks: %s", err) return } - cache[0] = nil // early gc select { case out <- b: @@ -391,6 +388,7 @@ func (s *blockService) Close() error { } type notifier interface { + NotifyNewBlock(context.Context, blocks.Block) error NotifyNewBlocks(context.Context, ...blocks.Block) error } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 14396c8a1e..4258a8c71d 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -195,6 +195,11 @@ type notifyCountingExchange struct { notifyCount int } +func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error { + n.notifyCount++ + return n.Interface.NotifyNewBlock(ctx, blocks) +} + func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { n.notifyCount += len(blocks) return n.Interface.NotifyNewBlocks(ctx, blocks...) diff --git a/exchange/interface.go b/exchange/interface.go index 3ae174d5c3..7d32960a5f 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,6 +13,8 @@ import ( type Interface interface { // type Exchanger interface Fetcher + // NotifyNewBlock tells the exchange that a new block is available and can be served. + NotifyNewBlock(ctx context.Context, blocks blocks.Block) error // NotifyNewBlocks tells the exchange that new blocks are available and can be served. NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index f3590893e3..1e8cdd2fe2 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block return blk, err } +// NotifyNewBlock tells the exchange that a new block is available and can be served. +func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error { + // as an offline exchange we have nothing to do + return nil +} + // NotifyNewBlocks tells the exchange that new blocks are available and can be served. func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { // as an offline exchange we have nothing to do