Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: rcmgr: Change LimitConfig to use LimitVal type #2000

Merged
merged 18 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 64 additions & 4 deletions p2p/host/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@ scalingLimits := rcmgr.DefaultLimits
// Add limits around included libp2p protocols
libp2p.SetDefaultServiceLimits(&scalingLimits)

// Turn the scaling limits into a static set of limits using `.AutoScale`. This
// Turn the scaling limits into a concrete set of limits using `.AutoScale`. This
// scales the limits proportional to your system memory.
limits := scalingLimits.AutoScale()
scaledDefaultLimits := scalingLimits.AutoScale()

// Tweak certain settings
cfg := rcmgr.PartialLimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
// Everything else is default. The exact values will come from `scaledDefaultLimits` above.
}

// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits`
limits := cfg.Build(scaledDefaultLimits)

// The resource manager expects a limiter, se we create one from our limits.
limiter := rcmgr.NewFixedLimiter(limits)
Expand All @@ -51,6 +63,54 @@ if err != nil {
host, err := libp2p.New(libp2p.ResourceManager(rm))
```

### Saving the limits config
The easiest way to save the defined limits is to serialize the `PartialLimitConfig`
type as JSON.

```go
noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf")
cfg := rcmgr.PartialLimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
Peer: map[peer.ID]rcmgr.ResourceLimits{
noisyNeighbor: {
// No inbound connections from this peer
ConnsInbound: rcmgr.BlockAllLimit,
// But let me open connections to them
Conns: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.DefaultLimit,
Comment on lines +81 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you even have to specify rcmgr.DefaultLimit or are those just added for clarity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added for clarity, and to show that default doesn't get serialized into json. You'll also notice Memory is implicitly default.

// No inbound streams from this peer
StreamsInbound: rcmgr.BlockAllLimit,
// And let me open unlimited (by me) outbound streams (the peer may have their own limits on me)
StreamsOutbound: rcmgr.Unlimited,
},
},
}
jsonBytes, _ := json.Marshal(&cfg)

// string(jsonBytes)
// {
// "System": {
// "StreamsOutbound": "unlimited"
// },
// "Peer": {
// "QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf": {
// "StreamsInbound": "blockAll",
// "StreamsOutbound": "unlimited",
// "ConnsInbound": "blockAll"
Comment on lines +100 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the order that they serialize as? If we are in control, I think it would be good to go from outwards to in

Conns
ConnsInbound
CoonnsOutbound
Streams
StreamsInbound
StreamsOutbound

