Skip to content

Commit

Permalink
blockservice: add NewSessionContext and EmbedSessionInContext
Browse files Browse the repository at this point in the history
This also include cleanup for session code.
  • Loading branch information
Jorropo committed Jan 12, 2024
1 parent 76d9292 commit 6c590b8
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 109 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

- `blockservice` now have `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context, future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will then use the session in the context.

### Changed

### Removed
Expand Down
237 changes: 128 additions & 109 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,29 +144,19 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
allowlist := verifcid.Allowlist(verifcid.DefaultAllowlist)
ses := grabSessionFromContext(ctx, bs)
if ses != nil {
return ses
}

var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist
if bbs, ok := bs.(BoundedBlockService); ok {
allowlist = bbs.Allowlist()
}
exch := bs.Exchange()
if sessEx, ok := exch.(exchange.SessionExchange); ok {
return &Session{
allowlist: allowlist,
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
notifier: exch,
}
}
return &Session{
allowlist: allowlist,
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
notifier: exch,
}

return &Session{bs: bs, allowlist: allowlist, sesctx: ctx}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
Expand Down Expand Up @@ -248,75 +238,80 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, s.allowlist, f)
return getBlock(ctx, c, s, s.allowlist, s.getExchangeFetcher)
}

func (s *blockService) getExchange() notifiableFetcher {
// Look at what I have to do, no interface covariance :'(
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(allowlist, c) // hash security
if err != nil {
return nil, err
}

block, err := bs.Get(ctx, c)
if err == nil {
blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
switch {
case err == nil:
return block, nil
case ipld.IsNotFound(err):
break
default:
return nil, err

Check warning on line 271 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}

if ipld.IsNotFound(err) && fget != nil {
f := fget() // Don't load the exchange until we have to
fetch := fetchFactory() // lazily create session if needed
if fetch == nil {
logger.Debug("BlockService GetBlock: Not found")
return nil, err
}

// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
logger.Debug("BlockService: Searching")
blk, err := f.GetBlock(ctx, c)
if err != nil {
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = bs.Put(ctx, blk)
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
logger.Debug("BlockService: Searching")
blk, err := fetch.GetBlock(ctx, c)
if err != nil {
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = blockstore.Put(ctx, blk)
if err != nil {
return nil, err
}

Check warning on line 289 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L288-L289

Added lines #L288 - L289 were not covered by tests
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}

logger.Debug("BlockService GetBlock: Not found")
return nil, err
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}

// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, s.allowlist, f)
return getBlocks(ctx, ks, s, s.allowlist, s.getExchangeFetcher)
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -344,6 +339,8 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
ks = ks2
}

bs := blockservice.Blockstore()

var misses []cid.Cid
for _, c := range ks {
hit, err := bs.Get(ctx, c)
Expand All @@ -358,17 +355,18 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
}
}

if len(misses) == 0 || fget == nil {
fetch := fetchFactory() // don't load exchange unless we have to
if len(misses) == 0 || fetch == nil {
return
}

f := fget() // don't load exchange unless we have to
rblocks, err := f.GetBlocks(ctx, misses)
rblocks, err := fetch.GetBlocks(ctx, misses)
if err != nil {
logger.Debugf("Error with GetBlocks: %s", err)
return
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
Expand All @@ -389,14 +387,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
return
}

// inform the exchange that the blocks are available
cache[0] = b
err = f.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
if ex != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}

Check warning on line 397 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L395-L397

Added lines #L395 - L397 were not covered by tests
cache[0] = nil // early gc
}
cache[0] = nil // early gc

select {
case out <- b:
Expand Down Expand Up @@ -428,70 +428,89 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

type notifier interface {
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
allowlist verifcid.Allowlist
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
notifier notifier
lk sync.Mutex
createSession sync.Once
bs BlockService
ses exchange.Fetcher
sesctx context.Context
allowlist verifcid.Allowlist
}

type notifiableFetcher interface {
exchange.Fetcher
notifier
}
// grabSession is used to lazily create sessions.
func (s *Session) grabSession() exchange.Fetcher {
s.createSession.Do(func() {
defer func() {
s.sesctx = nil // early gc
}()

type notifiableFetcherWrapper struct {
exchange.Fetcher
notifier
}

func (s *Session) getSession() notifiableFetcher {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

return notifiableFetcherWrapper{s.ses, s.notifier}
}
ex := s.bs.Exchange()
if ex == nil {
return
}
s.ses = ex // always fallback to non session fetches

func (s *Session) getExchange() notifiableFetcher {
return notifiableFetcherWrapper{s.ses, s.notifier}
}
sesEx, ok := ex.(exchange.SessionExchange)
if !ok {
return
}
s.ses = sesEx.NewSession(s.sesctx)
})

func (s *Session) getFetcherFactory() func() notifiableFetcher {
if s.sessEx != nil {
return s.getSession
}
if s.ses != nil {
// Our exchange isn't session compatible, let's fallback to non sessions fetches
return s.getExchange
}
return nil
return s.ses
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory())
return getBlock(ctx, c, s.bs, s.allowlist, s.grabSession)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory())
return getBlocks(ctx, ks, s.bs, s.allowlist, s.grabSession)
}

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, NewSession(ctx, bs))
}

// EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
if s == nil {
return nil
}

ss, ok := s.(*Session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

Check warning on line 513 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L511-L513

Added lines #L511 - L513 were not covered by tests

return ss
}
Loading

0 comments on commit 6c590b8

Please sign in to comment.