From 0f0d4eaf32b8a6dadfa43f10587f45c0f15b5351 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Nov 2022 23:02:24 -0500 Subject: [PATCH] feat(sessions): option for dynamically manageable requests --- client/internal/getter/getter.go | 109 +++++++++++++++++- .../internal/notifications/notifications.go | 93 ++++++++++++++- client/internal/session/session.go | 25 ++++ client/sessioniface/fetcher.go | 18 +++ 4 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 client/sessioniface/fetcher.go diff --git a/client/internal/getter/getter.go b/client/internal/getter/getter.go index 5a58e187..36c02968 100644 --- a/client/internal/getter/getter.go +++ b/client/internal/getter/getter.go @@ -3,10 +3,12 @@ package getter import ( "context" "errors" - + "fmt" "github.com/ipfs/go-bitswap/client/internal" notifications "github.com/ipfs/go-bitswap/client/internal/notifications" + "github.com/ipfs/go-bitswap/client/sessioniface" logging "github.com/ipfs/go-log" + "sync" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -95,6 +97,111 @@ func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid return out, nil } +// AsyncGetBlocksCh take a set of block cids, a pubsub channel for incoming +// blocks, a want function, and a close function, and returns a channel of +// incoming blocks. +func AsyncGetBlocksCh(ctx context.Context, sessctx context.Context, keys <-chan sessioniface.AddRemoveCid, notif notifications.PubSub, + want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) { + ctx, span := internal.StartSpan(ctx, "Getter.AsyncGetBlocks") + defer span.End() + + type ia interface { + notifications.PubSub + SubscribeModifier(ctx context.Context, addRmKeysCh <-chan notifications.AddRemoveCid, initialKeys ...cid.Cid) <-chan blocks.Block + } + n, ok := notif.(ia) + if !ok { + return nil, fmt.Errorf("wrong notifier type") + } + + keyCh := make(chan notifications.AddRemoveCid) + blockCh := n.SubscribeModifier(ctx, keyCh) + out := make(chan blocks.Block) + + ctx, cancel := context.WithCancel(ctx) + remaining := cid.NewSet() + requested := cid.NewSet() + remLk := sync.Mutex{} + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-sessctx.Done(): + return + case k, ok := <-keys: + if !ok { + return + } + if k.IsAdd() { + remLk.Lock() + remaining.Add(k.Key()) + requested.Add(k.Key()) + remLk.Unlock() + select { + case <-ctx.Done(): + return + case <-sessctx.Done(): + return + case keyCh <- k: + } + log.Debugw("Bitswap.GetBlockRequest.Start", "cid", k) + want(ctx, []cid.Cid{k.Key()}) + } else { + remLk.Lock() + remaining.Remove(k.Key()) + remLk.Unlock() + cwants([]cid.Cid{k.Key()}) + select { + case <-ctx.Done(): + return + case <-sessctx.Done(): + return + case keyCh <- k: + } + } + } + } + }() + go func() { + // Clean up before exiting this function, and call the cancel function on + // any remaining keys + defer func() { + cancel() + close(out) + // can't just defer this call on its own, arguments are resolved *when* the defer is created + remLk.Lock() + cwants(remaining.Keys()) + remLk.Unlock() + }() + + for { + select { + case <-ctx.Done(): + return + case <-sessctx.Done(): + return + case blk, ok := <-blockCh: + if !ok { + return + } + remLk.Lock() + remaining.Remove(blk.Cid()) + remLk.Unlock() + select { + case out <- blk: + case <-ctx.Done(): + return + case <-sessctx.Done(): + return + } + } + } + }() + return out, nil +} + // Listens for incoming blocks, passing them to the out channel. // If the context is cancelled or the incoming channel closes, calls cfun with // any keys corresponding to blocks that were never received. diff --git a/client/internal/notifications/notifications.go b/client/internal/notifications/notifications.go index ed4b79f5..f9e52a2d 100644 --- a/client/internal/notifications/notifications.go +++ b/client/internal/notifications/notifications.go @@ -2,11 +2,10 @@ package notifications import ( "context" - "sync" - pubsub "github.com/cskr/pubsub" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + "sync" ) const bufferSize = 16 @@ -130,6 +129,96 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl return blocksCh } +type AddRemoveCid interface { + IsAdd() bool + Key() cid.Cid +} + +// SubscribeModifier returns a channel of blocks for the given |keys|. |blockChannel| +// is closed if the |ctx| times out or is cancelled, or after receiving the blocks +// corresponding to |keys|. +func (ps *impl) SubscribeModifier(ctx context.Context, addRmKeysCh <-chan AddRemoveCid, initialKeys ...cid.Cid) <-chan blocks.Block { + blocksCh := make(chan blocks.Block) + valuesCh := make(chan interface{}) + + // prevent shutdown + ps.lk.RLock() + defer ps.lk.RUnlock() + + select { + case <-ps.closed: + close(blocksCh) + return blocksCh + default: + } + + ctx, cancel := context.WithCancel(ctx) + + // AddSubOnceEach listens for each key in the list, and closes the channel + // once all keys have been received + ps.wrapped.AddSubOnceEach(valuesCh, append([]string{"waitUntilEnd"}, toStrings(initialKeys)...)...) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ps.closed: + case chg, ok := <-addRmKeysCh: + if !ok { + return + } + if chg.IsAdd() { + ps.wrapped.AddSubOnceEach(valuesCh, chg.Key().KeyString()) + } else { + ps.wrapped.Unsub(valuesCh, chg.Key().KeyString()) + } + } + } + }() + go func() { + defer func() { + close(blocksCh) + cancel() + + ps.lk.RLock() + defer ps.lk.RUnlock() + // Don't touch the pubsub instance if we're + // already closed. + select { + case <-ps.closed: + return + default: + } + + ps.wrapped.Unsub(valuesCh) + }() + + for { + select { + case <-ctx.Done(): + return + case <-ps.closed: + case val, ok := <-valuesCh: + if !ok { + return + } + block, ok := val.(blocks.Block) + if !ok { + return + } + select { + case <-ctx.Done(): + return + case blocksCh <- block: // continue + case <-ps.closed: + } + } + } + }() + + return blocksCh +} + func toStrings(keys []cid.Cid) []string { strs := make([]string, 0, len(keys)) for _, key := range keys { diff --git a/client/internal/session/session.go b/client/internal/session/session.go index 51e787e2..c5fd5cb4 100644 --- a/client/internal/session/session.go +++ b/client/internal/session/session.go @@ -2,6 +2,7 @@ package session import ( "context" + "github.com/ipfs/go-bitswap/client/sessioniface" "time" "github.com/ipfs/go-bitswap/client/internal" @@ -256,6 +257,30 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. ) } +// GetBlocksCh fetches a set of blocks within the context of this session and +// returns a channel that found blocks will be returned on. No order is +// guaranteed on the returned blocks. +func (s *Session) GetBlocksCh(ctx context.Context, keys <-chan sessioniface.AddRemoveCid) (<-chan blocks.Block, error) { + ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") + defer span.End() + + return bsgetter.AsyncGetBlocksCh(ctx, s.ctx, keys, s.notif, + func(ctx context.Context, keys []cid.Cid) { + select { + case s.incoming <- op{op: opWant, keys: keys}: + case <-ctx.Done(): + case <-s.ctx.Done(): + } + }, + func(keys []cid.Cid) { + select { + case s.incoming <- op{op: opCancel, keys: keys}: + case <-s.ctx.Done(): + } + }, + ) +} + // SetBaseTickDelay changes the rate at which ticks happen. func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { select { diff --git a/client/sessioniface/fetcher.go b/client/sessioniface/fetcher.go new file mode 100644 index 00000000..d9ce3198 --- /dev/null +++ b/client/sessioniface/fetcher.go @@ -0,0 +1,18 @@ +package sessioniface + +import ( + "context" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + exchange "github.com/ipfs/go-ipfs-exchange-interface" +) + +type AddRemoveCid interface { + IsAdd() bool + Key() cid.Cid +} + +type ChannelFetcher interface { + exchange.Fetcher + GetBlocksCh(ctx context.Context, keys <-chan AddRemoveCid) (<-chan blocks.Block, error) +}