Skip to content

Commit

Permalink
Requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ajnavarro committed Sep 22, 2022
1 parent 239ffb5 commit 2256a55
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 61 deletions.
3 changes: 3 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
routingOption, _ := req.Options[routingOptionKwd].(string)
if routingOption == routingOptionDefaultKwd {
routingOption = cfg.Routing.Type
if routingOption == "" {
routingOption = routingOptionDHTKwd
}
}
switch routingOption {
case routingOptionSupernodeKwd:
Expand Down
34 changes: 33 additions & 1 deletion config/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"encoding/json"
"fmt"
)

// Routing defines configuration options for libp2p routing
Expand Down Expand Up @@ -34,6 +35,37 @@ type Router struct {

type Routers map[string]RouterParser
type Methods map[MethodName]Method

func (m Methods) Check() error {

// Check supported methods
for _, mn := range MethodNameList {
_, ok := m[mn]
if !ok {
return fmt.Errorf("method name %q is missing from Routing.Methods config param", mn)
}
}

// Check unsupported methods
for k := range m {
seen := false
for _, mn := range MethodNameList {
if mn == k {
seen = true
break
}
}

if seen {
continue
}

return fmt.Errorf("method name %q is not a supported method on Routing.Methods config param", k)
}

return nil
}

type RouterParser struct {
Router
}
Expand Down Expand Up @@ -98,7 +130,7 @@ const (
MethodNamePutIPNS MethodName = "put-ipns"
)

const MethodsCount = 5
var MethodNameList = []MethodName{MethodNameProvide, MethodNameFindPeers, MethodNameFindProviders, MethodNameGetIPNS, MethodNamePutIPNS}

type ReframeRouterParams struct {
// Endpoint is the URL where the routing implementation will point to get the information.
Expand Down
45 changes: 43 additions & 2 deletions config/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRouterParameters(t *testing.T) {
}},
},
},
Methods: map[MethodName]Method{
Methods: Methods{
MethodNameFindPeers: {
RouterName: "router-reframe",
},
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestRouterMissingParameters(t *testing.T) {
},
}},
},
Methods: map[MethodName]Method{
Methods: Methods{
MethodNameFindPeers: {
RouterName: "router-wrong-reframe",
},
Expand Down Expand Up @@ -157,3 +157,44 @@ func TestRouterMissingParameters(t *testing.T) {
require.NoError(err)
require.Empty(r2.Routers["router-wrong-reframe"].Parameters.(*ReframeRouterParams).Endpoint)
}

func TestMethods(t *testing.T) {
require := require.New(t)

methodsOK := Methods{
MethodNameFindPeers: {
RouterName: "router-wrong-reframe",
},
MethodNameFindProviders: {
RouterName: "router-wrong-reframe",
},
MethodNameGetIPNS: {
RouterName: "router-wrong-reframe",
},
MethodNameProvide: {
RouterName: "router-wrong-reframe",
},
MethodNamePutIPNS: {
RouterName: "router-wrong-reframe",
},
}

require.NoError(methodsOK.Check())

methodsMissing := Methods{
MethodNameFindPeers: {
RouterName: "router-wrong-reframe",
},
MethodNameGetIPNS: {
RouterName: "router-wrong-reframe",
},
MethodNameProvide: {
RouterName: "router-wrong-reframe",
},
MethodNamePutIPNS: {
RouterName: "router-wrong-reframe",
},
}

require.Error(methodsMissing.Check())
}
3 changes: 1 addition & 2 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -288,7 +287,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
}
return nil
default:
return errors.New("invalid duration")
return fmt.Errorf("unable to parse duration, expected a duration string or a float, but got %T", v)
}
}

Expand Down
133 changes: 82 additions & 51 deletions routing/delegated.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"net/http"

