Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #516 from rade/514_lower_topology_gossip_complexity
Browse files Browse the repository at this point in the history
LGTM.  Closes #514
  • Loading branch information
bboreham committed Apr 7, 2015
2 parents 9b9d8a0 + f80410b commit 86abc6a
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 48 deletions.
86 changes: 64 additions & 22 deletions router/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ type Gossip interface {
// specific message from one peer to another
// intermediate peers relay it using unicast topology.
GossipUnicast(dstPeerName PeerName, msg []byte) error
// send a message to every peer, relayed using broadcast topology.
GossipBroadcast(msg []byte) error
// send gossip to every peer, relayed using broadcast topology.
GossipBroadcast(update GossipData) error
}

type Gossiper interface {
OnGossipUnicast(sender PeerName, msg []byte) error
OnGossipBroadcast(msg []byte) error
// merge received data into state and return a representation of
// the received data, for further propagation
OnGossipBroadcast(update []byte) (GossipData, error)
// return state of everything we know; gets called periodically
Gossip() GossipData
// merge received data into state and return "everything new I've
Expand Down Expand Up @@ -76,24 +78,27 @@ func (sender *GossipSender) Stop() {
}

// connectionSenders maps a Connection to a GossipSender.
type connectionSenders map[Connection]*GossipSender
// peerSenders maps a PeerName to a GossipSender. GossipChannel uses it
// for its broadcasters, keyed by the name of a broadcast's source peer.
type peerSenders map[PeerName]*GossipSender

type GossipChannel struct {
sync.Mutex
ourself *LocalPeer
name string
hash uint32
gossiper Gossiper
senders connectionSenders
ourself *LocalPeer
name string
hash uint32
gossiper Gossiper
senders connectionSenders
broadcasters peerSenders
}

func (router *Router) NewGossip(channelName string, g Gossiper) Gossip {
channelHash := hash(channelName)
channel := &GossipChannel{
ourself: router.Ourself,
name: channelName,
hash: channelHash,
gossiper: g,
senders: make(connectionSenders)}
ourself: router.Ourself,
name: channelName,
hash: channelHash,
gossiper: g,
senders: make(connectionSenders),
broadcasters: make(peerSenders)}
router.GossipChannels[channelHash] = channel
return channel
}
Expand Down Expand Up @@ -150,15 +155,16 @@ func (c *GossipChannel) deliverUnicast(srcName PeerName, origPayload []byte, dec
return c.gossiper.OnGossipUnicast(srcName, payload)
}

