Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1483 from weaveworks/1478-cm-termination-race
Browse files Browse the repository at this point in the history
LGTM.  Fixes #1478
  • Loading branch information
bboreham committed Oct 1, 2015
2 parents b5e7e9f + 0159e26 commit 6619b58
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
3 changes: 2 additions & 1 deletion router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch
if err = conn.Router.Ourself.AddConnection(conn); err != nil {
return
}
conn.Router.ConnectionMaker.ConnectionCreated(conn)

// SetListener has the side-effect of telling the forwarder
// that the connection is confirmed. This comes after
Expand Down Expand Up @@ -360,7 +361,7 @@ func (conn *LocalConnection) shutdown(err error) {
conn.forwarder.Stop()
}

conn.Router.ConnectionMaker.ConnectionTerminated(conn.remoteTCPAddr, err)
conn.Router.ConnectionMaker.ConnectionTerminated(conn, err)
}

func (conn *LocalConnection) forwarderCrypto() *OverlayCrypto {
Expand Down
44 changes: 33 additions & 11 deletions router/connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ConnectionMaker struct {
port int
discovery bool
targets map[string]*Target
connections map[Connection]struct{}
directPeers peerAddrs
actionChan chan<- ConnectionMakerAction
}
Expand All @@ -43,6 +44,7 @@ func NewConnectionMaker(ourself *LocalPeer, peers *Peers, port int, discovery bo
discovery: discovery,
directPeers: peerAddrs{},
targets: make(map[string]*Target),
connections: make(map[Connection]struct{}),
actionChan: actionChan}
go cm.queryLoop(actionChan)
return cm
Expand Down Expand Up @@ -88,12 +90,28 @@ func (cm *ConnectionMaker) ForgetConnections(peers []string) {
}
}

func (cm *ConnectionMaker) ConnectionTerminated(address string, err error) {
func (cm *ConnectionMaker) ConnectionAborted(address string, err error) {
cm.actionChan <- func() bool {
if target, found := cm.targets[address]; found {
target.attempting = false
target.lastError = err
target.retry()
cm.retry(address, err)
return true
}
}

func (cm *ConnectionMaker) ConnectionCreated(conn Connection) {
cm.actionChan <- func() bool {
cm.connections[conn] = void
if conn.Outbound() {
delete(cm.targets, conn.RemoteTCPAddr())
}
return false
}
}

func (cm *ConnectionMaker) ConnectionTerminated(conn Connection, err error) {
cm.actionChan <- func() bool {
delete(cm.connections, conn)
if conn.Outbound() {
cm.retry(conn.RemoteTCPAddr(), err)
}
return true
}
Expand Down Expand Up @@ -123,9 +141,6 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
validTarget = make(map[string]struct{})
directTarget = make(map[string]struct{})
)
// Copy the set of things we are connected to, so we can access
// them without locking. Also clear out any entries in cm.targets
// for existing connections.
ourConnectedPeers, ourConnectedTargets, ourInboundIPs := cm.ourConnections()

addTarget := func(address string) {
Expand Down Expand Up @@ -176,9 +191,8 @@ func (cm *ConnectionMaker) ourConnections() (PeerNameSet, map[string]struct{}, m
ourConnectedTargets = make(map[string]struct{})
ourInboundIPs = make(map[string]struct{})
)
for conn := range cm.ourself.Connections() {
for conn := range cm.connections {
address := conn.RemoteTCPAddr()
delete(cm.targets, address)
ourConnectedPeers[conn.Remote().Name] = void
ourConnectedTargets[address] = void
if conn.Outbound() {
Expand Down Expand Up @@ -250,7 +264,15 @@ func (cm *ConnectionMaker) attemptConnection(address string, acceptNewPeer bool)
log.Printf("->[%s] attempting connection", address)
if err := cm.ourself.CreateConnection(address, acceptNewPeer); err != nil {
log.Errorf("->[%s] error during connection attempt: %v", address, err)
cm.ConnectionTerminated(address, err)
cm.ConnectionAborted(address, err)
}
}

func (cm *ConnectionMaker) retry(address string, err error) {
if target, found := cm.targets[address]; found {
target.attempting = false
target.lastError = err
target.retry()
}
}

Expand Down
8 changes: 1 addition & 7 deletions router/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,10 @@ func NewBroadcastRouteStatusSlice(routes *Routes) []BroadcastRouteStatus {
}

func NewLocalConnectionStatusSlice(cm *ConnectionMaker) []LocalConnectionStatus {
// We need to Refresh first in order to clear out any 'attempting'
// connections from cm.targets that have been established since
// the last run of cm.checkStateAndAttemptConnections. These
// entries are harmless but do represent stale state that we do
// not want to report.
cm.Refresh()
resultChan := make(chan []LocalConnectionStatus, 0)
cm.actionChan <- func() bool {
var slice []LocalConnectionStatus
for conn := range cm.ourself.Connections() {
for conn := range cm.connections {
state := "pending"
if conn.Established() {
state = "established"
Expand Down

0 comments on commit 6619b58

Please sign in to comment.