diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 2e83949cb..569564559 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -1,4 +1,4 @@ -// Package bitswap implements the IPFS exchange interface with the BitSwap +// Package client implements the IPFS exchange interface with the BitSwap // bilateral exchange protocol. package client @@ -191,7 +191,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr sim := bssim.New() bpm := bsbpm.New() - pm := bspm.New(ctx, peerQueueFactory, network.Self()) + pm := bspm.New(ctx, peerQueueFactory) if bs.providerFinder != nil && bs.defaultProviderQueryManager { // network can do dialing. @@ -232,7 +232,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { - return bsspm.New(id, network.ConnectionManager()) + return bsspm.New(id, network) } notif := notifications.New() sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index 4634ff164..4123aea3b 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -45,12 +45,10 @@ type PeerManager struct { psLk sync.RWMutex sessions map[uint64]Session peerSessions map[peer.ID]map[uint64]struct{} - - self peer.ID } // New creates a new PeerManager, given a context and a peerQueueFactory. -func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager { +func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge() return &PeerManager{ @@ -58,7 +56,6 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *P pwm: newPeerWantManager(wantGauge, wantBlockGauge), createPeerQueue: createPeerQueue, ctx: ctx, - self: self, sessions: make(map[uint64]Session), peerSessions: make(map[peer.ID]map[uint64]struct{}), diff --git a/bitswap/network/bsnet/bsnet.go b/bitswap/network/bsnet/bsnet.go new file mode 100644 index 000000000..546f2c0b7 --- /dev/null +++ b/bitswap/network/bsnet/bsnet.go @@ -0,0 +1,14 @@ +package bsnet + +import "github.com/ipfs/boxo/bitswap/network/bsnet/internal" + +var ( + // ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol + ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers + // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol + ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero + // ProtocolBitswapOneOne is the prefix for version 1.1.0 + ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne + // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 + ProtocolBitswap = internal.ProtocolBitswap +) diff --git a/bitswap/network/internal/default.go b/bitswap/network/bsnet/internal/default.go similarity index 100% rename from bitswap/network/internal/default.go rename to bitswap/network/bsnet/internal/default.go diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/bsnet/ipfs_impl.go similarity index 89% rename from bitswap/network/ipfs_impl.go rename to bitswap/network/bsnet/ipfs_impl.go index 4a60aaf6b..30678c92f 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/bsnet/ipfs_impl.go @@ -1,4 +1,4 @@ -package network +package bsnet import ( "context" @@ -9,10 +9,10 @@ import ( "time" bsmsg "github.com/ipfs/boxo/bitswap/message" - "github.com/ipfs/boxo/bitswap/network/internal" + iface "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/bitswap/network/bsnet/internal" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -23,7 +23,7 @@ import ( "github.com/multiformats/go-multistream" ) -var log = logging.Logger("bitswap/network") +var log = logging.Logger("bitswap/bsnet") var ( maxSendTimeout = 2 * time.Minute @@ -33,7 +33,7 @@ var ( ) // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork { s := processSettings(opts...) bitswapNetwork := impl{ @@ -66,10 +66,10 @@ func processSettings(opts ...NetOpt) Settings { type impl struct { // NOTE: Stats must be at the top of the heap allocation to ensure 64bit // alignment. - stats Stats + stats iface.Stats host host.Host - connectEvtMgr *connectEventManager + connectEvtMgr *iface.ConnectEventManager protocolBitswapNoVers protocol.ID protocolBitswapOneZero protocol.ID @@ -79,7 +79,7 @@ type impl struct { supportedProtocols []protocol.ID // inbound messages from the network are forwarded to the receiver - receivers []Receiver + receivers []iface.Receiver } // interfaceWrapper is concrete type that wraps an interface. Necessary because @@ -108,8 +108,9 @@ func (a *atomicInterface[T]) Store(v T) { type streamMessageSender struct { to peer.ID stream atomicInterface[network.Stream] + stream network.Stream bsnet *impl - opts *MessageSenderOpts + opts *iface.MessageSenderOpts } type HasContext interface { @@ -317,7 +318,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. return nil } -func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { +func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) { opts = setDefaultOpts(opts) sender := &streamMessageSender{ @@ -337,7 +338,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag return sender, nil } -func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts { +func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts { copy := *opts if opts.MaxRetries == 0 { copy.MaxRetries = 3 @@ -385,14 +386,14 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...) } -func (bsnet *impl) Start(r ...Receiver) { +func (bsnet *impl) Start(r ...iface.Receiver) { bsnet.receivers = r { - connectionListeners := make([]ConnectionListener, len(r)) + connectionListeners := make([]iface.ConnectionListener, len(r)) for i, v := range r { connectionListeners[i] = v } - bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...) + bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...) } for _, proto := range bsnet.supportedProtocols { bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream) @@ -451,12 +452,36 @@ func (bsnet *impl) handleNewStream(s network.Stream) { } } -func (bsnet *impl) ConnectionManager() connmgr.ConnManager { - return bsnet.host.ConnManager() +func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().TagPeer(p, tag, w) +} + +func (bsnet *impl) UntagPeer(p peer.ID, tag string) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().UntagPeer(p, tag) +} + +func (bsnet *impl) Protect(p peer.ID, tag string) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().Protect(p, tag) +} + +func (bsnet *impl) Unprotect(p peer.ID, tag string) bool { + if bsnet.host == nil { + return false + } + return bsnet.host.ConnManager().Unprotect(p, tag) } -func (bsnet *impl) Stats() Stats { - return Stats{ +func (bsnet *impl) Stats() iface.Stats { + return iface.Stats{ MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent), } diff --git a/bitswap/network/ipfs_impl_test.go b/bitswap/network/bsnet/ipfs_impl_test.go similarity index 99% rename from bitswap/network/ipfs_impl_test.go rename to bitswap/network/bsnet/ipfs_impl_test.go index bfba5709d..55bb06f38 100644 --- a/bitswap/network/ipfs_impl_test.go +++ b/bitswap/network/bsnet/ipfs_impl_test.go @@ -1,4 +1,4 @@ -package network_test +package bsnet_test import ( "context" @@ -11,7 +11,7 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" bsnet "github.com/ipfs/boxo/bitswap/network" - "github.com/ipfs/boxo/bitswap/network/internal" + "github.com/ipfs/boxo/bitswap/network/bsnet/internal" tn "github.com/ipfs/boxo/bitswap/testnet" "github.com/ipfs/go-test/random" tnet "github.com/libp2p/go-libp2p-testing/net" diff --git a/bitswap/network/ipfs_impl_timeout_test.go b/bitswap/network/bsnet/ipfs_impl_timeout_test.go similarity index 97% rename from bitswap/network/ipfs_impl_timeout_test.go rename to bitswap/network/bsnet/ipfs_impl_timeout_test.go index fdbe8e950..994804235 100644 --- a/bitswap/network/ipfs_impl_timeout_test.go +++ b/bitswap/network/bsnet/ipfs_impl_timeout_test.go @@ -1,4 +1,4 @@ -package network +package bsnet import ( "testing" diff --git a/bitswap/network/options.go b/bitswap/network/bsnet/options.go similarity index 96% rename from bitswap/network/options.go rename to bitswap/network/bsnet/options.go index 10d02e5e9..cbe355c0f 100644 --- a/bitswap/network/options.go +++ b/bitswap/network/bsnet/options.go @@ -1,4 +1,4 @@ -package network +package bsnet import "github.com/libp2p/go-libp2p/core/protocol" diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index bf3766089..440aa0489 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -4,9 +4,12 @@ import ( "sync" "github.com/gammazero/deque" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/peer" ) +var log = logging.Logger("bitswap/connevtman") + type ConnectionListener interface { PeerConnected(peer.ID) PeerDisconnected(peer.ID) @@ -20,7 +23,7 @@ const ( stateUnresponsive ) -type connectEventManager struct { +type ConnectEventManager struct { connListeners []ConnectionListener lk sync.RWMutex cond sync.Cond @@ -36,8 +39,8 @@ type peerState struct { pending bool } -func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { - evtManager := &connectEventManager{ +func NewConnectEventManager(connListeners ...ConnectionListener) *ConnectEventManager { + evtManager := &ConnectEventManager{ connListeners: connListeners, peers: make(map[peer.ID]*peerState), done: make(chan struct{}), @@ -46,11 +49,11 @@ func newConnectEventManager(connListeners ...ConnectionListener) *connectEventMa return evtManager } -func (c *connectEventManager) Start() { +func (c *ConnectEventManager) Start() { go c.worker() } -func (c *connectEventManager) Stop() { +func (c *ConnectEventManager) Stop() { c.lk.Lock() c.stop = true c.lk.Unlock() @@ -59,7 +62,7 @@ func (c *connectEventManager) Stop() { <-c.done } -func (c *connectEventManager) getState(p peer.ID) state { +func (c *ConnectEventManager) getState(p peer.ID) state { if state, ok := c.peers[p]; ok { return state.newState } else { @@ -67,7 +70,7 @@ func (c *connectEventManager) getState(p peer.ID) state { } } -func (c *connectEventManager) setState(p peer.ID, newState state) { +func (c *ConnectEventManager) setState(p peer.ID, newState state) { state, ok := c.peers[p] if !ok { state = new(peerState) @@ -83,14 +86,14 @@ func (c *connectEventManager) setState(p peer.ID, newState state) { // Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the // connect event manager has been stopped. -func (c *connectEventManager) waitChange() bool { +func (c *ConnectEventManager) waitChange() bool { for !c.stop && c.changeQueue.Len() == 0 { c.cond.Wait() } return !c.stop } -func (c *connectEventManager) worker() { +func (c *ConnectEventManager) worker() { c.lk.Lock() defer c.lk.Unlock() defer close(c.done) @@ -145,7 +148,7 @@ func (c *connectEventManager) worker() { } // Called whenever we receive a new connection. May be called many times. -func (c *connectEventManager) Connected(p peer.ID) { +func (c *ConnectEventManager) Connected(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -158,7 +161,7 @@ func (c *connectEventManager) Connected(p peer.ID) { } // Called when we drop the final connection to a peer. -func (c *connectEventManager) Disconnected(p peer.ID) { +func (c *ConnectEventManager) Disconnected(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -172,7 +175,7 @@ func (c *connectEventManager) Disconnected(p peer.ID) { } // Called whenever a peer is unresponsive. -func (c *connectEventManager) MarkUnresponsive(p peer.ID) { +func (c *ConnectEventManager) MarkUnresponsive(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -191,7 +194,7 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) { // - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process // // the "on message" event, so we can't treat this as evidence of a connection. -func (c *connectEventManager) OnMessage(p peer.ID) { +func (c *ConnectEventManager) OnMessage(p peer.ID) { c.lk.RLock() unresponsive := c.getState(p) == stateUnresponsive c.lk.RUnlock() diff --git a/bitswap/network/http_multiaddr.go b/bitswap/network/http_multiaddr.go new file mode 100644 index 000000000..226854bc7 --- /dev/null +++ b/bitswap/network/http_multiaddr.go @@ -0,0 +1,90 @@ +package network + +import ( + "fmt" + "net/url" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +// ExtractHTTPAddress extracts the HTTP schema+host+port from a multiaddress and returns a *url.URL. +func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, error) { + components := ma.Protocols() + var host, port, schema string + + for _, comp := range components { + switch comp.Name { + case "dns", "dns4", "dns6", "ip4", "ip6": + hostVal, err := ma.ValueForProtocol(comp.Code) + if err != nil { + return nil, fmt.Errorf("failed to extract host: %w", err) + } + host = hostVal + case "tcp", "udp": + portVal, err := ma.ValueForProtocol(comp.Code) + if err != nil { + return nil, fmt.Errorf("failed to extract port: %w", err) + } + port = portVal + case "http", "https": + schema = comp.Name + } + } + + if host == "" || port == "" || schema == "" { + return nil, fmt.Errorf("multiaddress is missing required components (host/port)") + } + + // Construct the URL object + address := fmt.Sprintf("%s://%s:%s", schema, host, port) + parsedURL, err := url.Parse(address) + if err != nil { + return nil, fmt.Errorf("failed to parse URL: %w", err) + } + + return parsedURL, nil +} + +// ExtractURLsFromPeer extracts all HTTP schema+host+port addresses as *url.URL from a peer.AddrInfo object. +func ExtractURLsFromPeer(info peer.AddrInfo) []*url.URL { + var addresses []*url.URL + + for _, addr := range info.Addrs { + httpAddress, err := ExtractHTTPAddress(addr) + if err != nil { + // Skip invalid or non-HTTP addresses but continue with others + continue + } + addresses = append(addresses, httpAddress) + } + + return addresses +} + +// SplitHTTPAddrs splits a peer.AddrInfo into two: one containing HTTP/HTTPS addresses, and the other containing the rest. +func SplitHTTPAddrs(pi peer.AddrInfo) (httpPeer peer.AddrInfo, otherPeer peer.AddrInfo) { + httpPeer.ID = pi.ID + otherPeer.ID = pi.ID + + for _, addr := range pi.Addrs { + if isHTTPAddress(addr) { + httpPeer.Addrs = append(httpPeer.Addrs, addr) + } else { + otherPeer.Addrs = append(otherPeer.Addrs, addr) + } + } + + return +} + +// isHTTPAddress checks if a multiaddress is an HTTP or HTTPS address. +func isHTTPAddress(ma multiaddr.Multiaddr) bool { + protocols := ma.Protocols() + for _, proto := range protocols { + if proto.Name == "http" || proto.Name == "https" { + return true + } + } + return false +} diff --git a/bitswap/network/http_multiaddr_test.go b/bitswap/network/http_multiaddr_test.go new file mode 100644 index 000000000..92e967e54 --- /dev/null +++ b/bitswap/network/http_multiaddr_test.go @@ -0,0 +1,167 @@ +package network + +import ( + "net/url" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +func TestExtractHTTPAddress(t *testing.T) { + tests := []struct { + name string + maStr string + want *url.URL + expectErr bool + }{ + { + name: "Valid HTTP multiaddress with DNS", + maStr: "/dns4/example.com/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "example.com:8080", + }, + expectErr: false, + }, + { + name: "Valid HTTPS multiaddress with DNS", + maStr: "/dns4/example.com/tcp/443/https", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + expectErr: false, + }, + { + name: "Valid WSS multiaddress with DNS", + maStr: "/dns4/example.com/tcp/443/wss", + want: nil, + expectErr: true, + }, + { + name: "Valid HTTP multiaddress with IP4", + maStr: "/ip4/127.0.0.1/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8080", + }, + expectErr: false, + }, + { + name: "Missing port", + maStr: "/dns4/example.com/http", + want: nil, + expectErr: true, + }, + { + name: "Invalid multiaddress", + maStr: "/dns4/example.com/tcp/abc/http", + want: nil, + expectErr: true, + }, + { + name: "Unsupported protocol", + maStr: "/unix/tmp/socket", + want: nil, + expectErr: true, + }, + { + name: "Valid HTTP multiaddress with IP6", + maStr: "/ip6/::1/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "::1:8080", + }, + expectErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ma, err := multiaddr.NewMultiaddr(tt.maStr) + if err != nil { + if !tt.expectErr { + t.Fatalf("failed to create multiaddress: %v", err) + } + return + } + + got, err := ExtractHTTPAddress(ma) + if (err != nil) != tt.expectErr { + t.Errorf("got: %s", got) + t.Errorf("ExtractHTTPAddress() error = %v, wantErr %v", err, tt.expectErr) + return + } + + if tt.want != nil && (got == nil || got.String() != tt.want.String()) { + t.Errorf("ExtractHTTPAddress() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestExtractHTTPAddressesFromPeer(t *testing.T) { + tests := []struct { + name string + peerInfo *peer.AddrInfo + want []*url.URL + }{ + { + name: "Valid peer with multiple addresses", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{ + multiaddr.StringCast("/dns4/example.com/tcp/8080/http"), + multiaddr.StringCast("/ip4/127.0.0.1/tcp/8081/http"), + multiaddr.StringCast("/ip4/127.0.0.1/tcp/9000"), // Non-HTTP + }, + }, + want: []*url.URL{ + { + Scheme: "http", + Host: "example.com:8080", + }, + { + Scheme: "http", + Host: "127.0.0.1:8081", + }, + }, + }, + { + name: "No valid HTTP addresses in peer", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{ + multiaddr.StringCast("/ip4/127.0.0.1/tcp/9000"), // Non-HTTP + }, + }, + want: nil, + }, + { + name: "Empty peer info", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{}, + }, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ExtractHTTPAddressesFromPeer(*tt.peerInfo) + if len(got) != len(tt.want) { + t.Errorf("ExtractHTTPAddressesFromPeer() = %v, want %v", got, tt.want) + return + } + + // Compare URLs + for i := range got { + if got[i].String() != tt.want[i].String() { + t.Errorf("ExtractHTTPAddressesFromPeer() URL at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go new file mode 100644 index 000000000..0c80cd76a --- /dev/null +++ b/bitswap/network/httpnet/httpnet.go @@ -0,0 +1,693 @@ +package httpnet + +import ( + "context" + "errors" + "fmt" + "io" + "math/rand/v2" + "net" + "net/http" + "net/url" + "reflect" + "runtime/debug" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + pb "github.com/ipfs/boxo/bitswap/message/pb" + "github.com/ipfs/boxo/bitswap/network" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + probing "github.com/prometheus-community/pro-bing" + "go.uber.org/multierr" +) + +var log = logging.Logger("httpnet") + +var ( // todo + maxSendTimeout = 2 * time.Minute + minSendTimeout = 10 * time.Second + sendLatency = 2 * time.Second + minSendRate = (100 * 1000) / 8 // 100kbit/s +) + +var ErrNoHTTPAddresses = errors.New("AddrInfo does not contain any valid HTTP addresses") +var ErrNoSuccess = errors.New("none of the peer HTTP endpoints responded successfully to request") + +var _ network.BitSwapNetwork = (*httpnet)(nil) + +type ctxKey string + +const pidCtxKey ctxKey = "peerid" + +// Defaults for the different options +var ( + DefaultMaxBlockSize int64 = 2 << 20 // 2MiB. + DefaultUserAgent = defaultUserAgent() // Usually will result in a "boxo@commitID" + DefaultCooldownPeriod = 10 * time.Second +) + +type Option func(net *httpnet) + +func WithUserAgent(agent string) Option { + return func(net *httpnet) { + net.userAgent = agent + } +} + +func WithMaxBlockSize(size int64) Option { + return func(net *httpnet) { + net.maxBlockSize = size + } +} + +// WithDefaultCooldownPeriod specifies how long we will avoid making requests +// to a peer URL after it errors, unless Retry-Header specifies a different +// amount of time. +func WithDefaultCooldownPeriod(d time.Duration) Option { + return func(net *httpnet) { + net.defaultCooldownPeriod = d + } +} + +type httpnet struct { + // NOTE: Stats must be at the top of the heap allocation to ensure 64bit + // alignment. + stats network.Stats + + host host.Host + client *http.Client + dialer *dialer + + // inbound messages from the network are forwarded to the receiver + receivers []network.Receiver + connEvtMgr *network.ConnectEventManager + + latMapLock sync.RWMutex + latMap map[peer.ID]time.Duration + + cooldownURLsLock sync.RWMutex + cooldownURLs map[string]time.Time + + userAgent string + maxBlockSize int64 + defaultCooldownPeriod time.Duration +} + +// wrap a connection to detect connect/disconnect events. +// and also to re-use existing ones. +type conn struct { + pid peer.ID + net.Conn + connEvtMgr *network.ConnectEventManager +} + +func (c *conn) Read(b []byte) (int, error) { + n, err := c.Conn.Read(b) + if err != nil { + c.connEvtMgr.Disconnected(c.pid) + } + return n, err +} + +func (c *conn) Write(b []byte) (int, error) { + n, err := c.Conn.Write(b) + if err != nil { + c.connEvtMgr.Disconnected(c.pid) + } + return n, err +} + +func (c *conn) Close() error { + err := c.Conn.Close() + c.connEvtMgr.Disconnected(c.pid) + return err +} + +type dialer struct { + dialer *net.Dialer + connEvtMgr *network.ConnectEventManager +} + +// DialContext dials using the dialer but calls back on the connection event +// manager PeerConnected() on success. +func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + pid := ctx.Value(pidCtxKey).(peer.ID) + cn, err := d.dialer.DialContext(ctx, network, address) + if err != nil { + d.connEvtMgr.Disconnected(pid) + return nil, err + } + d.connEvtMgr.Connected(pid) + return &conn{ + pid: pid, + Conn: cn, + connEvtMgr: d.connEvtMgr, + }, err +} + +// New returns a BitSwapNetwork supported by underlying IPFS host. +func New(host host.Host, opts ...Option) network.BitSwapNetwork { + netdialer := &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + } + dialer := &dialer{ + dialer: netdialer, + } + c := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: dialer.DialContext, // maybe breaks wasm + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + net := httpnet{ + host: host, + client: c, + dialer: dialer, + latMap: make(map[peer.ID]time.Duration), + cooldownURLs: make(map[string]time.Time), + userAgent: defaultUserAgent(), + maxBlockSize: DefaultMaxBlockSize, + } + + for _, opt := range opts { + opt(&net) + } + + return &net +} + +func (ht *httpnet) Start(receivers ...network.Receiver) { + ht.receivers = receivers + connectionListeners := make([]network.ConnectionListener, len(receivers)) + for i, v := range receivers { + connectionListeners[i] = v + } + ht.connEvtMgr = network.NewConnectEventManager(connectionListeners...) + ht.dialer.connEvtMgr = ht.connEvtMgr + + ht.connEvtMgr.Start() +} + +func (ht *httpnet) Stop() { + ht.connEvtMgr.Stop() +} + +func (ht *httpnet) Ping(ctx context.Context, p peer.ID) ping.Result { + log.Debugf("Ping: %s", p) + + pi := ht.host.Peerstore().PeerInfo(p) + urls := network.ExtractURLsFromPeer(pi) + if len(urls) == 0 { + return ping.Result{ + Error: ErrNoHTTPAddresses, + } + } + + // pick the first one. In general there should not be more than one + // url per peer. FIXME: right? + pingURL := urls[0] + + pinger, err := probing.NewPinger(pingURL.Host) + if err != nil { + return ping.Result{ + RTT: 0, + Error: err, + } + } + pinger.Count = 1 + + err = pinger.RunWithContext(ctx) + if err != nil { + return ping.Result{ + RTT: 0, + Error: err, + } + } + lat := pinger.Statistics().AvgRtt + ht.recordLatency(p, lat) + return ping.Result{ + RTT: lat, + Error: nil, + } + +} + +// TODO +func (ht *httpnet) Latency(p peer.ID) time.Duration { + var lat time.Duration + ht.latMapLock.RLock() + { + lat = ht.latMap[p] + } + ht.latMapLock.RUnlock() + + // Add one more latency measurement every time latency is requested + // since we don't do it from anywhere else. + // FIXME: too much too often? + go func() { + ht.Ping(context.Background(), p) + }() + + return lat +} + +// similar to LatencyIWMA from peerstore. +func (ht *httpnet) recordLatency(p peer.ID, next time.Duration) { + nextf := float64(next) + s := 0.1 + ht.latMapLock.Lock() + { + ewma, found := ht.latMap[p] + ewmaf := float64(ewma) + if !found { + ht.latMap[p] = next // when no data, just take it as the mean. + } else { + nextf = ((1.0 - s) * ewmaf) + (s * nextf) + ht.latMap[p] = time.Duration(nextf) + } + } + ht.latMapLock.Unlock() +} + +func (ht *httpnet) isInCooldown(u *url.URL) bool { + ustr := u.String() + ht.cooldownURLsLock.RLock() + dl, ok := ht.cooldownURLs[ustr] + ht.cooldownURLsLock.RUnlock() + if !ok { + return false + } + if time.Now().After(dl) { + ht.cooldownURLsLock.Lock() + delete(ht.cooldownURLs, ustr) + ht.cooldownURLsLock.Unlock() + return false + } + return true +} + +func (ht *httpnet) setCooldownDuration(u *url.URL, d time.Duration) { + ht.cooldownURLsLock.Lock() + ht.cooldownURLs[u.String()] = time.Now().Add(d) + ht.cooldownURLsLock.Unlock() +} + +func (ht *httpnet) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwapMessage) error { + log.Debugf("SendMessage: %s. %s", p, msg) + // todo opts + sender, err := ht.NewMessageSender(ctx, p, nil) + if err != nil { + return err + } + defer sender.Close() + return sender.SendMsg(ctx, msg) +} + +func (ht *httpnet) Self() peer.ID { + return ht.host.ID() +} + +func (ht *httpnet) Connect(ctx context.Context, p peer.AddrInfo) error { + log.Debugf("Connect: %s", p) + htaddrs, _ := network.SplitHTTPAddrs(p) + if len(htaddrs.Addrs) == 0 { + return nil + } + ht.host.Peerstore().AddAddrs(p.ID, htaddrs.Addrs, peerstore.PermanentAddrTTL) + urls := network.ExtractURLsFromPeer(htaddrs) + rand.Shuffle(len(urls), func(i, j int) { + urls[i], urls[j] = urls[j], urls[i] + }) + + // We will know try to talk to this peer by making an HTTP request. + // This allows re-using the connection that we are about to open next + // time with the client. The dialer callbacks will call peer.Connected() + // on success. + for _, u := range urls { + req, err := ht.buildRequest(ctx, p.ID, u, "GET", "bafyaabakaieac") + if err != nil { + log.Debug(err) + return err + } + + log.Debugf("connect request to %s", req.URL) + _, err = ht.client.Do(req) + if err != nil { + continue + } + return nil + // otherwise keep trying other urls. We don't care about the + // http status code as long as the request succeeded. + } + err := fmt.Errorf("%w: %s", ErrNoSuccess, p.ID) + log.Debug(err) + return err +} + +func (ht *httpnet) DisconnectFrom(ctx context.Context, p peer.ID) error { + // we noop. Idle connections will die by themselves. + return nil +} + +// ** We have no way of protecting a connection from our side other than using +// it so that it does not idle and gets closed. + +func (ht *httpnet) TagPeer(p peer.ID, tag string, w int) { +} +func (ht *httpnet) UntagPeer(p peer.ID, tag string) { +} + +func (ht *httpnet) Protect(p peer.ID, tag string) { +} +func (ht *httpnet) Unprotect(p peer.ID, tag string) bool { + return false +} + +// ** + +func (ht *httpnet) Stats() network.Stats { + return network.Stats{ + MessagesRecvd: atomic.LoadUint64(&ht.stats.MessagesRecvd), + MessagesSent: atomic.LoadUint64(&ht.stats.MessagesSent), + } +} + +func (ht *httpnet) buildRequest(ctx context.Context, pid peer.ID, u *url.URL, method string, cid string) (*http.Request, error) { + // copy url + sendURL, _ := url.Parse(u.String()) + sendURL.RawQuery = "format=raw" + sendURL.Path += "/ipfs/" + cid + + ctx = context.WithValue(ctx, pidCtxKey, pid) + req, err := http.NewRequestWithContext(ctx, + "GET", + sendURL.String(), + nil, + ) + if err != nil { + log.Error(err) + return nil, err + } + + headers := make(http.Header) + headers.Add("Accept", "application/vnd.ipld.raw") + headers.Add("User-Agent", ht.userAgent) + req.Header = headers + return req, nil +} + +func (ht *httpnet) NewMessageSender(ctx context.Context, p peer.ID, opts *network.MessageSenderOpts) (network.MessageSender, error) { + log.Debugf("NewMessageSender: %s", p) + pi := ht.host.Peerstore().PeerInfo(p) + urls := network.ExtractURLsFromPeer(pi) + if len(urls) == 0 { + return nil, ErrNoHTTPAddresses + } + + return &httpMsgSender{ + // ctx ?? + ht: ht, + peer: p, + urls: urls, + closing: make(chan struct{}, 1), + // opts: todo + }, nil +} + +type httpMsgSender struct { + peer peer.ID + urls []*url.URL + ht *httpnet + opts network.MessageSenderOpts + closing chan struct{} + closeOnce sync.Once +} + +// SendMsg performs an http request for the wanted cids per the msg's +// Wantlist. It reads the response and records it in a reponse BitswapMessage +// which is forwarded to the receivers (in a separate goroutine). +func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + // SendMsg gets called from MessageQueue and returning an error + // results in a MessageQueue shutdown. Errors are only returned when + // we are unable to obtain a single valid Block/Has response. When a + // URL errors in a bad way (connection, 500s), we continue checking + // with the next available one. + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + bsresp := msg.Clone() + recvdBlocks := make(map[cid.Cid]struct{}) + recvdHas := make(map[cid.Cid]struct{}) + + go func() { + for range sender.closing { + cancel() + } + }() + + // This is mostly a cosmetic action since the bitswap.Client is just + // logging the errors. + sendErrors := func(err error) { + if err != nil { + for _, recv := range sender.ht.receivers { + recv.ReceiveError(err) + } + } + } + + // We will try the HTTP URLs from peer and send the wantlist to one of + // them. We switch to the next URL in case of failures. We can 1) + // loop on urls and then wantlist, by keeping track of which blocks we + // have successfully requested already, or 2) loop on wantlist and + // then URLs, keeping track which URLs we should skip due to errors + // etc. It is simpler to do 1 and offers other small advantages + // (i.e. request object re-use). + + var merr error // collects all server errors + + for _, senderURL := range sender.urls { + if sender.ht.isInCooldown(senderURL) { + // this URL is cooling down due to previous + continue // with next url + } + + req, err := sender.ht.buildRequest(ctx, sender.peer, senderURL, "GET", "") + if err != nil { + log.Debug(err) + merr = multierr.Append(merr, err) + continue // with next url + } + // Store original path in case the "/ipfs/..." url is mounted + // on a subpath so we can just append to it. + origPath := req.URL.Path + + WANTLIST_LOOP: + for _, entry := range msg.Wantlist() { + // Reset the path to append a different CID. + req.URL.Path = origPath + + var method string + var resp *http.Response + var body []byte + + switch { + case entry.Cancel: // probably should not happen? + err = errors.New("received entry of type cancel") + log.Debug(err) + return err // full abort + + case entry.WantType == pb.Message_Wantlist_Block: + if _, ok := recvdBlocks[entry.Cid]; ok { + // previous url provided this block + continue // with next entry in wantlist. + } + method = "GET" + case entry.WantType == pb.Message_Wantlist_Have: + if _, ok := recvdHas[entry.Cid]; ok { + // previous url provided this Has + continue // with next entry in wantlist. + } + method = "HEAD" + default: + continue // with next entry, given unknown type. + } + + req.Method = method + req.URL.Path += entry.Cid.String() + + log.Debugf("cid request to %s %s", method, req.URL) + resp, err = sender.ht.client.Do(req) + if err != nil { + err = fmt.Errorf("error making request to %s: %w", req.URL, err) + merr = multierr.Append(merr, err) + log.Debug(err) + break WANTLIST_LOOP // stop processing wantlist. Try with next url. + } + + limReader := &io.LimitedReader{ + R: resp.Body, + N: sender.ht.maxBlockSize, + } + + body, err = io.ReadAll(limReader) + if err != nil { + err = fmt.Errorf("error reading body from %s: %w", req.URL, err) + merr = multierr.Append(merr, err) + log.Debug(err) + break WANTLIST_LOOP // stop processing wantlist. Try with next url. + } + + sender.ht.connEvtMgr.OnMessage(sender.peer) + switch resp.StatusCode { + // Valid responses signaling unavailability of the + // content. + case http.StatusNotFound, + http.StatusGone, + http.StatusForbidden, + http.StatusUnavailableForLegalReasons: + if entry.SendDontHave { + bsresp.AddDontHave(entry.Cid) + } + continue // with next entry in wantlist. + + case http.StatusOK: // \(^°^)/ + if req.Method == "HEAD" { + bsresp.AddHave(entry.Cid) + continue // with next entry in wantlist + } + + // GET + b, err := blocks.NewBlockWithCid(body, entry.Cid) + if err != nil { + log.Error("block received for cid %s does not match!", entry.Cid) + continue // with next entry in wantlist + } + bsresp.AddBlock(b) + continue // with next entry in wantlist + + // For any other code, we assume we must temporally + // backoff from the URL. + default: + err = fmt.Errorf("%s -> %d: %s", req.URL, resp.StatusCode, string(body)) + merr = multierr.Append(merr, err) + + cooldownPeriod := sender.ht.defaultCooldownPeriod + retryAfter := resp.Header.Get("Retry-After") + if d := parseRetryAfter(retryAfter); d > 0 { + cooldownPeriod = d + } + sender.ht.setCooldownDuration(senderURL, cooldownPeriod) + break WANTLIST_LOOP // and try next URL + } + } + } + + // send what we got ReceiveMessage and return + go func(receivers []network.Receiver, p peer.ID, msg bsmsg.BitSwapMessage) { + // todo: do not hang if closing + for i, recv := range receivers { + log.Debugf("Calling ReceiveMessage from %s (receiver %d)", p, i) + recv.ReceiveMessage( + context.Background(), // todo: which context? + p, + msg, + ) + } + }(sender.ht.receivers, sender.peer, bsresp) + + sendErrors(merr) + + // If we did not manage to obtain anything, return an errors if they + // existed. + if len(recvdBlocks)+len(recvdHas) == 0 { + return merr + } + // Otherwise errors are "ignored" (no need to disconnect). If we are + // in cooldown period, the connection might be eventually closed or we + // might retry later. + return nil +} + +func (sender *httpMsgSender) Close() error { + sender.closeOnce.Do(func() { + close(sender.closing) + }) + return nil +} + +func (sender *httpMsgSender) Reset() error { + return nil +} + +func (sender *httpMsgSender) SupportsHave() bool { + return false +} + +// defaultUserAgent returns a useful user agent version string allowing us to +// identify requests coming from official releases of this module vs forks. +func defaultUserAgent() (ua string) { + p := reflect.ValueOf(httpnet{}).Type().PkgPath() + // we have monorepo, so stripping the remainder + importPath := strings.TrimSuffix(p, "/bitswap/network/httpnet") + + ua = importPath + var module *debug.Module + if bi, ok := debug.ReadBuildInfo(); ok { + // If debug.ReadBuildInfo was successful, we can read Version by finding + // this client in the dependency list of the app that has it in go.mod + for _, dep := range bi.Deps { + if dep.Path == importPath { + module = dep + break + } + } + if module != nil { + ua += "@" + module.Version + return + } + ua += "@unknown" + } + return +} + +// parseRetryAfter returns how many seconds the Retry-After header header +// wants us to wait. +func parseRetryAfter(ra string) time.Duration { + if len(ra) == 0 { + return 0 + } + var d time.Duration + secs, err := strconv.ParseInt(ra, 10, 64) + if err != nil { + date, err := time.Parse(time.RFC1123, ra) + if err != nil { + return 0 + } + d = time.Until(date) + } else { + d = time.Duration(secs) + } + + if d < 0 { + d = 0 + } + return d +} diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 6c56bab14..3b9c04c9a 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -5,30 +5,15 @@ import ( "time" bsmsg "github.com/ipfs/boxo/bitswap/message" - "github.com/ipfs/boxo/bitswap/network/internal" cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -var ( - // ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol - ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers - // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol - ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero - // ProtocolBitswapOneOne is the prefix for version 1.1.0 - ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne - // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 - ProtocolBitswap = internal.ProtocolBitswap -) - // BitSwapNetwork provides network connectivity for BitSwap sessions. type BitSwapNetwork interface { - Self() peer.ID - // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, @@ -45,11 +30,19 @@ type BitSwapNetwork interface { NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) - ConnectionManager() connmgr.ConnManager - Stats() Stats + Self() peer.ID Pinger + PeerTagger +} + +// PeerTagger is an interface for tagging peers with metadata +type PeerTagger interface { + TagPeer(peer.ID, string, int) + UntagPeer(peer.ID, string) + Protect(peer.ID, string) + Unprotect(peer.ID, string) bool } // MessageSender is an interface for sending a series of messages over the bitswap diff --git a/bitswap/network/router.go b/bitswap/network/router.go new file mode 100644 index 000000000..aa2db2653 --- /dev/null +++ b/bitswap/network/router.go @@ -0,0 +1,121 @@ +package network + +import ( + "context" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "go.uber.org/multierr" +) + +type router struct { + Bitswap BitSwapNetwork + HTTP BitSwapNetwork + Peerstore peerstore.Peerstore +} + +// New returns a BitSwapNetwork supported by underlying IPFS host. +func New(pstore peerstore.Peerstore, bitswap BitSwapNetwork, http BitSwapNetwork) BitSwapNetwork { + return &router{ + Bitswap: bitswap, + HTTP: http, + } +} + +func (rt *router) Start(receivers ...Receiver) { + rt.Bitswap.Start(receivers...) + rt.HTTP.Start(receivers...) +} + +func (rt *router) Stop() { + rt.Bitswap.Stop() + rt.HTTP.Stop() +} + +// Should be the same for both +func (rt *router) Self() peer.ID { + return rt.Bitswap.Self() +} + +func (rt *router) Ping(ctx context.Context, p peer.ID) ping.Result { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Ping(ctx, p) + } + return rt.Bitswap.Ping(ctx, p) +} + +func (rt *router) Latency(p peer.ID) time.Duration { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Latency(p) + } + return rt.Bitswap.Latency(p) +} + +func (rt *router) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwapMessage) error { + // SendMessage is only used by bitswap server so we send a bitswap + // message. + return rt.Bitswap.SendMessage(ctx, p, msg) +} + +// Connect attempts to connect to a peer. It prioritizes HTTP connections over +// bitswap. +func (rt *router) Connect(ctx context.Context, p peer.AddrInfo) error { + htaddrs, _ := SplitHTTPAddrs(p) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Connect(ctx, p) + } else { + return rt.Bitswap.Connect(ctx, p) + } +} + +func (rt *router) DisconnectFrom(ctx context.Context, p peer.ID) error { + return multierr.Combine( + rt.HTTP.DisconnectFrom(ctx, p), + rt.Bitswap.DisconnectFrom(ctx, p), + ) +} + +func (rt *router) Stats() Stats { + htstats := rt.HTTP.Stats() + bsstats := rt.Bitswap.Stats() + return Stats{ + MessagesRecvd: htstats.MessagesRecvd + bsstats.MessagesRecvd, + MessagesSent: htstats.MessagesSent + bsstats.MessagesSent, + } +} + +// NewMessageSender returns a MessageSender using the HTTP network when HTTP +// addresses are knwon, and bitswap otherwise. +func (rt *router) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.NewMessageSender(ctx, p, opts) + } + return rt.Bitswap.NewMessageSender(ctx, p, opts) +} + +func (rt *router) TagPeer(p peer.ID, tag string, w int) { + rt.HTTP.TagPeer(p, tag, w) + rt.Bitswap.TagPeer(p, tag, w) +} + +func (rt *router) UntagPeer(p peer.ID, tag string) { + rt.HTTP.UntagPeer(p, tag) + rt.Bitswap.UntagPeer(p, tag) +} + +func (rt *router) Protect(p peer.ID, tag string) { + rt.HTTP.Protect(p, tag) + rt.Bitswap.Protect(p, tag) +} +func (rt *router) Unprotect(p peer.ID, tag string) bool { + return rt.HTTP.Unprotect(p, tag) || rt.Bitswap.Unprotect(p, tag) +} diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 5bb277dfb..a62b5c15e 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -81,7 +81,7 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl s.engine = decision.NewEngine( ctx, bstore, - network.ConnectionManager(), + network, network.Self(), s.engineOptions..., ) diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index f09831b65..fae877623 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -5,7 +5,8 @@ import ( "time" "github.com/ipfs/boxo/bitswap" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tn "github.com/ipfs/boxo/bitswap/testnet" blockstore "github.com/ipfs/boxo/blockstore" mockrouting "github.com/ipfs/boxo/routing/mock" @@ -92,7 +93,7 @@ type Instance struct { Datastore ds.Batching Exchange *bitswap.Bitswap Blockstore blockstore.Blockstore - Adapter bsnet.BitSwapNetwork + Adapter iface.BitSwapNetwork Routing routing.Routing blockstoreDelay delay.D } diff --git a/bitswap/testnet/interface.go b/bitswap/testnet/interface.go index ec2818518..1da72d38d 100644 --- a/bitswap/testnet/interface.go +++ b/bitswap/testnet/interface.go @@ -1,7 +1,8 @@ package bitswap import ( - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" @@ -10,7 +11,7 @@ import ( // Network is an interface for generating bitswap network interfaces // based on a test network. type Network interface { - Adapter(tnet.Identity, ...bsnet.NetOpt) bsnet.BitSwapNetwork + Adapter(tnet.Identity, ...bsnet.NetOpt) iface.BitSwapNetwork HasPeer(peer.ID) bool } diff --git a/bitswap/testnet/peernet.go b/bitswap/testnet/peernet.go index 84fa70c6e..e2a9dd766 100644 --- a/bitswap/testnet/peernet.go +++ b/bitswap/testnet/peernet.go @@ -3,7 +3,8 @@ package bitswap import ( "context" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" @@ -19,7 +20,7 @@ func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) { return &peernet{net}, nil } -func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { +func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) iface.BitSwapNetwork { client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address()) if err != nil { panic(err.Error()) diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 53e56d67d..390a1ece2 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -10,7 +10,8 @@ import ( "github.com/gammazero/deque" bsmsg "github.com/ipfs/boxo/bitswap/message" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" delay "github.com/ipfs/go-ipfs-delay" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" @@ -79,7 +80,7 @@ type receiverQueue struct { lk sync.Mutex } -func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { +func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) iface.BitSwapNetwork { n.mu.Lock() defer n.mu.Unlock() @@ -175,14 +176,14 @@ func (n *network) SendMessage( return nil } -var _ bsnet.Receiver = (*networkClient)(nil) +var _ iface.Receiver = (*networkClient)(nil) type networkClient struct { // These need to be at the top of the struct (allocated on the heap) for alignment on 32bit platforms. - stats bsnet.Stats + stats iface.Stats local peer.ID - receivers []bsnet.Receiver + receivers []iface.Receiver network *network supportedProtocols []protocol.ID } @@ -237,8 +238,8 @@ func (nc *networkClient) SendMessage( return nil } -func (nc *networkClient) Stats() bsnet.Stats { - return bsnet.Stats{ +func (nc *networkClient) Stats() iface.Stats { + return iface.Stats{ MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent), } @@ -283,7 +284,7 @@ func (mp *messagePasser) SupportsHave() bool { return false } -func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) { return &messagePasser{ net: nc, target: p, @@ -292,7 +293,7 @@ func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts * }, nil } -func (nc *networkClient) Start(r ...bsnet.Receiver) { +func (nc *networkClient) Start(r ...iface.Receiver) { nc.receivers = r } @@ -342,6 +343,19 @@ func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error { return nil } +func (bsnet *networkClient) TagPeer(p peer.ID, tag string, w int) { +} + +func (bsnet *networkClient) UntagPeer(p peer.ID, tag string) { +} + +func (bsnet *networkClient) Protect(p peer.ID, tag string) { +} + +func (bsnet *networkClient) Unprotect(p peer.ID, tag string) bool { + return false +} + func (rq *receiverQueue) enqueue(m *message) { rq.lk.Lock() defer rq.lk.Unlock() diff --git a/go.mod b/go.mod index 2e95cd8d0..073eeea8e 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-multistream v0.6.0 github.com/polydawn/refmt v0.89.0 + github.com/prometheus-community/pro-bing v0.5.0 github.com/prometheus/client_golang v1.20.5 github.com/samber/lo v1.47.0 github.com/slok/go-http-metrics v0.12.0 diff --git a/go.sum b/go.sum index 1e9689a83..cdb19edce 100644 --- a/go.sum +++ b/go.sum @@ -420,6 +420,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= +github.com/prometheus-community/pro-bing v0.5.0 h1:Fq+4BUXKIvsPtXUY8K+04ud9dkAuFozqGmRAyNUpffY= +github.com/prometheus-community/pro-bing v0.5.0/go.mod h1:1joR9oXdMEAcAJJvhs+8vNDvTg5thfAZcRFhcUozG2g= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=