func (c *GossipChannel) deliverBroadcast(srcName PeerName, origPayload []byte, dec *gob.Decoder) error {
func (c *GossipChannel) deliverBroadcast(srcName PeerName, _ []byte, dec *gob.Decoder) error {
var payload []byte
if err := dec.Decode(&payload); err != nil {
return err
}
if err := c.gossiper.OnGossipBroadcast(payload); err != nil {
data, err := c.gossiper.OnGossipBroadcast(payload)
if err != nil || data == nil {
return err
}
return c.relayBroadcast(srcName, origPayload)
return c.relayBroadcast(srcName, data)
}

func (c *GossipChannel) deliver(_ PeerName, _ []byte, dec *gob.Decoder) error {
Expand Down Expand Up @@ -214,8 +220,8 @@ func (c *GossipChannel) GossipUnicast(dstPeerName PeerName, msg []byte) error {
return c.relayUnicast(dstPeerName, GobEncode(c.hash, c.ourself.Name, dstPeerName, msg))
}

func (c *GossipChannel) GossipBroadcast(msg []byte) error {
return c.relayBroadcast(c.ourself.Name, GobEncode(c.hash, c.ourself.Name, msg))
// GossipBroadcast sends update to every peer, relayed using the
// broadcast topology. The local peer is named as the source of the
// broadcast.
func (c *GossipChannel) GossipBroadcast(update GossipData) error {
	origin := c.ourself.Name
	return c.relayBroadcast(origin, update)
}

func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) error {
Expand All @@ -229,17 +235,50 @@ func (c *GossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) error {
return nil
}

func (c *GossipChannel) relayBroadcast(srcName PeerName, buf []byte) error {
// relayBroadcast hands update to the broadcaster associated with
// srcName, creating and starting that broadcaster first if this channel
// does not have one yet. It always returns nil.
func (c *GossipChannel) relayBroadcast(srcName PeerName, update GossipData) error {
	// Fetch the known peer names before taking our own lock, so that
	// the two locks never nest.
	known := c.ourself.Router.Peers.Names()
	c.Lock()
	defer c.Unlock()
	// Garbage collection: walk the broadcasters in go's random map
	// iteration order, stopping and removing each one whose source
	// peer is no longer known, and give up at the first entry that is
	// still valid. With little or no garbage this costs close to
	// O(1)[1]; with lots of garbage it is cleared out quickly.
	//
	// [1] TODO Because we want to avoid nested locks we cannot simply
	// call Peers.Fetch(name) here and need the Peers.Names() call
	// above instead, which is O(n_peers) at best.
	for peerName, stale := range c.broadcasters {
		if _, ok := known[peerName]; ok {
			break
		}
		delete(c.broadcasters, peerName)
		stale.Stop()
	}
	sender, ok := c.broadcasters[srcName]
	if !ok {
		sender = NewGossipSender(func(pending GossipData) { c.sendBroadcast(srcName, pending) })
		c.broadcasters[srcName] = sender
		sender.Start()
	}
	sender.Send(update)
	return nil
}

func (c *GossipChannel) sendBroadcast(srcName PeerName, update GossipData) {
c.ourself.Router.Routes.EnsureRecalculated()
nextHops := c.ourself.Router.Routes.BroadcastAll(srcName)
if len(nextHops) == 0 {
return nil
return
}
protocolMsg := ProtocolMsg{ProtocolGossipBroadcast, buf}
protocolMsg := ProtocolMsg{ProtocolGossipBroadcast, GobEncode(c.hash, srcName, update.Encode())}
// FIXME a single blocked connection can stall us
for _, conn := range c.ourself.ConnectionsTo(nextHops) {
conn.(ProtocolSender).SendProtocolMsg(protocolMsg)
}
return nil
}

func (c *GossipChannel) log(args ...interface{}) {
Expand All @@ -256,6 +295,9 @@ func (router *Router) sendPendingGossip() {
for _, sender := range channel.senders {
sender.flush()
}
for _, sender := range channel.broadcasters {
sender.flush()
}
}
}

Expand Down
9 changes: 1 addition & 8 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,7 @@ func (peer *LocalPeer) handleDeleteConnection(conn Connection) {

func (peer *LocalPeer) broadcastPeerUpdate(peers ...*Peer) {
peer.Router.Routes.Recalculate()
// TODO We should just be invoking TopologyGossip.GossipBroadcast
// here, but route calculation is asynchronous and in this
// particular case would likely result in the broadcast not
// reaching all peers. So instead we slightly break the Gossip
// abstraction (hence the cast) and send a regular update. This is
// less efficient though since it will almost certainly reach
// peers more than once.
peer.Router.TopologyGossip.(*GossipChannel).Send(NewTopologyGossipData(peer.Router.Peers, append(peers, peer.Peer)...))
peer.Router.TopologyGossip.GossipBroadcast(NewTopologyGossipData(peer.Router.Peers, append(peers, peer.Peer)...))
}

func (peer *LocalPeer) checkConnectionLimit() error {
Expand Down
14 changes: 9 additions & 5 deletions router/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ func (peers *Peers) ForEach(fun func(*Peer)) {
//
// We add peers hitherto unknown to us, and update peers for which the
// update contains a more recent version than known to us. The return
// value is an "improved" update containing just these new/updated
// elements.
func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, error) {
// value is a) a representation of the received update, and b) an
// "improved" update containing just these new/updated elements.
func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) {
peers.Lock()

newPeers, decodedUpdate, decodedConns, err := peers.decodeUpdate(update)
if err != nil {
peers.Unlock()
return nil, err
return nil, nil, err
}

// By this point, we know the update doesn't refer to any peers we
Expand All @@ -97,7 +97,11 @@ func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, error) {
// Don't need to hold peers lock any longer
peers.Unlock()

return setFromPeersMap(newUpdate), nil
updateNames := make(PeerNameSet)
for _, peer := range decodedUpdate {
updateNames[peer.Name] = void
}
return updateNames, setFromPeersMap(newUpdate), nil
}

func (peers *Peers) Names() PeerNameSet {
Expand Down
2 changes: 1 addition & 1 deletion router/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package router

const (
Protocol = "weave"
ProtocolVersion = 14
ProtocolVersion = 15
)

// ProtocolTag is a single-byte tag type.
// NOTE(review): presumably it labels the kind of a protocol message -
// confirm against the tag constants declared elsewhere in this file.
type ProtocolTag byte
Expand Down
20 changes: 12 additions & 8 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,38 +377,42 @@ func (router *Router) OnGossipUnicast(sender PeerName, msg []byte) error {
return fmt.Errorf("unexpected topology gossip unicast: %v", msg)
}

func (router *Router) OnGossipBroadcast(msg []byte) error {
return fmt.Errorf("unexpected topology gossip broadcast: %v", msg)
// OnGossipBroadcast merges a received topology broadcast into our state
// and returns a representation of the received data, for further
// propagation. Only the first result of applyTopologyUpdate is used;
// the second is ignored here. The returned data is nil when applying
// the update failed (the error is passed on) or when the received
// update turned out to be empty.
func (router *Router) OnGossipBroadcast(update []byte) (GossipData, error) {
	received, _, err := router.applyTopologyUpdate(update)
	if err != nil {
		return nil, err
	}
	if len(received) == 0 {
		return nil, nil
	}
	return &TopologyGossipData{peers: router.Peers, update: received}, nil
}

// Gossip returns the state of everything we know: topology gossip data
// covering every name reported by Peers.Names().
func (router *Router) Gossip() GossipData {
	everyone := router.Peers.Names()
	return &TopologyGossipData{peers: router.Peers, update: everyone}
}

func (router *Router) OnGossip(update []byte) (GossipData, error) {
newUpdate, err := router.applyTopologyUpdate(update)
_, newUpdate, err := router.applyTopologyUpdate(update)
if err != nil || len(newUpdate) == 0 {
return nil, err
}
return &TopologyGossipData{peers: router.Peers, update: newUpdate}, nil
}

func (router *Router) applyTopologyUpdate(update []byte) (PeerNameSet, error) {
newUpdate, err := router.Peers.ApplyUpdate(update)
func (router *Router) applyTopologyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) {
origUpdate, newUpdate, err := router.Peers.ApplyUpdate(update)
if _, ok := err.(UnknownPeerError); err != nil && ok {
// That update contained a reference to a peer which wasn't
// itself included in the update, and we didn't know about
// already. We ignore this; eventually we should receive an
// update containing a complete topology.
log.Println("Topology gossip:", err)
return nil, nil
return nil, nil, nil
}
if err != nil {
return nil, err
return nil, nil, err
}
if len(newUpdate) > 0 {
router.ConnectionMaker.Refresh()
router.Routes.Recalculate()
}
return newUpdate, nil
return origUpdate, newUpdate, nil
}
33 changes: 29 additions & 4 deletions router/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Routes struct {
broadcast map[PeerName][]PeerName
broadcastAll map[PeerName][]PeerName // [1]
recalculate chan<- *struct{}
wait chan<- chan struct{}
// [1] based on *all* connections, not just established &
// symmetric ones
}
Expand All @@ -36,8 +37,10 @@ func NewRoutes(ourself *Peer, peers *Peers) *Routes {

func (routes *Routes) Start() {
recalculate := make(chan *struct{}, 1)
wait := make(chan chan struct{})
routes.recalculate = recalculate
go routes.run(recalculate)
routes.wait = wait
go routes.run(recalculate, wait)
}

func (routes *Routes) Unicast(name PeerName) (PeerName, bool) {
Expand Down Expand Up @@ -91,17 +94,39 @@ func (routes *Routes) String() string {
return buf.String()
}

// Recalculate asks for the routing table to be recalculated. The
// request is asynchronous; following it with a call to
// EnsureRecalculated effectively makes it synchronous.
func (routes *Routes) Recalculate() {
	requests := routes.recalculate
	// Optimisation: the channel has capacity 1 and the send never
	// blocks, so a request made while another is still queued is
	// simply dropped, coalescing multiple requests into one.
	select {
	case requests <- nil:
	default:
	}
}

func (routes *Routes) run(recalculate <-chan *struct{}) {
// EnsureRecalculated waits for any preceding Recalculate requests to be
// processed. It passes a fresh channel over routes.wait and blocks
// until a receive on that channel completes.
func (routes *Routes) EnsureRecalculated() {
	ack := make(chan struct{})
	routes.wait <- ack
	<-ack
}

func (routes *Routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}) {
for {
<-recalculate
routes.calculate()
select {
case <-recalculate:
routes.calculate()
case done := <-wait:
select {
case <-recalculate:
routes.calculate()
default:
}
close(done)
}
}
}

Expand Down

0 comments on commit 86abc6a

Please sign in to comment.