Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay: make only 1 reservation per peer #2974

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 51 additions & 43 deletions p2p/protocol/circuitv2/relay/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package relay

import (
"errors"
"slices"
"sync"
"time"

Expand All @@ -12,46 +13,48 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

var validity = 30 * time.Minute

var (
errTooManyReservations = errors.New("too many reservations")
errTooManyReservationsForPeer = errors.New("too many reservations for peer")
errTooManyReservationsForIP = errors.New("too many peers for IP address")
errTooManyReservationsForASN = errors.New("too many peers for ASN")
errTooManyReservations = errors.New("too many reservations")
errTooManyReservationsForIP = errors.New("too many peers for IP address")
errTooManyReservationsForASN = errors.New("too many peers for ASN")
)

type peerWithExpiry struct {
Expiry time.Time
Peer peer.ID
}

// constraints implements various reservation constraints
type constraints struct {
rc *Resources

mutex sync.Mutex
total []time.Time
peers map[peer.ID][]time.Time
ips map[string][]time.Time
asns map[uint32][]time.Time
total []peerWithExpiry
ips map[string][]peerWithExpiry
asns map[uint32][]peerWithExpiry
}

// newConstraints creates a new constraints object.
// The methods are *not* thread-safe; an external lock must be held if synchronization
// is required.
func newConstraints(rc *Resources) *constraints {
return &constraints{
rc: rc,
peers: make(map[peer.ID][]time.Time),
ips: make(map[string][]time.Time),
asns: make(map[uint32][]time.Time),
rc: rc,
ips: make(map[string][]peerWithExpiry),
asns: make(map[uint32][]peerWithExpiry),
}
}

// AddReservation adds a reservation for a given peer with a given multiaddr.
// If adding this reservation violates IP constraints, an error is returned.
func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
// Reserve adds a reservation for a given peer with a given multiaddr.
// If adding this reservation violates IP, ASN, or total reservation constraints, an error is returned.
func (c *constraints) Reserve(p peer.ID, a ma.Multiaddr, expiry time.Time) error {
c.mutex.Lock()
defer c.mutex.Unlock()

now := time.Now()
c.cleanup(now)
// To handle refreshes correctly, remove the existing reservation for the peer.
c.cleanupPeer(p)

if len(c.total) >= c.rc.MaxReservations {
return errTooManyReservations
Expand All @@ -62,17 +65,12 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
return errors.New("no IP address associated with peer")
}

peerReservations := c.peers[p]
if len(peerReservations) >= c.rc.MaxReservationsPerPeer {
return errTooManyReservationsForPeer
}

ipReservations := c.ips[ip.String()]
if len(ipReservations) >= c.rc.MaxReservationsPerIP {
return errTooManyReservationsForIP
}

var asnReservations []time.Time
var asnReservations []peerWithExpiry
var asn uint32
if ip.To4() == nil {
asn = asnutil.AsnForIPv6(ip)
Expand All @@ -84,42 +82,52 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
}
}

expiry := now.Add(validity)
c.total = append(c.total, expiry)

peerReservations = append(peerReservations, expiry)
c.peers[p] = peerReservations
c.total = append(c.total, peerWithExpiry{Expiry: expiry, Peer: p})

ipReservations = append(ipReservations, expiry)
ipReservations = append(ipReservations, peerWithExpiry{Expiry: expiry, Peer: p})
c.ips[ip.String()] = ipReservations

if asn != 0 {
asnReservations = append(asnReservations, expiry)
asnReservations = append(asnReservations, peerWithExpiry{Expiry: expiry, Peer: p})
c.asns[asn] = asnReservations
}
return nil
}

