-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Announce and discover peers over simple UDP broadcasts #243
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"math/rand" | ||
"net" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-host" | ||
"github.com/libp2p/go-libp2p-peer" | ||
pstore "github.com/libp2p/go-libp2p-peerstore" | ||
ma "github.com/multiformats/go-multiaddr" | ||
manet "github.com/multiformats/go-multiaddr-net" | ||
) | ||
|
||
type broadcastService struct { | ||
host host.Host | ||
listenSock *net.UDPConn // Socket listening for broadcasts | ||
sendSocks []*net.UDPConn // Sockets that send broadcasts | ||
lk sync.Mutex | ||
notifees []Notifee | ||
broadcastPort int | ||
active bool | ||
} | ||
|
||
// | ||
// Bind a socket to send and recv broadcasts | ||
// | ||
func bindBroadcast(ip net.IP, port int) (*net.UDPConn, error) { | ||
conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: ip, Port: port}) | ||
if err == nil { | ||
file, err := conn.File() | ||
if err == nil { | ||
err = syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1) | ||
} | ||
} | ||
if err != nil { | ||
return nil, err | ||
} | ||
return conn, nil | ||
} | ||
|
||
func NewBroadcastService(ctx context.Context, peerhost host.Host, defaultPort int, interval time.Duration) (Service, error) { | ||
|
||
// Broadcasts can only be received from 0.0.0.0 or 255.255.255.255 | ||
// but they can not be sent from 255.255.255.255. | ||
// Therefor one listening socket is bound to 255.255.255.255 and a | ||
// sending socket will be bound to each address that broadcasts | ||
// should be sent from. | ||
listenSock, err := bindBroadcast(net.IPv4bcast, defaultPort) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
bs := &broadcastService{ | ||
host: peerhost, | ||
listenSock: listenSock, | ||
broadcastPort: defaultPort, | ||
active: true, | ||
} | ||
|
||
addrs, err := getDialableListenAddrs(peerhost) | ||
if err != nil { | ||
bs.Close() | ||
return nil, err | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for an else branch here, you are returning in the case of error -- this just adds indentation and cognitive overhead. |
||
for _, a := range addrs { | ||
if a.IP.IsGlobalUnicast() { | ||
// binding to the same address and port as the TCP service | ||
// embeds that address and port into broadcast messages | ||
otherSock, err := bindBroadcast(a.IP, a.Port) | ||
if err == nil { | ||
bs.sendSocks = append(bs.sendSocks, otherSock) | ||
} | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may want to return an error if we fail to bind to anything. I wouldn't stop IPFS from starting in that case but, if we return an error and log it, it'll make debugging easier. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I looked at the mdns file I saw three different methods of logging errors, so I thought it cleaner not to log at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We usually use |
||
|
||
// fudge the interval to dampen oscillations with other nodes | ||
margin := int(interval) >> 2 | ||
interval += time.Duration(rand.Intn(margin)) | ||
interval -= time.Duration(rand.Intn(margin)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks a bit funky. Any reason not to jus to just add a smaller random delay to the interval (rather than add and then subtract one)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I add and subtract so the the interval can jitter by a positive or negative amount. And I like getting funky. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but negative jitter is equivalent to just reducing the default interval (you could also just subtract |
||
|
||
go bs.sendBroadcasts(ctx, interval) | ||
go bs.recvBroadcasts(ctx) | ||
|
||
return bs, nil | ||
} | ||
|
||
func (bs *broadcastService) Close() error { | ||
bs.active = false | ||
bs.listenSock.Close() | ||
for _, conn := range bs.sendSocks { | ||
conn.Close() | ||
} | ||
return nil | ||
} | ||
|
||
func (bs *broadcastService) handleBroadcast(addr *net.UDPAddr, bpeer peer.ID) { | ||
// Broadcast messages are UDP, but represent IPv4 TCP sockets | ||
maddr, err := manet.FromNetAddr(&net.TCPAddr{ | ||
IP: addr.IP, | ||
Port: addr.Port, | ||
}) | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stylistic but... you can reduce the indentation level by using: if error != nil {
return
}
// ... |
||
pi := pstore.PeerInfo{ | ||
ID: bpeer, | ||
Addrs: []ma.Multiaddr{maddr}, | ||
} | ||
|
||
bs.lk.Lock() | ||
defer bs.lk.Unlock() | ||
for _, n := range bs.notifees { | ||
if n != nil { | ||
go n.HandlePeerFound(pi) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bs *broadcastService) recvBroadcasts(ctx context.Context) { | ||
var buf = make([]byte, 48) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are no exit conditions here. |
||
for bs.active { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checking this without a lock isn't guaranteed to work. Try re-running your tests with the |
||
n, rAddr, err := bs.listenSock.ReadFromUDP(buf) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure most errors from this function are fatal. That is, we can't continue listening on the socket (we'd end up in a rapid, infinite loop). It's probably better to log the error, stop the service, and return. You could also try checking to see if the error is temporary (cast to |
||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Again, this reduces indentation level and cognitive overhead. |
||
if n != 0 { | ||
rPeer, err := peer.IDB58Decode(string(buf[0:n])) | ||
if err == nil && rPeer != bs.host.ID() { | ||
bs.handleBroadcast(rAddr, rPeer) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bs *broadcastService) sendBroadcast() { | ||
idBuf := []byte(bs.host.ID().Pretty()) | ||
broadcastAddr := net.UDPAddr{ | ||
IP: net.IPv4bcast, | ||
Port: bs.broadcastPort, | ||
} | ||
for _, conn := range bs.sendSocks { | ||
conn.WriteToUDP(idBuf, &broadcastAddr) | ||
} | ||
} | ||
|
||
func (bs *broadcastService) sendBroadcasts(ctx context.Context, interval time.Duration) { | ||
// send an initial broadcast after a quarter interval | ||
time.Sleep(interval >> 2) | ||
bs.sendBroadcast() | ||
|
||
// send broadcasts on a periodic timeout | ||
ticker := time.NewTicker(interval) | ||
for bs.active { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, can't read/write to a variable from different threads without using some synchronization mechanism (lock, channel, etc.). Really, I'd just get rid of this variable and use the context. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I know, I'm referring to https://golang.org/ref/mem. For example, given: package main
func main() {
var active = true
go func() {
active = false
}()
for active {
}
} Go may choose to allow the second go routine to loop forever (optimizing out the check of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a race condition, it doesn't matter if its just a bool. |
||
select { | ||
case <-ticker.C: | ||
bs.sendBroadcast() | ||
|
||
case <-ctx.Done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're going to make canceling the context equivalent to calling close, we should call close here. Actually, we should defer a call to close so it gets called regardless. |
||
bs.Close() | ||
} | ||
} | ||
} | ||
|
||
func (m *broadcastService) RegisterNotifee(n Notifee) { | ||
m.lk.Lock() | ||
defer m.lk.Unlock() | ||
for i, v := range m.notifees { | ||
if v == nil { | ||
m.notifees[i] = n | ||
return | ||
} | ||
} | ||
m.notifees = append(m.notifees, n) | ||
} | ||
|
||
func (m *broadcastService) UnregisterNotifee(n Notifee) { | ||
m.lk.Lock() | ||
defer m.lk.Unlock() | ||
for i, notif := range m.notifees { | ||
if notif == n { | ||
m.notifees[i] = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having an array filled with nils is probably not the best way to go. Instead, use https://github.com/golang/go/wiki/SliceTricks#delete-without-preserving-order. That will simplify |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The go idiom is to (almost) always write:
That helps prevent nesting/confusing control flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would argue that typing the same early return idiom over and over and over is worse for understanding control flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just how go is written (that is, consistency is king). However, I'd disagree:
Now, one can argue that there should be a better way to return errors in go but that's a separate issue (and not something we can do anything about).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps you should think of go's error handling as a monad; when an error occurs you return.
Otherwise the control flow is linear, the less branching the better.