Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: rcmgr: Change LimitConfig to use LimitVal type #2000

Merged
merged 18 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions p2p/host/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@ scalingLimits := rcmgr.DefaultLimits
// Add limits around included libp2p protocols
libp2p.SetDefaultServiceLimits(&scalingLimits)

// Turn the scaling limits into a static set of limits using `.AutoScale`. This
// Turn the scaling limits into a reified set of limits using `.AutoScale`. This
// scales the limits proportional to your system memory.
limits := scalingLimits.AutoScale()
scaledDefaultLimits := scalingLimits.AutoScale()

// Tweak certain settings
cfg := rcmgr.LimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
// Everything else is default. The exact values will come from `scaledDefaultLimits` above.
}

// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits`
limits := cfg.Reify(scaledDefaultLimits)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved

// The resource manager expects a limiter, se we create one from our limits.
limiter := rcmgr.NewFixedLimiter(limits)
Expand All @@ -51,6 +63,54 @@ if err != nil {
host, err := libp2p.New(libp2p.ResourceManager(rm))
```

### Saving the limits config
The easiest way to save the defined limits is to serialize the `LimitConfig`
type as JSON.

```go
noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf")
cfg := rcmgr.LimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
Peer: map[peer.ID]rcmgr.ResourceLimits{
noisyNeighbor: {
// No inbound connections from this peer
ConnsInbound: rcmgr.BlockAllLimit,
// But let me open connections to them
Conns: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.DefaultLimit,
Comment on lines +81 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you even have to specify rcmgr.DefaultLimit or are those just added for clarity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added for clarity, and to show that default doesn't get serialized into json. You'll also notice Memory is implicitly default.

// No inbound streams from this peer
StreamsInbound: rcmgr.BlockAllLimit,
// And let me open unlimited (by me) outbound streams (the peer may have their own limits on me)
StreamsOutbound: rcmgr.Unlimited,
},
},
}
jsonBytes, _ := json.Marshal(&cfg)

// string(jsonBytes)
// {
// "System": {
// "StreamsOutbound": "unlimited"
// },
// "Peer": {
// "QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf": {
// "StreamsInbound": "blockAll",
// "StreamsOutbound": "unlimited",
// "ConnsInbound": "blockAll"
Comment on lines +100 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the order that they serialize as? If we are in control, I think it would be good to go from outwards to in

Conns
ConnsInbound
CoonnsOutbound
Streams
StreamsInbound
StreamsOutbound

