From edfaf9fc057f640a1ba3dbf13afee1652c0daf8c Mon Sep 17 00:00:00 2001 From: dirkmc Date: Wed, 24 Aug 2022 09:57:31 +0200 Subject: [PATCH] libp2p retrieval transports endpoint (#723) * feat: libp2p retrieval transports endpoint * refactor: transports protocol can have multiple multiaddresses * feat: boost provider retrieval-transports CLI command * docs: add known protocols to transports ipld schema --- cmd/boost/provider_cmd.go | 109 ++++++++++++++++++++ cmd/boost/provider_cmd_test.go | 45 +++++++++ go.mod | 5 +- go.sum | 12 ++- node/builder.go | 4 + node/config/doc_gen.go | 7 ++ node/config/types.go | 4 + node/modules/retrieval.go | 60 ++++++++++++ retrievalmarket/lp2pimpl/transports.go | 120 +++++++++++++++++++++++ retrievalmarket/types/transports.go | 53 ++++++++++ retrievalmarket/types/transports.ipldsch | 15 +++ 11 files changed, 428 insertions(+), 6 deletions(-) create mode 100644 cmd/boost/provider_cmd_test.go create mode 100644 node/modules/retrieval.go create mode 100644 retrievalmarket/lp2pimpl/transports.go create mode 100644 retrievalmarket/types/transports.go create mode 100644 retrievalmarket/types/transports.ipldsch diff --git a/cmd/boost/provider_cmd.go b/cmd/boost/provider_cmd.go index 2a2eb48af..73bbd2da4 100644 --- a/cmd/boost/provider_cmd.go +++ b/cmd/boost/provider_cmd.go @@ -9,12 +9,16 @@ import ( "github.com/filecoin-project/boost/cli/ctxutil" clinode "github.com/filecoin-project/boost/cli/node" "github.com/filecoin-project/boost/cmd" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + // TODO: This multiaddr util library should probably live in its own repo + multiaddrutil "github.com/filecoin-project/go-legs/httpsync/multiaddr" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" "github.com/ipfs/go-cid" + "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" ) @@ -30,6 +34,7 @@ var providerCmd = &cli.Command{ libp2pInfoCmd, storageAskCmd, retrievalAskCmd, + retrievalTransportsCmd, }, } @@ -294,3 +299,107 @@ var retrievalAskCmd = &cli.Command{ return nil }, } + +var retrievalTransportsCmd = &cli.Command{ + Name: "retrieval-transports", + Usage: "Query a storage provider's available retrieval transports (libp2p, http, etc)", + ArgsUsage: "[provider]", + Action: func(cctx *cli.Context) error { + ctx := bcli.ReqContext(cctx) + + afmt := NewAppFmt(cctx.App) + if cctx.NArg() != 1 { + afmt.Println("Usage: retrieval-transports [provider]") + return nil + } + + n, err := clinode.Setup(cctx.String(cmd.FlagRepo.Name)) + if err != nil { + return err + } + + api, closer, err := lcli.GetGatewayAPI(cctx) + if err != nil { + return fmt.Errorf("cant setup gateway connection: %w", err) + } + defer closer() + + maddr, err := address.NewFromString(cctx.Args().First()) + if err != nil { + return err + } + + addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr) + if err != nil { + return err + } + + log.Debugw("found storage provider", "id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr) + + if err := n.Host.Connect(ctx, *addrInfo); err != nil { + return fmt.Errorf("failed to connect to peer %s: %w", addrInfo.ID, err) + } + + // Send the query to the Storage Provider + client := lp2pimpl.NewTransportsClient(n.Host) + resp, err := client.SendQuery(ctx, addrInfo.ID) + if err != nil { + return fmt.Errorf("failed to fetch transports from peer %s: %w", addrInfo.ID, err) + } + + if cctx.Bool("json") { + type addr struct { + Multiaddr string `json:"multiaddr"` + Address string `json:"address,omitempty"` + } + json := make(map[string]interface{}, len(resp.Protocols)) + for _, p := range resp.Protocols { + addrs := make([]addr, 0, len(p.Addresses)) + for _, ma := range p.Addresses { + // Get the multiaddress, and also try to get the address + // in the protocol's native format (eg URL format for + // http protocol) + addrs = append(addrs, addr{ + Multiaddr: ma.String(), + Address: multiaddrToNative(p.Name, ma), + }) + } + json[p.Name] = addrs + } + return cmd.PrintJson(json) + } + + if len(resp.Protocols) == 0 { + afmt.Println("No available retrieval protocols") + return nil + } + for _, p := range resp.Protocols { + afmt.Println(p.Name) + for _, a := range p.Addresses { + // Output the multiaddress + afmt.Println(" " + a.String()) + // Try to get the address in the protocol's native format + // (eg URL format for http protocol) + nativeFmt := multiaddrToNative(p.Name, a) + if nativeFmt != "" { + afmt.Println(" " + nativeFmt) + } + } + } + + return nil + }, +} + +func multiaddrToNative(proto string, ma multiaddr.Multiaddr) string { + switch proto { + case "http", "https": + u, err := multiaddrutil.ToURL(ma) + if err != nil { + return "" + } + return u.String() + } + + return "" +} diff --git a/cmd/boost/provider_cmd_test.go b/cmd/boost/provider_cmd_test.go new file mode 100644 index 000000000..05a6b102d --- /dev/null +++ b/cmd/boost/provider_cmd_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "testing" +) + +func TestMultiaddrToNative(t *testing.T) { + testCases := []struct { + name string + proto string + ma string + expected string + }{{ + name: "http", + proto: "http", + ma: "/dns/foo.com/http", + expected: "http://foo.com", + }, { + name: "http IP 4 address", + proto: "http", + ma: "/ip4/192.168.0.1/tcp/80/http", + expected: "http://192.168.0.1:80", + }, { + name: "https", + proto: "https", + ma: "/dns/foo.com/tcp/443/https", + expected: "https://foo.com:443", + }, { + name: "unknown protocol", + proto: "fancynewproto", + ma: "/dns/foo.com/tcp/80/http", + expected: "", + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ma, err := multiaddr.NewMultiaddr(tc.ma) + require.NoError(t, err) + res := multiaddrToNative(tc.proto, ma) + require.Equal(t, tc.expected, res) + }) + } +} diff --git a/go.mod b/go.mod index 5235a5e83..33f06aaf2 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.23.2 github.com/filecoin-project/go-jsonrpc v0.1.5 + github.com/filecoin-project/go-legs v0.4.9 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.1.10 @@ -64,7 +65,7 @@ require ( github.com/ipfs/go-unixfs v0.3.1 github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e - github.com/ipld/go-ipld-prime v0.17.0 + github.com/ipld/go-ipld-prime v0.18.0 github.com/ipld/go-ipld-selector-text-lite v0.0.1 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 @@ -83,7 +84,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multibase v0.0.3 - github.com/multiformats/go-multihash v0.1.0 + github.com/multiformats/go-multihash v0.2.0 github.com/multiformats/go-varint v0.0.6 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pressly/goose/v3 v3.5.3 diff --git a/go.sum b/go.sum index bceca945c..25e05c6b6 100644 --- a/go.sum +++ b/go.sum @@ -398,8 +398,9 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBw github.com/filecoin-project/go-indexer-core v0.2.16/go.mod h1:5kCKyhtT9k1vephr9l9SFGX8B/HowXIvOhGCkmbxwbY= github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk= github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= -github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo= github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s= +github.com/filecoin-project/go-legs v0.4.9 h1:9ccbv5zDPqMviEpSpf0TdfKKI64TMYGSiuY2A1EXHFY= +github.com/filecoin-project/go-legs v0.4.9/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= @@ -1022,8 +1023,9 @@ github.com/ipld/go-ipld-prime v0.14.1/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704n github.com/ipld/go-ipld-prime v0.14.2/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= github.com/ipld/go-ipld-prime v0.14.4-0.20211217152141-008fd70fc96f/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA= -github.com/ipld/go-ipld-prime v0.17.0 h1:+U2peiA3aQsE7mrXjD2nYZaZrCcakoz2Wge8K42Ld8g= github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs= +github.com/ipld/go-ipld-prime v0.18.0 h1:xUk7NUBSWHEXdjiOu2sLXouFJOMs0yoYzeI5RAqhYQo= +github.com/ipld/go-ipld-prime v0.18.0/go.mod h1:735yXW548CKrLwVCYXzqx90p5deRJMVVxM9eJ4Qe+qE= github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= @@ -1736,8 +1738,9 @@ github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUj github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg= github.com/multiformats/go-multihash v0.0.16/go.mod h1:zhfEIgVnB/rPMfxgFw15ZmGoNaKyNUIE4IWHG/kC+Ag= -github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtOWcJT0q9+EA= github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84= +github.com/multiformats/go-multihash v0.2.0 h1:oytJb9ZA1OUW0r0f9ea18GiaPOo4SXyc7p2movyUuo4= +github.com/multiformats/go-multihash v0.2.0/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= github.com/multiformats/go-multistream v0.0.1/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/multiformats/go-multistream v0.0.4/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= @@ -2296,8 +2299,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA= golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/node/builder.go b/node/builder.go index 8712c3abe..abe8243d1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/boost/node/impl/common" "github.com/filecoin-project/boost/node/modules" "github.com/filecoin-project/boost/node/modules/dtypes" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" "github.com/filecoin-project/boost/sealingpipeline" "github.com/filecoin-project/boost/storagemanager" "github.com/filecoin-project/boost/storagemarket" @@ -137,6 +138,7 @@ const ( HandleMigrateProviderFundsKey HandleDealsKey HandleRetrievalKey + HandleRetrievalTransportsKey RunSectorServiceKey // boost should be started after legacy markets (HandleDealsKey) @@ -514,6 +516,8 @@ func ConfigBoost(cfg *config.Boost) Option { Override(new(rmnet.RetrievalMarketNetwork), lotus_modules.RetrievalNetwork), Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider), Override(HandleRetrievalKey, lotus_modules.HandleRetrieval), + Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)), + Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports), Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator), Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)), diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index cdddee232..8a29b89df 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -239,6 +239,13 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f Comment: `Whether to do commp on the Boost node (local) or on the Sealer (remote)`, }, + { + Name: "HTTPRetrievalMultiaddr", + Type: "string", + + Comment: `The public multi-address for retrieving deals with booster-http. +Note: Must be in multiaddr format, eg /dns/foo.com/tcp/443/https`, + }, }, "FeeConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 9b0c6a1b5..7ff9c3a2d 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -169,6 +169,10 @@ type DealmakingConfig struct { MaxTransferDuration Duration // Whether to do commp on the Boost node (local) or on the Sealer (remote) RemoteCommp bool + + // The public multi-address for retrieving deals with booster-http. + // Note: Must be in multiaddr format, eg /dns/foo.com/tcp/443/https + HTTPRetrievalMultiaddr string } type FeeConfig struct { diff --git a/node/modules/retrieval.go b/node/modules/retrieval.go new file mode 100644 index 000000000..214c6683d --- /dev/null +++ b/node/modules/retrieval.go @@ -0,0 +1,60 @@ +package modules + +import ( + "context" + "fmt" + + "github.com/filecoin-project/boost/node/config" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" + "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/libp2p/go-libp2p-core/host" + "github.com/multiformats/go-multiaddr" + "go.uber.org/fx" +) + +func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) { + return func(h host.Host) (*lp2pimpl.TransportsListener, error) { + protos := []types.Protocol{} + + // Get the libp2p addresses from the Host + if len(h.Addrs()) > 0 { + protos = append(protos, types.Protocol{ + Name: "libp2p", + Addresses: h.Addrs(), + }) + } + + // If there's an http retrieval address specified, add HTTP to the list + // of supported protocols + if cfg.Dealmaking.HTTPRetrievalMultiaddr != "" { + maddr, err := multiaddr.NewMultiaddr(cfg.Dealmaking.HTTPRetrievalMultiaddr) + if err != nil { + msg := "HTTPRetrievalURL must be in multi-address format. " + msg += "Could not parse '%s' as multiaddr: %w" + return nil, fmt.Errorf(msg, cfg.Dealmaking.HTTPRetrievalMultiaddr, err) + } + + protos = append(protos, types.Protocol{ + Name: "http", + Addresses: []multiaddr.Multiaddr{maddr}, + }) + } + + return lp2pimpl.NewTransportsListener(h, protos), nil + } +} + +func HandleRetrievalTransports(lc fx.Lifecycle, l *lp2pimpl.TransportsListener) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + log.Debug("starting retrieval transports listener") + l.Start() + return nil + }, + OnStop: func(context.Context) error { + log.Debug("stopping retrieval transports listener") + l.Stop() + return nil + }, + }) +} diff --git a/retrievalmarket/lp2pimpl/transports.go b/retrievalmarket/lp2pimpl/transports.go new file mode 100644 index 000000000..3e11f8c85 --- /dev/null +++ b/retrievalmarket/lp2pimpl/transports.go @@ -0,0 +1,120 @@ +package lp2pimpl + +import ( + "context" + "fmt" + "time" + + "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/filecoin-project/go-fil-markets/shared" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +var clog = logging.Logger("boost:lp2p:tspt:client") +var slog = logging.Logger("boost:lp2p:tspt") + +// TransportsProtocolID is the protocol for querying which retrieval transports +// the Storage Provider supports (http, libp2p, etc) +const TransportsProtocolID = protocol.ID("/fil/retrieval/transports/1.0.0") + +// TransportsListener listens for incoming queries over libp2p +type TransportsListener struct { + host host.Host + protocols []types.Protocol +} + +const streamReadDeadline = 30 * time.Second +const streamWriteDeadline = 30 * time.Second + +// QueryClientOption is an option for configuring the libp2p storage deal client +type QueryClientOption func(*TransportsClient) + +// RetryParameters changes the default parameters around connection reopening +func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) QueryClientOption { + return func(c *TransportsClient) { + c.retryStream.SetOptions(shared.RetryParameters(minDuration, maxDuration, attempts, backoffFactor)) + } +} + +// TransportsClient sends retrieval queries over libp2p +type TransportsClient struct { + retryStream *shared.RetryStream +} + +func NewTransportsClient(h host.Host, options ...QueryClientOption) *TransportsClient { + c := &TransportsClient{ + retryStream: shared.NewRetryStream(h), + } + for _, option := range options { + option(c) + } + return c +} + +// SendQuery sends a retrieval query over a libp2p stream to the peer +func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.QueryResponse, error) { + clog.Debugw("query", "peer", id) + + // Create a libp2p stream to the provider + s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{TransportsProtocolID}) + if err != nil { + return nil, err + } + + defer s.Close() // nolint + + // Set a deadline on reading from the stream so it doesn't hang + _ = s.SetReadDeadline(time.Now().Add(streamReadDeadline)) + defer s.SetReadDeadline(time.Time{}) // nolint + + // Read the response from the stream + queryResponsei, err := types.BindnodeRegistry.TypeFromReader(s, (*types.QueryResponse)(nil), dagcbor.Decode) + if err != nil { + return nil, fmt.Errorf("reading query response: %w", err) + } + queryResponse := queryResponsei.(*types.QueryResponse) + + clog.Debugw("response", "peer", id) + + return queryResponse, nil +} + +func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsListener { + return &TransportsListener{ + host: h, + protocols: protos, + } +} + +func (p *TransportsListener) Start() { + p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream) +} + +func (p *TransportsListener) Stop() { + p.host.RemoveStreamHandler(TransportsProtocolID) +} + +// Called when the client opens a libp2p stream +func (l *TransportsListener) handleNewQueryStream(s network.Stream) { + defer s.Close() + + slog.Debugw("query", "peer", s.Conn().RemotePeer()) + + response := types.QueryResponse{Protocols: l.protocols} + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(streamWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + // Write the response to the client + err := types.BindnodeRegistry.TypeToWriter(&response, s, dagcbor.Encode) + if err != nil { + slog.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err) + return + } +} diff --git a/retrievalmarket/types/transports.go b/retrievalmarket/types/transports.go new file mode 100644 index 000000000..aee9d51d5 --- /dev/null +++ b/retrievalmarket/types/transports.go @@ -0,0 +1,53 @@ +package types + +import ( + _ "embed" + "fmt" + + "github.com/ipld/go-ipld-prime/node/bindnode" + bindnoderegistry "github.com/ipld/go-ipld-prime/node/bindnode/registry" + "github.com/multiformats/go-multiaddr" +) + +type Protocol struct { + // The name of the transport protocol eg "libp2p" or "http" + Name string + // The address of the endpoint in multiaddr format + Addresses []multiaddr.Multiaddr +} + +type QueryResponse struct { + Protocols []Protocol +} + +//go:embed transports.ipldsch +var embedSchema []byte + +func multiAddrFromBytes(b []byte) (interface{}, error) { + ma, err := multiaddr.NewMultiaddrBytes(b) + if err != nil { + return nil, err + } + return &ma, err +} + +func multiAddrToBytes(iface interface{}) ([]byte, error) { + ma, ok := iface.(*multiaddr.Multiaddr) + if !ok { + return nil, fmt.Errorf("expected *Multiaddr value") + } + + return (*ma).Bytes(), nil +} + +var BindnodeRegistry = bindnoderegistry.NewRegistry() + +func init() { + var dummyMa multiaddr.Multiaddr + var bindnodeOptions = []bindnode.Option{ + bindnode.TypedBytesConverter(&dummyMa, multiAddrFromBytes, multiAddrToBytes), + } + if err := BindnodeRegistry.RegisterType((*QueryResponse)(nil), string(embedSchema), "QueryResponse", bindnodeOptions...); err != nil { + panic(err.Error()) + } +} diff --git a/retrievalmarket/types/transports.ipldsch b/retrievalmarket/types/transports.ipldsch new file mode 100644 index 000000000..f6ab6d799 --- /dev/null +++ b/retrievalmarket/types/transports.ipldsch @@ -0,0 +1,15 @@ +# Defines the response to a query asking which transport protocols a +# Storage Provider supports +type Multiaddr bytes + +type Protocol struct { + # The name of the transport protocol + # Known protocols: "libp2p", "http", "https" + Name String + # The addresses of the endpoint in multiaddr format + Addresses [Multiaddr] +} + +type QueryResponse struct { + Protocols [Protocol] +}