Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: proxy all delegated routing
Browse files Browse the repository at this point in the history
hacdias committed Nov 27, 2023
1 parent 0d72338 commit de535a2
Showing 5 changed files with 358 additions and 126 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@ require (
github.com/libp2p/go-libp2p v0.32.0
github.com/libp2p/go-libp2p-kad-dht v0.23.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.0
github.com/multiformats/go-multibase v0.2.0
github.com/urfave/cli/v2 v2.25.7
)
@@ -57,6 +56,7 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect
github.com/libp2p/go-libp2p-xor v0.1.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
162 changes: 37 additions & 125 deletions server.go
Original file line number Diff line number Diff line change
@@ -9,16 +9,12 @@ import (

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
record "github.com/libp2p/go-libp2p-record"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
@@ -33,7 +29,7 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE

var dhtRouting routing.Routing
if runAcceleratedDHTClient {
wrappedDHT, err := newWrappedStandardAndAcceleratedDHTClient(ctx, h)
wrappedDHT, err := newBundledDHT(ctx, h)
if err != nil {
return err
}
@@ -61,16 +57,14 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE
return err
}

proxy := &delegatedRoutingProxy{
cr: crRouters,
pr: prRouters,
vs: ipnsRouters,
}

log.Printf("Listening on http://0.0.0.0:%d", port)
log.Printf("Delegated Routing API on http://127.0.0.1:%d/routing/v1", port)

http.Handle("/", server.Handler(proxy))
http.Handle("/", server.Handler(&composableRouter{
providers: crRouters,
peers: prRouters,
ipns: ipnsRouters,
}))
return http.ListenAndServe(":"+strconv.Itoa(port), nil)
}

@@ -108,148 +102,61 @@ func newHost(highOutboundLimits bool) (host.Host, error) {
return h, nil
}

type wrappedStandardAndAcceleratedDHTClient struct {
standard *dht.IpfsDHT
accelerated *fullrt.FullRT
}

func newWrappedStandardAndAcceleratedDHTClient(ctx context.Context, h host.Host) (routing.Routing, error) {
standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...))
if err != nil {
return nil, err
}

acceleratedDHT, err := fullrt.NewFullRT(h, "/ipfs",
fullrt.DHTOption(
dht.BucketSize(20),
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{},
}),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.Mode(dht.ModeClient),
))
if err != nil {
return nil, err
}

return &wrappedStandardAndAcceleratedDHTClient{
standard: standardDHT,
accelerated: acceleratedDHT,
}, nil
}

func (w *wrappedStandardAndAcceleratedDHTClient) Provide(ctx context.Context, c cid.Cid, b bool) error {
if w.accelerated.Ready() {
return w.accelerated.Provide(ctx, c, b)
}
return w.standard.Provide(ctx, c, b)
}

func (w *wrappedStandardAndAcceleratedDHTClient) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
if w.accelerated.Ready() {
return w.accelerated.FindProvidersAsync(ctx, c, i)
}
return w.standard.FindProvidersAsync(ctx, c, i)
}

func (w *wrappedStandardAndAcceleratedDHTClient) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
if w.accelerated.Ready() {
return w.accelerated.FindPeer(ctx, p)
}
return w.standard.FindPeer(ctx, p)
}