// }
// }
// }
```

This will omit defaults from the JSON output. It will also serialize the
blockAll, and unlimited values explicitly.

The `Memory` field is serialized as a string to workaround the JSON limitation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To drive this home, maybe set a Memory limit in your example above so it shows up in the serialization?

of 32 bit integers (`Memory` is an int64).

## Basic Resources

### Memory
Expand Down Expand Up @@ -278,7 +338,7 @@ This is done using the `ScalingLimitConfig`. For every scope, this configuration
struct defines the absolutely bare minimum limits, and an (optional) increase of
these limits, which will be applied on nodes that have sufficient memory.

A `ScalingLimitConfig` can be converted into a `LimitConfig` (which can then be
A `ScalingLimitConfig` can be converted into a `ConcreteLimitConfig` (which can then be
used to initialize a fixed limiter with `NewFixedLimiter`) by calling the `Scale` method.
The `Scale` method takes two parameters: the amount of memory and the number of file
descriptors that an application is willing to dedicate to libp2p.
Comment on lines +341 to 344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Maybe also mention the AutoScale method for added convenience?

Expand Down Expand Up @@ -346,7 +406,7 @@ go-libp2p process. For the default definitions see [`DefaultLimits` and

If the defaults seem mostly okay, but you want to adjust one facet you can
simply copy the default struct object and update the field you want to change. You can
apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `LimitConfig` with
apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `ConcreteLimitConfig` with
`.Apply`.

Example
Expand Down
93 changes: 62 additions & 31 deletions p2p/host/resource-manager/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rcmgr
import (
"encoding/json"
"io"
"math"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -56,33 +57,32 @@ func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) {
}

// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) {
func NewLimiterFromJSON(in io.Reader, defaults ConcreteLimitConfig) (Limiter, error) {
cfg, err := readLimiterConfigFromJSON(in, defaults)
if err != nil {
return nil, err
}
return &fixedLimiter{cfg}, nil
}

func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) {
var cfg LimitConfig
func readLimiterConfigFromJSON(in io.Reader, defaults ConcreteLimitConfig) (ConcreteLimitConfig, error) {
var cfg PartialLimitConfig
if err := json.NewDecoder(in).Decode(&cfg); err != nil {
return LimitConfig{}, err
return ConcreteLimitConfig{}, err
}
cfg.Apply(defaults)
return cfg, nil
return cfg.Build(defaults), nil
}

// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
LimitConfig
ConcreteLimitConfig
}

var _ Limiter = (*fixedLimiter)(nil)

func NewFixedLimiter(conf LimitConfig) Limiter {
func NewFixedLimiter(conf ConcreteLimitConfig) Limiter {
log.Debugw("initializing new limiter with config", "limits", conf)
return &fixedLimiter{LimitConfig: conf}
return &fixedLimiter{conf}
}

// BaseLimit is a mixin type for basic resource limits.
Expand All @@ -97,6 +97,37 @@ type BaseLimit struct {
Memory int64 `json:",omitempty"`
}

func valueOrBlockAll(n int) LimitVal {
if n == 0 {
return BlockAllLimit
} else if n == math.MaxInt {
return Unlimited
}
return LimitVal(n)
}
func valueOrBlockAll64(n int64) LimitVal64 {
if n == 0 {
return BlockAllLimit64
} else if n == math.MaxInt {
return Unlimited64
}
return LimitVal64(n)
}

// ToResourceLimits converts the BaseLimit to a ResourceLimits
func (l BaseLimit) ToResourceLimits() ResourceLimits {
return ResourceLimits{
Streams: valueOrBlockAll(l.Streams),
StreamsInbound: valueOrBlockAll(l.StreamsInbound),
StreamsOutbound: valueOrBlockAll(l.StreamsOutbound),
Conns: valueOrBlockAll(l.Conns),
ConnsInbound: valueOrBlockAll(l.ConnsInbound),
ConnsOutbound: valueOrBlockAll(l.ConnsOutbound),
FD: valueOrBlockAll(l.FD),
Memory: valueOrBlockAll64(l.Memory),
}
}

// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimit) Apply(l2 BaseLimit) {
Expand Down Expand Up @@ -169,98 +200,98 @@ func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) {
}
}

func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
func (l BaseLimit) GetStreamLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.StreamsInbound
} else {
return l.StreamsOutbound
}
}

func (l *BaseLimit) GetStreamTotalLimit() int {
func (l BaseLimit) GetStreamTotalLimit() int {
return l.Streams
}

func (l *BaseLimit) GetConnLimit(dir network.Direction) int {
func (l BaseLimit) GetConnLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.ConnsInbound
} else {
return l.ConnsOutbound
}
}

func (l *BaseLimit) GetConnTotalLimit() int {
func (l BaseLimit) GetConnTotalLimit() int {
return l.Conns
}

func (l *BaseLimit) GetFDLimit() int {
func (l BaseLimit) GetFDLimit() int {
return l.FD
}

func (l *BaseLimit) GetMemoryLimit() int64 {
func (l BaseLimit) GetMemoryLimit() int64 {
return l.Memory
}

func (l *fixedLimiter) GetSystemLimits() Limit {
return &l.System
return &l.system
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
}

func (l *fixedLimiter) GetTransientLimits() Limit {
return &l.Transient
return &l.transient
}

func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit {
return &l.AllowlistedSystem
return &l.allowlistedSystem
}

func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit {
return &l.AllowlistedTransient
return &l.allowlistedTransient
}

func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.Service[svc]
sl, ok := l.service[svc]
if !ok {
return &l.ServiceDefault
return &l.serviceDefault
}
return &sl
}

func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeer[svc]
pl, ok := l.servicePeer[svc]
if !ok {
return &l.ServicePeerDefault
return &l.servicePeerDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.Protocol[proto]
pl, ok := l.protocol[proto]
if !ok {
return &l.ProtocolDefault
return &l.protocolDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeer[proto]
pl, ok := l.protocolPeer[proto]
if !ok {
return &l.ProtocolPeerDefault
return &l.protocolPeerDefault
}
return &pl
}

func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.Peer[p]
pl, ok := l.peer[p]
if !ok {
return &l.PeerDefault
return &l.peerDefault
}
return &pl
}

func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
return &l.Stream
return &l.stream
}

func (l *fixedLimiter) GetConnLimits() Limit {
return &l.Conn
return &l.conn
}
45 changes: 45 additions & 0 deletions p2p/host/resource-manager/limit_config_test.backwards-compat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"System": {
"Memory": 65536,
"Conns": 16,
"ConnsInbound": 8,
"ConnsOutbound": 16,
"FD": 16
},
"ServiceDefault": {
"Memory": 8765
},
"Service": {
"A": {
"Memory": 8192
},
"B": {}
},
"ServicePeerDefault": {
"Memory": 2048
},
"ServicePeer": {
"A": {
"Memory": 4096
}
},
"ProtocolDefault": {
"Memory": 2048
},
"ProtocolPeerDefault": {
"Memory": 1024
},
"Protocol": {
"/A": {
"Memory": 8192
}
},
"PeerDefault": {
"Memory": 4096
},
"Peer": {
"12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": {
"Memory": 4097
}
}
}
Loading