// }
// }
// }
```

This will omit defaults from the JSON output. It will also serialize the
blockAll, and unlimited values explicitly.

The `Memory` field is serialized as a string to workaround the JSON limitation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To drive this home, maybe set a Memory limit in your example above so it shows up in the serialization?

of 32 bit integers (`Memory` is an int64).

## Basic Resources

### Memory
Expand Down
123 changes: 99 additions & 24 deletions p2p/host/resource-manager/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rcmgr
import (
"encoding/json"
"io"
"math"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -56,33 +57,32 @@ func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) {
}

// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) {
func NewLimiterFromJSON(in io.Reader, defaults ReifiedLimitConfig) (Limiter, error) {
cfg, err := readLimiterConfigFromJSON(in, defaults)
if err != nil {
return nil, err
}
return &fixedLimiter{cfg}, nil
}

func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) {
func readLimiterConfigFromJSON(in io.Reader, defaults ReifiedLimitConfig) (ReifiedLimitConfig, error) {
var cfg LimitConfig
if err := json.NewDecoder(in).Decode(&cfg); err != nil {
return LimitConfig{}, err
return ReifiedLimitConfig{}, err
}
cfg.Apply(defaults)
return cfg, nil
return cfg.Reify(defaults), nil
}

// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
LimitConfig
ReifiedLimitConfig
}

var _ Limiter = (*fixedLimiter)(nil)

func NewFixedLimiter(conf LimitConfig) Limiter {
func NewFixedLimiter(conf ReifiedLimitConfig) Limiter {
log.Debugw("initializing new limiter with config", "limits", conf)
return &fixedLimiter{LimitConfig: conf}
return &fixedLimiter{conf}
}

// BaseLimit is a mixin type for basic resource limits.
Expand All @@ -97,6 +97,81 @@ type BaseLimit struct {
Memory int64 `json:",omitempty"`
}

func valueOrBlockAll(n int) LimitVal {
if n == 0 {
return BlockAllLimit
}
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
return LimitVal(n)
}
func valueOrBlockAll64(n int64) LimitVal64 {
if n == 0 {
return BlockAllLimit64
}
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
return LimitVal64(n)
}

func limitValFromInt(i int, defaultVal int) LimitVal {
if i == defaultVal {
return DefaultLimit
} else if i == math.MaxInt {
return Unlimited
} else if i == 0 {
return BlockAllLimit
}
return LimitVal(i)
}

func limitValFromInt64(i int64, defaultVal int64) LimitVal64 {
if i == defaultVal {
return DefaultLimit64
} else if i == math.MaxInt {
return Unlimited64
} else if i == 0 {
return BlockAllLimit64
}
return LimitVal64(i)
}

// ToResourceLimits converts the BaseLimit to a ResourceLimits
func (l BaseLimit) ToResourceLimits() *ResourceLimits {
return &ResourceLimits{
Streams: valueOrBlockAll(l.Streams),
StreamsInbound: valueOrBlockAll(l.StreamsInbound),
StreamsOutbound: valueOrBlockAll(l.StreamsOutbound),
Conns: valueOrBlockAll(l.Conns),
ConnsInbound: valueOrBlockAll(l.ConnsInbound),
ConnsOutbound: valueOrBlockAll(l.ConnsOutbound),
FD: valueOrBlockAll(l.FD),
Memory: valueOrBlockAll64(l.Memory),
}
}

func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *ResourceLimits {
out := ResourceLimits{
Streams: limitValFromInt(l.Streams, defaultLimit.Streams),
StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound),
StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound),
Conns: limitValFromInt(l.Conns, defaultLimit.Conns),
ConnsInbound: limitValFromInt(l.ConnsInbound, defaultLimit.ConnsInbound),
ConnsOutbound: limitValFromInt(l.ConnsOutbound, defaultLimit.ConnsOutbound),
FD: limitValFromInt(l.FD, defaultLimit.FD),
Memory: limitValFromInt64(l.Memory, defaultLimit.Memory),
}

if out.Streams == DefaultLimit &&
out.StreamsInbound == DefaultLimit &&
out.StreamsOutbound == DefaultLimit &&
out.Conns == DefaultLimit &&
out.ConnsInbound == DefaultLimit &&
out.ConnsOutbound == DefaultLimit &&
out.FD == DefaultLimit &&
out.Memory == DefaultLimit64 {
return nil
}
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved

return &out
}

// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimit) Apply(l2 BaseLimit) {
Expand Down Expand Up @@ -202,65 +277,65 @@ func (l *BaseLimit) GetMemoryLimit() int64 {
}

func (l *fixedLimiter) GetSystemLimits() Limit {
return &l.System
return &l.system
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
}

func (l *fixedLimiter) GetTransientLimits() Limit {
return &l.Transient
return &l.transient
}

func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit {
return &l.AllowlistedSystem
return &l.allowlistedSystem
}

func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit {
return &l.AllowlistedTransient
return &l.allowlistedTransient
}

func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.Service[svc]
sl, ok := l.service[svc]
if !ok {
return &l.ServiceDefault
return &l.serviceDefault
}
return &sl
}

func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeer[svc]
pl, ok := l.servicePeer[svc]
if !ok {
return &l.ServicePeerDefault
return &l.servicePeerDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.Protocol[proto]
pl, ok := l.protocol[proto]
if !ok {
return &l.ProtocolDefault
return &l.protocolDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeer[proto]
pl, ok := l.protocolPeer[proto]
if !ok {
return &l.ProtocolPeerDefault
return &l.protocolPeerDefault
}
return &pl
}

func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.Peer[p]
pl, ok := l.peer[p]
if !ok {
return &l.PeerDefault
return &l.peerDefault
}
return &pl
}

func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
return &l.Stream
return &l.stream
}

func (l *fixedLimiter) GetConnLimits() Limit {
return &l.Conn
return &l.conn
}
45 changes: 45 additions & 0 deletions p2p/host/resource-manager/limit_config_test.backwards-compat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"System": {
"Memory": 65536,
"Conns": 16,
"ConnsInbound": 8,
"ConnsOutbound": 16,
"FD": 16
},
"ServiceDefault": {
"Memory": 8765
},
"Service": {
"A": {
"Memory": 8192
},
"B": {}
},
"ServicePeerDefault": {
"Memory": 2048
},
"ServicePeer": {
"A": {
"Memory": 4096
}
},
"ProtocolDefault": {
"Memory": 2048
},
"ProtocolPeerDefault": {
"Memory": 1024
},
"Protocol": {
"/A": {
"Memory": 8192
}
},
"PeerDefault": {
"Memory": 4096
},
"Peer": {
"12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": {
"Memory": 4097
}
}
}
Loading