Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockservice: add NewSessionContext and EmbedSessionInContext #549

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context.

### Changed

### Removed
Expand Down
237 changes: 128 additions & 109 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,29 +144,19 @@
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
allowlist := verifcid.Allowlist(verifcid.DefaultAllowlist)
ses := grabSessionFromContext(ctx, bs)
if ses != nil {
return ses
}

var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist
if bbs, ok := bs.(BoundedBlockService); ok {
allowlist = bbs.Allowlist()
}
exch := bs.Exchange()
if sessEx, ok := exch.(exchange.SessionExchange); ok {
return &Session{
allowlist: allowlist,
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
notifier: exch,
}
}
return &Session{
allowlist: allowlist,
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
notifier: exch,
}

return &Session{bs: bs, allowlist: allowlist, sesctx: ctx}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
Expand Down Expand Up @@ -248,75 +238,80 @@
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, s.allowlist, f)
return getBlock(ctx, c, s, s.allowlist, s.getExchangeFetcher)
}

func (s *blockService) getExchange() notifiableFetcher {
// Look at what I have to do, no interface covariance :'(
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(allowlist, c) // hash security
if err != nil {
return nil, err
}

block, err := bs.Get(ctx, c)
if err == nil {
blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
switch {
case err == nil:
return block, nil
case ipld.IsNotFound(err):
break
default:
return nil, err

Check warning on line 271 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}

if ipld.IsNotFound(err) && fget != nil {
f := fget() // Don't load the exchange until we have to
fetch := fetchFactory() // lazily create session if needed
if fetch == nil {
logger.Debug("BlockService GetBlock: Not found")
return nil, err
}

// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
logger.Debug("BlockService: Searching")
blk, err := f.GetBlock(ctx, c)
if err != nil {
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = bs.Put(ctx, blk)
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
logger.Debug("BlockService: Searching")
blk, err := fetch.GetBlock(ctx, c)
if err != nil {
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = blockstore.Put(ctx, blk)
if err != nil {
return nil, err
}

Check warning on line 289 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L288-L289

Added lines #L288 - L289 were not covered by tests
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}

logger.Debug("BlockService GetBlock: Not found")
return nil, err
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}

// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, s.allowlist, f)
return getBlocks(ctx, ks, s, s.allowlist, s.getExchangeFetcher)
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -344,6 +339,8 @@
ks = ks2
}

bs := blockservice.Blockstore()

var misses []cid.Cid
for _, c := range ks {
hit, err := bs.Get(ctx, c)
Expand All @@ -358,17 +355,18 @@
}
}

if len(misses) == 0 || fget == nil {
fetch := fetchFactory() // don't load exchange unless we have to
if len(misses) == 0 || fetch == nil {
return
}

f := fget() // don't load exchange unless we have to
rblocks, err := f.GetBlocks(ctx, misses)
rblocks, err := fetch.GetBlocks(ctx, misses)
if err != nil {
logger.Debugf("Error with GetBlocks: %s", err)
return
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
Expand All @@ -389,14 +387,16 @@
return
}

// inform the exchange that the blocks are available
cache[0] = b
err = f.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
if ex != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}

Check warning on line 397 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L395-L397

Added lines #L395 - L397 were not covered by tests
cache[0] = nil // early gc
}
cache[0] = nil // early gc

select {
case out <- b:
Expand Down Expand Up @@ -428,70 +428,89 @@
return s.exchange.Close()
}

type notifier interface {
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
allowlist verifcid.Allowlist
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
notifier notifier
lk sync.Mutex
createSession sync.Once
bs BlockService
ses exchange.Fetcher
sesctx context.Context
allowlist verifcid.Allowlist
}

type notifiableFetcher interface {
exchange.Fetcher
notifier
}
// grabSession is used to lazily create sessions.
func (s *Session) grabSession() exchange.Fetcher {
s.createSession.Do(func() {
defer func() {
s.sesctx = nil // early gc
}()

type notifiableFetcherWrapper struct {
exchange.Fetcher
notifier
}

func (s *Session) getSession() notifiableFetcher {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

return notifiableFetcherWrapper{s.ses, s.notifier}
}
ex := s.bs.Exchange()
if ex == nil {
return
}
s.ses = ex // always fallback to non session fetches

func (s *Session) getExchange() notifiableFetcher {
return notifiableFetcherWrapper{s.ses, s.notifier}
}
sesEx, ok := ex.(exchange.SessionExchange)
if !ok {
return
}
s.ses = sesEx.NewSession(s.sesctx)
})

func (s *Session) getFetcherFactory() func() notifiableFetcher {
if s.sessEx != nil {
return s.getSession
}
if s.ses != nil {
// Our exchange isn't session compatible, let's fallback to non sessions fetches
return s.getExchange
}
return nil
return s.ses
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory())
return getBlock(ctx, c, s.bs, s.allowlist, s.grabSession)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory())
return getBlocks(ctx, ks, s.bs, s.allowlist, s.grabSession)
}

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, NewSession(ctx, bs))
}

// EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
if s == nil {
return nil
}

ss, ok := s.(*Session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

Check warning on line 513 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L511-L513

Added lines #L511 - L513 were not covered by tests

return ss
}
Loading
Loading