func (c *constraints) cleanupList(l []time.Time, now time.Time) []time.Time {
var index int
for i, t := range l {
if t.After(now) {
break
func (c *constraints) cleanup(now time.Time) {
expireFunc := func(pe peerWithExpiry) bool {
return pe.Expiry.Before(now)
}
c.total = slices.DeleteFunc(c.total, expireFunc)
for k, ipReservations := range c.ips {
c.ips[k] = slices.DeleteFunc(ipReservations, expireFunc)
if len(c.ips[k]) == 0 {
delete(c.ips, k)
}
}
for k, asnReservations := range c.asns {
c.asns[k] = slices.DeleteFunc(asnReservations, expireFunc)
if len(c.asns[k]) == 0 {
delete(c.asns, k)
}
index = i + 1
}
return l[index:]
}

func (c *constraints) cleanup(now time.Time) {
c.total = c.cleanupList(c.total, now)
for k, peerReservations := range c.peers {
c.peers[k] = c.cleanupList(peerReservations, now)
func (c *constraints) cleanupPeer(p peer.ID) {
removeFunc := func(pe peerWithExpiry) bool {
return pe.Peer == p
}
c.total = slices.DeleteFunc(c.total, removeFunc)
for k, ipReservations := range c.ips {
c.ips[k] = c.cleanupList(ipReservations, now)
c.ips[k] = slices.DeleteFunc(ipReservations, removeFunc)
if len(c.ips[k]) == 0 {
delete(c.ips, k)
}
}
for k, asnReservations := range c.asns {
c.asns[k] = c.cleanupList(asnReservations, now)
c.asns[k] = slices.DeleteFunc(asnReservations, removeFunc)
if len(c.asns[k]) == 0 {
delete(c.asns, k)
}
}
}
51 changes: 27 additions & 24 deletions p2p/protocol/circuitv2/relay/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,40 @@ func TestConstraints(t *testing.T) {
}
}
const limit = 7
expiry := time.Now().Add(30 * time.Minute)

t.Run("total reservations", func(t *testing.T) {
res := infResources()
res.MaxReservations = limit
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
})

t.Run("reservations per peer", func(t *testing.T) {
t.Run("updates reservations on the same peer", func(t *testing.T) {
p := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)
res := infResources()
res.MaxReservationsPerPeer = limit
res.MaxReservationsPerIP = 1
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(p, randomIPv4Addr(t)); err != nil {
t.Fatal(err)
}

ipAddr := randomIPv4Addr(t)
if err := c.Reserve(p, ipAddr, expiry); err != nil {
t.Fatal(err)
}
if err := c.AddReservation(p, randomIPv4Addr(t)); err != errTooManyReservationsForPeer {
t.Fatalf("expected to run into total reservation limit, got %v", err)
if err := c.Reserve(p2, ipAddr, expiry); err != errTooManyReservationsForIP {
t.Fatalf("expected to run into IP reservation limit as this IP has already been reserved by a different peer, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(p, randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected to update existing reservation for peer, got %v", err)
}
if err := c.Reserve(p2, ipAddr, expiry); err != nil {
t.Fatalf("expected reservation for different peer to be possible, got %v", err)
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}
})
Expand All @@ -73,14 +78,14 @@ func TestConstraints(t *testing.T) {
res.MaxReservationsPerIP = limit
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != errTooManyReservationsForIP {
if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != errTooManyReservationsForIP {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
}
})
Expand All @@ -101,25 +106,23 @@ func TestConstraints(t *testing.T) {
const ipv6Prefix = "2a03:2880:f003:c07:face:b00c::"
for i := 0; i < limit; i++ {
addr := getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, i+1)))
if err := c.AddReservation(test.RandPeerIDFatal(t), addr); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), addr, expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42)))); err != errTooManyReservationsForASN {
if err := c.Reserve(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42))), expiry); err != errTooManyReservationsForASN {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
}
})
}

func TestConstraintsCleanup(t *testing.T) {
origValidity := validity
defer func() { validity = origValidity }()
validity = 500 * time.Millisecond

const limit = 7
validity := 500 * time.Millisecond
expiry := time.Now().Add(validity)
res := &Resources{
MaxReservations: limit,
MaxReservationsPerPeer: math.MaxInt32,
Expand All @@ -128,16 +131,16 @@ func TestConstraintsCleanup(t *testing.T) {
}
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}

time.Sleep(validity + time.Millisecond)
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected old reservations to have been garbage collected, %v", err)
}
}
15 changes: 7 additions & 8 deletions p2p/protocol/circuitv2/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,16 @@ func (r *Relay) handleReserve(s network.Stream) pbv2.Status {
return pbv2.Status_PERMISSION_DENIED
}
now := time.Now()
expire := now.Add(r.rc.ReservationTTL)

_, exists := r.rsvp[p]
if !exists {
if err := r.constraints.AddReservation(p, a); err != nil {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return pbv2.Status_RESERVATION_REFUSED
}
if err := r.constraints.Reserve(p, a, expire); err != nil {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return pbv2.Status_RESERVATION_REFUSED
}

expire := now.Add(r.rc.ReservationTTL)
r.rsvp[p] = expire
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
r.mx.Unlock()
Expand Down Expand Up @@ -673,6 +671,7 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) {
if ok {
delete(r.rsvp, p)
}
r.constraints.cleanupPeer(p)
r.mx.Unlock()

if ok && r.metricsTracer != nil {
Expand Down
4 changes: 3 additions & 1 deletion p2p/protocol/circuitv2/relay/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Resources struct {

// MaxReservationsPerPeer is the maximum number of reservations originating from the same
// peer; default is 4.
//
// Deprecated: We only need 1 reservation per peer.
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
MaxReservationsPerPeer int
// MaxReservationsPerIP is the maximum number of reservations originating from the same
// IP address; default is 8.
Expand Down Expand Up @@ -51,7 +53,7 @@ func DefaultResources() Resources {
MaxCircuits: 16,
BufferSize: 2048,

MaxReservationsPerPeer: 4,
MaxReservationsPerPeer: 1,
MaxReservationsPerIP: 8,
MaxReservationsPerASN: 32,
}
Expand Down
Loading