Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP retrieval proposal #747

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package bitswap implements the IPFS exchange interface with the BitSwap
// Package client implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package client

Expand Down Expand Up @@ -191,7 +191,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pm := bspm.New(ctx, peerQueueFactory)

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
Expand Down Expand Up @@ -232,7 +232,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
return bsspm.New(id, network)
}
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
Expand Down
5 changes: 1 addition & 4 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,17 @@ type PeerManager struct {
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

self peer.ID
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
createPeerQueue: createPeerQueue,
ctx: ctx,
self: self,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),
Expand Down
14 changes: 14 additions & 0 deletions bitswap/network/bsnet/bsnet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bsnet

import "github.com/ipfs/boxo/bitswap/network/bsnet/internal"

var (
// ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol
ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers
// ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol
ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero
// ProtocolBitswapOneOne is the prefix for version 1.1.0
ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne
// ProtocolBitswap is the current version of the bitswap protocol: 1.2.0
ProtocolBitswap = internal.ProtocolBitswap
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import (
"context"
Expand All @@ -9,10 +9,10 @@
"time"

bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network/internal"
iface "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -23,7 +23,7 @@
"github.com/multiformats/go-multistream"
)

var log = logging.Logger("bitswap/network")
var log = logging.Logger("bitswap/bsnet")

var (
maxSendTimeout = 2 * time.Minute
Expand All @@ -33,7 +33,7 @@
)

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
s := processSettings(opts...)

bitswapNetwork := impl{
Expand Down Expand Up @@ -66,10 +66,10 @@
type impl struct {
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
// alignment.
stats Stats
stats iface.Stats

host host.Host
connectEvtMgr *connectEventManager
connectEvtMgr *iface.ConnectEventManager

protocolBitswapNoVers protocol.ID
protocolBitswapOneZero protocol.ID
Expand All @@ -79,7 +79,7 @@
supportedProtocols []protocol.ID

// inbound messages from the network are forwarded to the receiver
receivers []Receiver
receivers []iface.Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
Expand Down Expand Up @@ -107,9 +107,10 @@

type streamMessageSender struct {
to peer.ID
stream atomicInterface[network.Stream]

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-check / All

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-check / All

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

other declaration of stream
stream network.Stream

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-check / All

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-check / All

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

stream redeclared
bsnet *impl
opts *MessageSenderOpts
opts *iface.MessageSenderOpts
}

type HasContext interface {
Expand Down Expand Up @@ -317,7 +318,7 @@
return nil
}

func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) {
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) {
opts = setDefaultOpts(opts)

sender := &streamMessageSender{
Expand All @@ -337,7 +338,7 @@
return sender, nil
}

func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts {
copy := *opts
if opts.MaxRetries == 0 {
copy.MaxRetries = 3
Expand Down Expand Up @@ -385,14 +386,14 @@
return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...)
}

func (bsnet *impl) Start(r ...Receiver) {
func (bsnet *impl) Start(r ...iface.Receiver) {
bsnet.receivers = r
{
connectionListeners := make([]ConnectionListener, len(r))
connectionListeners := make([]iface.ConnectionListener, len(r))
for i, v := range r {
connectionListeners[i] = v
}
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
Expand Down Expand Up @@ -451,12 +452,36 @@
}
}

func (bsnet *impl) ConnectionManager() connmgr.ConnManager {
return bsnet.host.ConnManager()
func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().TagPeer(p, tag, w)
}

func (bsnet *impl) UntagPeer(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().UntagPeer(p, tag)
}

func (bsnet *impl) Protect(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().Protect(p, tag)
}

func (bsnet *impl) Unprotect(p peer.ID, tag string) bool {
if bsnet.host == nil {
return false
}
return bsnet.host.ConnManager().Unprotect(p, tag)
}

func (bsnet *impl) Stats() Stats {
return Stats{
func (bsnet *impl) Stats() iface.Stats {
return iface.Stats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network_test
package bsnet_test

import (
"context"
Expand All @@ -11,7 +11,7 @@ import (
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/internal"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/go-test/random"
tnet "github.com/libp2p/go-libp2p-testing/net"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import "github.com/libp2p/go-libp2p/core/protocol"

Expand Down
29 changes: 16 additions & 13 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"sync"

"github.com/gammazero/deque"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("bitswap/connevtman")

type ConnectionListener interface {
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
Expand All @@ -20,7 +23,7 @@ const (
stateUnresponsive
)

type connectEventManager struct {
type ConnectEventManager struct {
connListeners []ConnectionListener
lk sync.RWMutex
cond sync.Cond
Expand All @@ -36,8 +39,8 @@ type peerState struct {
pending bool
}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
func NewConnectEventManager(connListeners ...ConnectionListener) *ConnectEventManager {
evtManager := &ConnectEventManager{
connListeners: connListeners,
peers: make(map[peer.ID]*peerState),
done: make(chan struct{}),
Expand All @@ -46,11 +49,11 @@ func newConnectEventManager(connListeners ...ConnectionListener) *connectEventMa
return evtManager
}

func (c *connectEventManager) Start() {
func (c *ConnectEventManager) Start() {
go c.worker()
}

func (c *connectEventManager) Stop() {
func (c *ConnectEventManager) Stop() {
c.lk.Lock()
c.stop = true
c.lk.Unlock()
Expand All @@ -59,15 +62,15 @@ func (c *connectEventManager) Stop() {
<-c.done
}

func (c *connectEventManager) getState(p peer.ID) state {
func (c *ConnectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
func (c *ConnectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -83,14 +86,14 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
func (c *ConnectEventManager) waitChange() bool {
for !c.stop && c.changeQueue.Len() == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) worker() {
func (c *ConnectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)
Expand Down Expand Up @@ -145,7 +148,7 @@ func (c *connectEventManager) worker() {
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
func (c *ConnectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -158,7 +161,7 @@ func (c *connectEventManager) Connected(p peer.ID) {
}

// Called when we drop the final connection to a peer.
func (c *connectEventManager) Disconnected(p peer.ID) {
func (c *ConnectEventManager) Disconnected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -172,7 +175,7 @@ func (c *connectEventManager) Disconnected(p peer.ID) {
}

// Called whenever a peer is unresponsive.
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
func (c *ConnectEventManager) MarkUnresponsive(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -191,7 +194,7 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
//
// the "on message" event, so we can't treat this as evidence of a connection.
func (c *connectEventManager) OnMessage(p peer.ID) {
func (c *ConnectEventManager) OnMessage(p peer.ID) {
c.lk.RLock()
unresponsive := c.getState(p) == stateUnresponsive
c.lk.RUnlock()
Expand Down
Loading
Loading