Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

simplify limiter by removing the injected isFdConsumingFnc #274

Merged
merged 1 commit into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ func (dj *dialJob) dialTimeout() time.Duration {
type dialLimiter struct {
lk sync.Mutex

isFdConsumingFnc isFdConsumingFnc
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
fdConsuming int
fdLimit int
waitingOnFd []*dialJob

dialFunc dialfunc

Expand All @@ -55,21 +54,19 @@ type dialLimiter struct {
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool

func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
func newDialLimiter(df dialfunc) *dialLimiter {
fd := ConcurrentFdDials
if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
if n, err := strconv.ParseInt(env, 10, 32); err == nil {
fd = int(n)
}
}
return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
}

func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
isFdConsumingFnc: isFdConsumingFnc,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
Expand Down Expand Up @@ -157,7 +154,7 @@ func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {

isRelay := err == nil

return !isRelay && dl.isFdConsumingFnc(addr)
return !isRelay && isFdConsumingAddr(addr)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
Expand Down
23 changes: 5 additions & 18 deletions limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@ import (
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

var isFdConsuming = func(addr ma.Multiaddr) bool {
res := false

ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_TCP {
res = true
return false
}
return true
})
return res
}

func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
Expand Down Expand Up @@ -95,7 +82,7 @@ func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)

l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), ConcurrentFdDials, 4)
l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
Expand Down Expand Up @@ -144,7 +131,7 @@ func TestLimiterBasicDials(t *testing.T) {
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), 16, 5)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
Expand Down Expand Up @@ -220,7 +207,7 @@ func TestTokenRedistribution(t *testing.T) {
<-ch
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(isFdConsuming, df, 8, 4)
l := newDialLimiterWithParams(df, 8, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
Expand Down Expand Up @@ -313,7 +300,7 @@ func TestStressLimiter(t *testing.T) {
return nil, fmt.Errorf("test bad dial")
}

l := newDialLimiterWithParams(isFdConsuming, df, 20, 5)
l := newDialLimiterWithParams(df, 20, 5)

var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -367,7 +354,7 @@ func TestFDLimitUnderflow(t *testing.T) {
}

const fdLimit = 20
l := newDialLimiterWithParams(isFdConsuming, df, fdLimit, 3)
l := newDialLimiterWithParams(df, fdLimit, 3)

var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
Expand Down
2 changes: 1 addition & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
}

s.dsync = newDialSync(s.startDialWorker)
s.limiter = newDialLimiter(s.dialAddr, isFdConsumingAddr)
s.limiter = newDialLimiter(s.dialAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
s.backf.init(s.ctx)
Expand Down