Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p, swarm: fix node up races by granular locking #18976

Merged
merged 12 commits into from
Feb 18, 2019
Merged
43 changes: 32 additions & 11 deletions p2p/simulations/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var (
// It is useful when constructing a chain network topology
// when Network adds and removes nodes dynamically.
func (net *Network) ConnectToLastNode(id enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

ids := net.getUpNodeIDs()
l := len(ids)
if l < 2 {
Expand All @@ -41,29 +44,35 @@ func (net *Network) ConnectToLastNode(id enode.ID) (err error) {
if last == id {
last = ids[l-2]
}
return net.connect(last, id)
return net.connectNotConnected(last, id)
}

// ConnectToRandomNode connects the node with provided NodeID
// to a random node that is up.
func (net *Network) ConnectToRandomNode(id enode.ID) (err error) {
selected := net.GetRandomUpNode(id)
net.lock.Lock()
defer net.lock.Unlock()

selected := net.getRandomUpNode(id)
if selected == nil {
return ErrNodeNotFound
}
return net.connect(selected.ID(), id)
return net.connectNotConnected(selected.ID(), id)
}

// ConnectNodesFull connects all nodes one to another.
// It provides a complete connectivity in the network
// which should be rarely needed.
func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
for i, lid := range ids {
for _, rid := range ids[i+1:] {
if err = net.connect(lid, rid); err != nil {
if err = net.connectNotConnected(lid, rid); err != nil {
return err
}
}
Expand All @@ -74,12 +83,19 @@ func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) {
// ConnectNodesChain connects all nodes in a chain topology.
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

return net.connectNodesChain(ids)
}

func (net *Network) connectNodesChain(ids []enode.ID) (err error) {
if ids == nil {
ids = net.getUpNodeIDs()
}
l := len(ids)
for i := 0; i < l-1; i++ {
if err := net.connect(ids[i], ids[i+1]); err != nil {
if err := net.connectNotConnected(ids[i], ids[i+1]); err != nil {
return err
}
}
Expand All @@ -89,39 +105,44 @@ func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) {
// ConnectNodesRing connects all nodes in a ring topology.
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
l := len(ids)
if l < 2 {
return nil
}
if err := net.ConnectNodesChain(ids); err != nil {
if err := net.connectNodesChain(ids); err != nil {
return err
}
return net.connect(ids[l-1], ids[0])
return net.connectNotConnected(ids[l-1], ids[0])
}

// ConnectNodesStar connects all nodes into a star topology
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
for _, id := range ids {
if center == id {
continue
}
if err := net.connect(center, id); err != nil {
if err := net.connectNotConnected(center, id); err != nil {
return err
}
}
return nil
}

// connect connects two nodes but ignores already connected error.
func (net *Network) connect(oneID, otherID enode.ID) error {
return ignoreAlreadyConnectedErr(net.Connect(oneID, otherID))
func (net *Network) connectNotConnected(oneID, otherID enode.ID) error {
frncmx marked this conversation as resolved.
Show resolved Hide resolved
return ignoreAlreadyConnectedErr(net.connect(oneID, otherID))
}

func ignoreAlreadyConnectedErr(err error) error {
Expand Down
2 changes: 1 addition & 1 deletion p2p/simulations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event {
func (e *Event) String() string {
switch e.Type {
case EventTypeNode:
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up)
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up())
case EventTypeConn:
return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up)
case EventTypeMsg:
Expand Down
18 changes: 10 additions & 8 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ type expectEvents struct {
}

func (t *expectEvents) nodeEvent(id string, up bool) *Event {
node := Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
up: up,
}
return &Event{
Type: EventTypeNode,
Node: &Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
Up: up,
},
Node: &node,
}
}

Expand Down Expand Up @@ -480,6 +481,7 @@ loop:
}

func (t *expectEvents) expect(events ...*Event) {
t.Helper()
timeout := time.After(10 * time.Second)
i := 0
for {
Expand All @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) {
if event.Node.ID() != expected.Node.ID() {
t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
}
if event.Node.Up != expected.Node.Up {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
if event.Node.Up() != expected.Node.Up() {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up())
}

case EventTypeConn:
Expand Down
9 changes: 5 additions & 4 deletions p2p/simulations/mocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) {
for {
select {
case event := <-events:
//if the event is a node Up event only
if event.Node != nil && event.Node.Up {
if isNodeUp(event) {
//add the correspondent node ID to the map
nodemap[event.Node.Config.ID] = true
//this means all nodes got a nodeUp event, so we can continue the test
if len(nodemap) == nodeCount {
nodesComplete = true
//wait for 3s as the mocker will need time to connect the nodes
//time.Sleep( 3 *time.Second)
}
} else if event.Conn != nil && nodesComplete {
connCount += 1
Expand Down Expand Up @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) {
t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo))
}
}

func isNodeUp(event *Event) bool {
return event.Node != nil && event.Node.Up()
}
Loading