func (w *wrappedStandardAndAcceleratedDHTClient) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
if w.accelerated.Ready() {
return w.accelerated.PutValue(ctx, key, value, opts...)
}
return w.standard.PutValue(ctx, key, value, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) GetValue(ctx context.Context, s string, opts ...routing.Option) ([]byte, error) {
if w.accelerated.Ready() {
return w.accelerated.GetValue(ctx, s, opts...)
}
return w.standard.GetValue(ctx, s, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) SearchValue(ctx context.Context, s string, opts ...routing.Option) (<-chan []byte, error) {
if w.accelerated.Ready() {
return w.accelerated.SearchValue(ctx, s, opts...)
}
return w.standard.SearchValue(ctx, s, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) Bootstrap(ctx context.Context) error {
return w.standard.Bootstrap(ctx)
}

func getCombinedRouting(endpoints []string, dht routing.Routing) (routing.Routing, error) {
func getCombinedRouting(endpoints []string, dht routing.Routing) (server.ContentRouter, error) {
if len(endpoints) == 0 {
return dht, nil
return &wrappedRouting{dht: dht}, nil
}

var routers []routing.Routing
var routers []server.ContentRouter

for _, endpoint := range endpoints {
drclient, err := client.New(endpoint)
if err != nil {
return nil, err
}
routers = append(routers, newWrappedDelegatedRouting(drclient))
routers = append(routers, &wrappedDelegatedClient{Client: drclient})
}

return routinghelpers.Parallel{
Routers: append(routers, dht),
return parallelRouter{
routers: append(routers, &wrappedRouting{dht: dht}),
}, nil
}

type wrappedDelegatedRouting struct {
routing.ValueStore
routing.PeerRouting
routing.ContentRouting
type wrappedDelegatedClient struct {
*client.Client
}

func newWrappedDelegatedRouting(drc *client.Client) routing.Routing {
v := contentrouter.NewContentRoutingClient(drc)
func (d *wrappedDelegatedClient) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
return d.Client.FindProviders(ctx, cid)
}

return &wrappedDelegatedRouting{
ValueStore: v,
PeerRouting: v,
ContentRouting: v,
}
func (d *wrappedDelegatedClient) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
return d.Client.FindPeers(ctx, pid)
}

func (c *wrappedDelegatedRouting) Bootstrap(ctx context.Context) error {
return routing.ErrNotSupported
//lint:ignore SA1019 // ignore staticcheck
func (d *wrappedDelegatedClient) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

type delegatedRoutingProxy struct {
cr routing.ContentRouting
pr routing.PeerRouting
vs routing.ValueStore
type wrappedRouting struct {
dht routing.Routing
}

func (d *delegatedRoutingProxy) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
func (d *wrappedRouting) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
ctx, cancel := context.WithCancel(ctx)
ch := d.cr.FindProvidersAsync(ctx, key, limit)
ch := d.dht.FindProvidersAsync(ctx, key, limit)
return iter.ToResultIter[types.Record](&peerChanIter{
ch: ch,
cancel: cancel,
}), nil
}

//lint:ignore SA1019 // ignore staticcheck
func (d *delegatedRoutingProxy) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

func (d *delegatedRoutingProxy) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
func (d *wrappedRouting) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

addr, err := d.pr.FindPeer(ctx, pid)
addr, err := d.dht.FindPeer(ctx, pid)
if err != nil {
return nil, err
}
@@ -266,19 +173,19 @@ func (d *delegatedRoutingProxy) FindPeers(ctx context.Context, pid peer.ID, limi
return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil
}

func (d *delegatedRoutingProxy) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
func (d *wrappedRouting) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

raw, err := d.vs.GetValue(ctx, string(name.RoutingKey()))
raw, err := d.dht.GetValue(ctx, string(name.RoutingKey()))
if err != nil {
return nil, err
}

return ipns.UnmarshalRecord(raw)
}

func (d *delegatedRoutingProxy) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
func (d *wrappedRouting) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -287,7 +194,12 @@ func (d *delegatedRoutingProxy) PutIPNS(ctx context.Context, name ipns.Name, rec
return err
}

return d.vs.PutValue(ctx, string(name.RoutingKey()), raw)
return d.dht.PutValue(ctx, string(name.RoutingKey()), raw)
}

//lint:ignore SA1019 // ignore staticcheck
func (d *wrappedRouting) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

type peerChanIter struct {
43 changes: 43 additions & 0 deletions server_composable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"time"

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

var _ server.ContentRouter = composableRouter{}

type composableRouter struct {
providers server.ContentRouter
peers server.ContentRouter
ipns server.ContentRouter
}

func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
return r.providers.FindProviders(ctx, key, limit)
}

//lint:ignore SA1019 // ignore staticcheck
func (r composableRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
return r.providers.FindPeers(ctx, pid, limit)
}

func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
return r.ipns.GetIPNS(ctx, name)
}

func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
return r.ipns.PutIPNS(ctx, name, record)
}
80 changes: 80 additions & 0 deletions server_dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/go-cid"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

type bundledDHT struct {
standard *dht.IpfsDHT
fullRT *fullrt.FullRT
}

func newBundledDHT(ctx context.Context, h host.Host) (routing.Routing, error) {
standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...))
if err != nil {
return nil, err
}

fullRT, err := fullrt.NewFullRT(h, "/ipfs",
fullrt.DHTOption(
dht.BucketSize(20),
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{},
}),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.Mode(dht.ModeClient),
))
if err != nil {
return nil, err
}

