-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0436764
commit d7121bf
Showing
9 changed files
with
516 additions
and
225 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package addrutil | ||
|
||
import ( | ||
ma "github.com/jbenet/go-multiaddr" | ||
mafmt "github.com/whyrusleeping/mafmt" | ||
) | ||
|
||
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool { | ||
addrmap := make(map[string]bool) | ||
for _, a := range addrs { | ||
addrmap[string(a.Bytes())] = true | ||
} | ||
|
||
return func(a ma.Multiaddr) bool { | ||
return !addrmap[string(a.Bytes())] | ||
} | ||
} | ||
|
||
func IsFDCostlyTransport(a ma.Multiaddr) bool { | ||
return mafmt.TCP.Matches(a) | ||
} | ||
|
||
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool { | ||
return func(a ma.Multiaddr) bool { | ||
return !f(a) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package swarm | ||
|
||
import ( | ||
"sync" | ||
|
||
peer "github.com/ipfs/go-libp2p-peer" | ||
ma "github.com/jbenet/go-multiaddr" | ||
context "golang.org/x/net/context" | ||
|
||
conn "github.com/ipfs/go-libp2p/p2p/net/conn" | ||
addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr" | ||
) | ||
|
||
type dialResult struct { | ||
Conn conn.Conn | ||
Err error | ||
} | ||
|
||
type dialJob struct { | ||
addr ma.Multiaddr | ||
peer peer.ID | ||
ctx context.Context | ||
resp chan dialResult | ||
success bool | ||
} | ||
|
||
type dialLimiter struct { | ||
rllock sync.Mutex | ||
fdConsuming int | ||
fdLimit int | ||
waitingOnFd []*dialJob | ||
|
||
dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) | ||
|
||
activePerPeer map[peer.ID]int | ||
perPeerLimit int | ||
waitingOnPeerLimit map[peer.ID][]*dialJob | ||
} | ||
|
||
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) | ||
|
||
func newDialLimiter(df dialfunc) *dialLimiter { | ||
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) | ||
} | ||
|
||
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { | ||
return &dialLimiter{ | ||
fdLimit: fdl, | ||
perPeerLimit: ppl, | ||
waitingOnPeerLimit: make(map[peer.ID][]*dialJob), | ||
activePerPeer: make(map[peer.ID]int), | ||
dialFunc: df, | ||
} | ||
} | ||
|
||
func (dl *dialLimiter) finishedDial(dj *dialJob) { | ||
dl.rllock.Lock() | ||
defer dl.rllock.Unlock() | ||
|
||
// release tokens in reverse order than we take them | ||
dl.activePerPeer[dj.peer]-- | ||
if dl.activePerPeer[dj.peer] == 0 { | ||
delete(dl.activePerPeer, dj.peer) | ||
} | ||
|
||
waitlist := dl.waitingOnPeerLimit[dj.peer] | ||
if !dj.success && len(waitlist) > 0 { | ||
next := waitlist[0] | ||
if len(waitlist) == 1 { | ||
delete(dl.waitingOnPeerLimit, dj.peer) | ||
} else { | ||
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:] | ||
} | ||
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token | ||
|
||
// can kick this off right here, dials in this list already | ||
// have the other tokens needed | ||
go dl.executeDial(next) | ||
} | ||
|
||
if addrutil.IsFDCostlyTransport(dj.addr) { | ||
dl.fdConsuming-- | ||
if len(dl.waitingOnFd) > 0 { | ||
next := dl.waitingOnFd[0] | ||
dl.waitingOnFd = dl.waitingOnFd[1:] | ||
dl.fdConsuming++ | ||
|
||
// now, attempt to take the 'per peer limit' token | ||
dl.schedulePerPeerDial(next) | ||
} | ||
} | ||
} | ||
|
||
// AddDialJob tries to take the needed tokens for starting the given dial job. | ||
// If it acquires all needed tokens, it immediately starts the dial, otherwise | ||
// it will put it on the waitlist for the requested token. | ||
func (dl *dialLimiter) AddDialJob(dj *dialJob) { | ||
dl.rllock.Lock() | ||
defer dl.rllock.Unlock() | ||
|
||
if addrutil.IsFDCostlyTransport(dj.addr) { | ||
if dl.fdConsuming >= dl.fdLimit { | ||
dl.waitingOnFd = append(dl.waitingOnFd, dj) | ||
return | ||
} | ||
|
||
// take token | ||
dl.fdConsuming++ | ||
} | ||
|
||
dl.schedulePerPeerDial(dj) | ||
} | ||
|
||
// executeDial calls the dialFunc, and reports the result through the response | ||
// channel when finished. Once the response is sent it also releases all tokens | ||
// it held during the dial. | ||
func (dl *dialLimiter) executeDial(j *dialJob) { | ||
defer dl.finishedDial(j) | ||
con, err := dl.dialFunc(j.ctx, j.peer, j.addr) | ||
select { | ||
case j.resp <- dialResult{Conn: con, Err: err}: | ||
case <-j.ctx.Done(): | ||
} | ||
} | ||
|
||
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { | ||
if dl.activePerPeer[j.peer] >= dl.perPeerLimit { | ||
wlist := dl.waitingOnPeerLimit[j.peer] | ||
dl.waitingOnPeerLimit[j.peer] = append(wlist, j) | ||
return | ||
} | ||
|
||
// take second needed token and start dial! | ||
dl.activePerPeer[j.peer]++ | ||
go dl.executeDial(j) | ||
} |
Oops, something went wrong.