"github.com/ipfs/go-datastore"
drc "github.com/ipfs/go-delegated-routing/client"
Expand All @@ -27,7 +28,7 @@ import (
var log = logging.Logger("routing/delegated")

func Parse(routers config.Routers, methods config.Methods, extraDHT *ExtraDHTParams, extraReframe *ExtraReframeParams) (routing.Routing, error) {
stack := make(map[string]routing.Routing)
createdRouters := make(map[string]routing.Routing)
processLater := make(config.Routers)
log.Info("starting to parse ", len(routers), " routers")
for k, r := range routers {
Expand All @@ -41,72 +42,43 @@ func Parse(routers config.Routers, methods config.Methods, extraDHT *ExtraDHTPar
continue
}
log.Info("creating router ", k)
router, err := routingFromConfig(r.Router, extraDHT, extraReframe)
router, err := routingFromConfig(r.Router, extraDHT, extraReframe, nil, nil)
if err != nil {
return nil, err
}

log.Info("router ", k, " created with params ", r.Parameters)

stack[k] = router
createdRouters[k] = router
}

// using the stack, instantiate all parallel and sequential routers
// using the createdRouters, instantiate all parallel and sequential routers
for k, r := range processLater {
crp, ok := r.Router.Parameters.(*config.ComposableRouterParams)
if !ok {
return nil, fmt.Errorf("problem getting composable router Parameters from router %s", k)
return nil, fmt.Errorf("problem getting composable router Parameters from router %q", k)
}

log.Info("creating router helper ", k)
switch r.Type {
case config.RouterTypeParallel:
var pr []*routinghelpers.ParallelRouter
for _, cr := range crp.Routers {
ri, ok := stack[cr.RouterName]
if !ok {
return nil, fmt.Errorf("router with name %s not found", cr.RouterName)
}

pr = append(pr, &routinghelpers.ParallelRouter{
Router: ri,
IgnoreError: cr.IgnoreErrors,
Timeout: cr.Timeout.Duration,
ExecuteAfter: cr.ExecuteAfter.WithDefault(0),
})
}

stack[k] = routinghelpers.NewComposableParallel(pr)
case config.RouterTypeSequential:
var sr []*routinghelpers.SequentialRouter
for _, cr := range crp.Routers {
ri, ok := stack[cr.RouterName]
if !ok {
return nil, fmt.Errorf("router with name %s not found", cr.RouterName)
}

sr = append(sr, &routinghelpers.SequentialRouter{
Router: ri,
IgnoreError: cr.IgnoreErrors,
Timeout: cr.Timeout.Duration,
})
}

stack[k] = routinghelpers.NewComposableSequential(sr)
router, err := routingFromConfig(r.Router, extraDHT, extraReframe, crp, createdRouters)
if err != nil {
return nil, err
}

createdRouters[k] = router

log.Info("router ", k, " created with params ", r.Parameters)
}

if len(methods) != config.MethodsCount {
return nil, fmt.Errorf("number of methods from routing configuration must be %d", config.MethodsCount)
if err := methods.Check(); err != nil {
return nil, err
}

finalRouter := &Composer{}
for mn, m := range methods {
router, ok := stack[m.RouterName]
router, ok := createdRouters[m.RouterName]
if !ok {
return nil, fmt.Errorf("router with name %s not found for method %s", m.RouterName, mn)
return nil, fmt.Errorf("router with name %q not found for method %q", m.RouterName, mn)
}
switch mn {
case config.MethodNamePutIPNS:
Expand All @@ -127,16 +99,66 @@ func Parse(routers config.Routers, methods config.Methods, extraDHT *ExtraDHTPar
return finalRouter, nil
}

func routingFromConfig(conf config.Router, extraDHT *ExtraDHTParams, extraReframe *ExtraReframeParams) (routing.Routing, error) {
func routingFromConfig(conf config.Router,
extraDHT *ExtraDHTParams,
extraReframe *ExtraReframeParams,
extraComposableParams *config.ComposableRouterParams,
routers map[string]routing.Routing,
) (routing.Routing, error) {
var router routing.Routing
var err error
switch conf.Type {
case config.RouterTypeReframe:
router, err = reframeRoutingFromConfig(conf, extraReframe)
case config.RouterTypeDHT:
router, err = dhtRoutingFromConfig(conf, extraDHT)
case config.RouterTypeParallel:
if extraComposableParams == nil || routers == nil {
err = fmt.Errorf("missing params needed to create a composable router")
break
}
var pr []*routinghelpers.ParallelRouter
for _, cr := range extraComposableParams.Routers {
ri, ok := routers[cr.RouterName]
if !ok {
err = fmt.Errorf("router with name %q not found. If you have a router with this name, "+
"check routers order in configuration. Take into account that nested parallel and/or sequential "+
"routers are not supported", cr.RouterName)
break
}

pr = append(pr, &routinghelpers.ParallelRouter{
Router: ri,
IgnoreError: cr.IgnoreErrors,
Timeout: cr.Timeout.Duration,
ExecuteAfter: cr.ExecuteAfter.WithDefault(0),
})
}

router = routinghelpers.NewComposableParallel(pr)
case config.RouterTypeSequential:
if extraComposableParams == nil || routers == nil {
err = fmt.Errorf("missing params needed to create a composable router")
break
}
var sr []*routinghelpers.SequentialRouter
for _, cr := range extraComposableParams.Routers {
ri, ok := routers[cr.RouterName]
if !ok {
err = fmt.Errorf("router with name %q not found", cr.RouterName)
break
}

sr = append(sr, &routinghelpers.SequentialRouter{
Router: ri,
IgnoreError: cr.IgnoreErrors,
Timeout: cr.Timeout.Duration,
})
}

router = routinghelpers.NewComposableSequential(sr)
default:
return nil, fmt.Errorf("unknown router type %s", conf.Type)
return nil, fmt.Errorf("unknown router type %q", conf.Type)
}

return router, err
Expand All @@ -151,21 +173,30 @@ type ExtraReframeParams struct {
func reframeRoutingFromConfig(conf config.Router, extraReframe *ExtraReframeParams) (routing.Routing, error) {
var dr drp.DelegatedRouting_Client

params, ok := conf.Parameters.(*config.ReframeRouterParams)
if !ok {
return nil, errors.New("problem getting reframe Parameters")
}
params := conf.Parameters.(*config.ReframeRouterParams)

if params.Endpoint == "" {
return nil, NewParamNeededErr("Endpoint", conf.Type)
}

dr, err := drp.New_DelegatedRouting_Client(params.Endpoint)
// Increase per-host connection pool since we are making lots of concurrent requests.
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 500
transport.MaxIdleConnsPerHost = 100

delegateHTTPClient := &http.Client{
Transport: transport,
}
dr, err := drp.New_DelegatedRouting_Client(params.Endpoint,
drp.DelegatedRouting_Client_WithHTTPClient(delegateHTTPClient),
)
if err != nil {
return nil, err
}

var c *drc.Client

// this path is for tests only
if extraReframe == nil {
c, err = drc.NewClient(dr, nil, nil)
if err != nil {
Expand Down Expand Up @@ -258,7 +289,7 @@ func dhtRoutingFromConfig(conf config.Router, extra *ExtraDHTParams) (routing.Ro
case config.DHTModeServer:
mode = dht.ModeServer
default:
return nil, fmt.Errorf("invalid DHT mode: [%s]", params.Mode)
return nil, fmt.Errorf("invalid DHT mode: %q", params.Mode)
}

return createDHT(extra, params.PublicIPNetwork, mode)
Expand Down
10 changes: 5 additions & 5 deletions routing/delegated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ func TestRoutingFromConfig(t *testing.T) {

r, err := routingFromConfig(config.Router{
Type: "unknown",
}, nil, nil)
}, nil, nil, nil, nil)

require.Nil(r)
require.EqualError(err, "unknown router type unknown")
require.EqualError(err, "unknown router type \"unknown\"")

r, err = routingFromConfig(config.Router{
Type: config.RouterTypeReframe,
Parameters: &config.ReframeRouterParams{},
}, nil, nil)
}, nil, nil, nil, nil)

require.Nil(r)
require.EqualError(err, "configuration param 'Endpoint' is needed for reframe delegated routing types")
Expand All @@ -33,7 +33,7 @@ func TestRoutingFromConfig(t *testing.T) {
Parameters: &config.ReframeRouterParams{
Endpoint: "test",
},
}, nil, nil)
}, nil, nil, nil, nil)

require.NoError(err)
require.NotNil(r)
Expand All @@ -56,7 +56,7 @@ func TestRoutingFromConfig(t *testing.T) {
PeerID: id.String(),
Addrs: []string{"/ip4/0.0.0.0/tcp/4001"},
PrivKeyB64: base64.StdEncoding.EncodeToString(privM),
})
}, nil, nil)

require.NotNil(r)
require.NoError(err)
Expand Down
Loading

0 comments on commit 2256a55

Please sign in to comment.