Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

[WIP] Experiment: Option for dynamically manageable requests #593

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion client/internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package getter
import (
"context"
"errors"

"fmt"
"github.com/ipfs/go-bitswap/client/internal"
notifications "github.com/ipfs/go-bitswap/client/internal/notifications"
"github.com/ipfs/go-bitswap/client/sessioniface"
logging "github.com/ipfs/go-log"
"sync"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -95,6 +97,111 @@ func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid
return out, nil
}

// AsyncGetBlocksCh take a set of block cids, a pubsub channel for incoming
// blocks, a want function, and a close function, and returns a channel of
// incoming blocks.
func AsyncGetBlocksCh(ctx context.Context, sessctx context.Context, keys <-chan sessioniface.AddRemoveCid, notif notifications.PubSub,
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Getter.AsyncGetBlocks")
defer span.End()

type ia interface {
notifications.PubSub
SubscribeModifier(ctx context.Context, addRmKeysCh <-chan notifications.AddRemoveCid, initialKeys ...cid.Cid) <-chan blocks.Block
}
n, ok := notif.(ia)
if !ok {
return nil, fmt.Errorf("wrong notifier type")
}

keyCh := make(chan notifications.AddRemoveCid)
blockCh := n.SubscribeModifier(ctx, keyCh)
out := make(chan blocks.Block)

ctx, cancel := context.WithCancel(ctx)
remaining := cid.NewSet()
requested := cid.NewSet()
remLk := sync.Mutex{}

go func() {
for {
select {
case <-ctx.Done():
return
case <-sessctx.Done():
return
case k, ok := <-keys:
if !ok {
return
}
if k.IsAdd() {
remLk.Lock()
remaining.Add(k.Key())
requested.Add(k.Key())
remLk.Unlock()
select {
case <-ctx.Done():
return
case <-sessctx.Done():
return
case keyCh <- k:
}
log.Debugw("Bitswap.GetBlockRequest.Start", "cid", k)
want(ctx, []cid.Cid{k.Key()})
} else {
remLk.Lock()
remaining.Remove(k.Key())
remLk.Unlock()
cwants([]cid.Cid{k.Key()})
select {
case <-ctx.Done():
return
case <-sessctx.Done():
return
case keyCh <- k:
}
}
}
}
}()
go func() {
// Clean up before exiting this function, and call the cancel function on
// any remaining keys
defer func() {
cancel()
close(out)
// can't just defer this call on its own, arguments are resolved *when* the defer is created
remLk.Lock()
cwants(remaining.Keys())
remLk.Unlock()
}()

for {
select {
case <-ctx.Done():
return
case <-sessctx.Done():
return
case blk, ok := <-blockCh:
if !ok {
return
}
remLk.Lock()
remaining.Remove(blk.Cid())
remLk.Unlock()
select {
case out <- blk:
case <-ctx.Done():
return
case <-sessctx.Done():
return
}
}
}
}()
return out, nil
}

// Listens for incoming blocks, passing them to the out channel.
// If the context is cancelled or the incoming channel closes, calls cfun with
// any keys corresponding to blocks that were never received.
Expand Down
93 changes: 91 additions & 2 deletions client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package notifications

import (
"context"
"sync"

pubsub "github.com/cskr/pubsub"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"sync"
)

const bufferSize = 16
Expand Down Expand Up @@ -130,6 +129,96 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
return blocksCh
}

type AddRemoveCid interface {
IsAdd() bool
Key() cid.Cid
}

// SubscribeModifier returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after receiving the blocks
// corresponding to |keys|.
func (ps *impl) SubscribeModifier(ctx context.Context, addRmKeysCh <-chan AddRemoveCid, initialKeys ...cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block)
valuesCh := make(chan interface{})

// prevent shutdown
ps.lk.RLock()
defer ps.lk.RUnlock()

select {
case <-ps.closed:
close(blocksCh)
return blocksCh
default:
}

ctx, cancel := context.WithCancel(ctx)

// AddSubOnceEach listens for each key in the list, and closes the channel
// once all keys have been received
ps.wrapped.AddSubOnceEach(valuesCh, append([]string{"waitUntilEnd"}, toStrings(initialKeys)...)...)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ps.closed:
case chg, ok := <-addRmKeysCh:
if !ok {
return
}
if chg.IsAdd() {
ps.wrapped.AddSubOnceEach(valuesCh, chg.Key().KeyString())
} else {
ps.wrapped.Unsub(valuesCh, chg.Key().KeyString())
}
}
}
}()
go func() {
defer func() {
close(blocksCh)
cancel()

ps.lk.RLock()
defer ps.lk.RUnlock()
// Don't touch the pubsub instance if we're
// already closed.
select {
case <-ps.closed:
return
default:
}

ps.wrapped.Unsub(valuesCh)
}()

for {
select {
case <-ctx.Done():
return
case <-ps.closed:
case val, ok := <-valuesCh:
if !ok {
return
}
block, ok := val.(blocks.Block)
if !ok {
return
}
select {
case <-ctx.Done():
return
case blocksCh <- block: // continue
case <-ps.closed:
}
}
}
}()

return blocksCh
}

func toStrings(keys []cid.Cid) []string {
strs := make([]string, 0, len(keys))
for _, key := range keys {
Expand Down
25 changes: 25 additions & 0 deletions client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package session

import (
"context"
"github.com/ipfs/go-bitswap/client/sessioniface"
"time"

"github.com/ipfs/go-bitswap/client/internal"
Expand Down Expand Up @@ -256,6 +257,30 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
)
}

// GetBlocksCh fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocksCh(ctx context.Context, keys <-chan sessioniface.AddRemoveCid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

return bsgetter.AsyncGetBlocksCh(ctx, s.ctx, keys, s.notif,
func(ctx context.Context, keys []cid.Cid) {
select {
case s.incoming <- op{op: opWant, keys: keys}:
case <-ctx.Done():
case <-s.ctx.Done():
}
},
func(keys []cid.Cid) {
select {
case s.incoming <- op{op: opCancel, keys: keys}:
case <-s.ctx.Done():
}
},
)
}

// SetBaseTickDelay changes the rate at which ticks happen.
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
select {
Expand Down
18 changes: 18 additions & 0 deletions client/sessioniface/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package sessioniface

import (
"context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

type AddRemoveCid interface {
IsAdd() bool
Key() cid.Cid
}

type ChannelFetcher interface {
exchange.Fetcher
GetBlocksCh(ctx context.Context, keys <-chan AddRemoveCid) (<-chan blocks.Block, error)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this would be more ergonomic to work against with separate add and remove channels of cid.Cid's directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a bad idea since if a CID is added and removed at a similar time they could come out of order which would result in a state the caller did not intend (e.g. a CID being requested they thought they'd removed, or a CID being removed they'd thought had been requested).

You could hide the calls behind a single object with multiple functions (option 2 here #593 (comment)) since mutexes could guard you. It does come with some complexity and extra locking though

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! it's a shame golang's generics don't make it easier to specify at type around AddRemoveCid and we're stuck with accessor functions

}