return &bundledDHT{
standard: standardDHT,
fullRT: fullRT,
}, nil
}

func (b *bundledDHT) getDHT() routing.Routing {
if b.fullRT.Ready() {
return b.fullRT
}
return b.standard
}

func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
return b.getDHT().Provide(ctx, c, brdcst)
}

func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
return b.getDHT().FindProvidersAsync(ctx, c, i)
}

func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return b.getDHT().FindPeer(ctx, id)
}

func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error {
return b.getDHT().PutValue(ctx, k, v, option...)
}

func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
return b.getDHT().GetValue(ctx, s, option...)
}

func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
return b.getDHT().SearchValue(ctx, s, option...)
}

func (b *bundledDHT) Bootstrap(ctx context.Context) error {
return b.standard.Bootstrap(ctx)
}
197 changes: 197 additions & 0 deletions server_parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package main

import (
"context"
"errors"
"sync"
"time"

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

var _ server.ContentRouter = parallelRouter{}

type parallelRouter struct {
routers []server.ContentRouter
}

func (r parallelRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
switch len(r.routers) {
case 0:
return iter.ToResultIter(iter.FromSlice([]types.Record{})), nil
case 1:
return r.routers[0].FindProviders(ctx, key, limit)
}

its := make([]iter.ResultIter[types.Record], len(r.routers))
for i, ri := range r.routers {
var err error
its[i], err = ri.FindProviders(ctx, key, limit)
if err != nil {
for _, it := range its {
if it != nil {
_ = it.Close()
}
}
return nil, err
}
}
return &manyIter[types.Record]{it: its}, nil
}

type manyIter[T any] struct {
it []iter.ResultIter[T]
next int
}

func (mi *manyIter[T]) Next() bool {
for i, it := range mi.it {
if it.Next() {
mi.next = i
return true
}
}

mi.next = -1
return false
}

func (mi *manyIter[T]) Val() iter.Result[T] {
if mi.next == -1 {
return iter.Result[T]{Err: errors.New("no next value")}
}
return mi.it[mi.next].Val()
}

func (mi *manyIter[T]) Close() error {
var err error
for _, it := range mi.it {
err = errors.Join(err, it.Close())
}
return err
}

//lint:ignore SA1019 // ignore staticcheck
func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

func (r parallelRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
switch len(r.routers) {
case 0:
return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil
case 1:
return r.routers[0].FindPeers(ctx, pid, limit)
}

its := make([]iter.ResultIter[*types.PeerRecord], len(r.routers))
for i, ri := range r.routers {
var err error
its[i], err = ri.FindPeers(ctx, pid, limit)
if err != nil {
for _, it := range its {
if it != nil {
_ = it.Close()
}
}
return nil, err
}
}
return &manyIter[*types.PeerRecord]{it: its}, nil
}

func (r parallelRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
switch len(r.routers) {
case 0:
return nil, routing.ErrNotFound
case 1:
return r.routers[0].GetIPNS(ctx, name)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

results := make(chan struct {
val *ipns.Record
err error
})
for _, ri := range r.routers {
go func(ri server.ContentRouter) {
value, err := ri.GetIPNS(ctx, name)
select {
case results <- struct {
val *ipns.Record
err error
}{
val: value,
err: err,
}:
case <-ctx.Done():
}
}(ri)
}

var errs error

for range r.routers {
select {
case res := <-results:
switch res.err {
case nil:
return res.val, nil
case routing.ErrNotFound, routing.ErrNotSupported:
continue
}
// If the context has expired, just return that error
// and ignore the other errors.
if ctx.Err() != nil {
return nil, ctx.Err()
}

errs = errors.Join(errs, res.err)
case <-ctx.Done():
return nil, ctx.Err()
}
}

if errs == nil {
return nil, routing.ErrNotFound
}

return nil, errs
}

func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
switch len(r.routers) {
case 0:
return nil
case 1:
return r.routers[0].PutIPNS(ctx, name, record)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
results := make([]error, len(r.routers))
wg.Add(len(r.routers))
for i, ri := range r.routers {
go func(ri server.ContentRouter, i int) {
results[i] = ri.PutIPNS(ctx, name, record)
wg.Done()
}(ri, i)
}
wg.Wait()

var errs error
for _, err := range results {
errs = errors.Join(errs, err)
}
return errs
}

0 comments on commit de535a2

Please sign in to comment.