Skip to content

Commit

Permalink
Add waiting for establishing network connections
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Pavlovic <matopavlovic@gmail.com>
  • Loading branch information
matejpavlovic committed Oct 10, 2022
1 parent 65f15da commit 8dd4564
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
37 changes: 35 additions & 2 deletions pkg/net/libp2p/libp2p_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
maxRetries = 20
noLoggingErrorAttempts = 2
PermanentAddrTTL = math.MaxInt64 - iota
connWaitPollInterval = 100 * time.Millisecond
)

type TransportMessage struct {
Expand All @@ -52,6 +53,10 @@ type connInfo struct {
Stream network.Stream
}

// isConnected reports whether ci holds a non-nil Stream and the network of
// host h reports the peer identified by ci.AddrInfo.ID as connected.
func (ci *connInfo) isConnected(h host.Host) bool {
	// Without a stream the entry is never considered connected,
	// so the network does not need to be queried at all.
	if ci.Stream == nil {
		return false
	}

	state := h.Network().Connectedness(ci.AddrInfo.ID)
	return state == network.Connected
}

type newStream struct {
NodeID types.NodeID
Stream network.Stream
Expand Down Expand Up @@ -158,6 +163,35 @@ func (t *Transport) Connect(ctx context.Context, nodes map[types.NodeID]types.No
t.connect(ctx, maputil.GetKeys(parsedAddrInfo))
}

// WaitFor blocks until at least n connections are considered established,
// re-checking the connection table every connWaitPollInterval.
//
// The node's connection to itself counts towards n but is not stored in
// t.conns, so only n-1 entries of t.conns need to be connected.
// There is no timeout or cancellation: the method returns only once the
// threshold has been reached.
func (t *Transport) WaitFor(n int) {
	// We subtract one, as we always assume to be connected to ourselves
	// and the connection to self does not appear in t.conns.
	required := n - 1

	// countConnected takes a snapshot of how many entries in t.conns
	// are currently connected, holding the read lock only while counting.
	countConnected := func() int {
		t.connsLock.RLock()
		defer t.connsLock.RUnlock()

		connected := 0
		for _, info := range t.conns {
			if info.isConnected(t.host) {
				connected++
			}
		}
		return connected
	}

	// Poll (without holding the lock while sleeping) until enough
	// connections are up.
	for countConnected() < required {
		time.Sleep(connWaitPollInterval)
	}
}

// ConnectSync starts the connection process to the given nodes and then
// blocks until WaitFor(n) returns.
// It is shorthand for calling Connect(ctx, nodes) followed by WaitFor(n).
func (t *Transport) ConnectSync(ctx context.Context, nodes map[types.NodeID]types.NodeAddress, n int) {
	// Trigger connection establishment to the given nodes.
	t.Connect(ctx, nodes)

	// Block until the connection threshold for n is met.
	t.WaitFor(n)
}

func (t *Transport) Send(ctx context.Context, dest types.NodeID, msg *messagepb.Message) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -280,8 +314,7 @@ func (t *Transport) connectToNode(ctx context.Context, nodeID types.NodeID) {
return
}

if conn.Stream != nil &&
t.host.Network().Connectedness(conn.AddrInfo.ID) == network.Connected {
if conn.isConnected(t.host) {
t.connsLock.Unlock()
t.logger.Log(logging.LevelInfo, "connection to node already exists", "src", t.ownID, "dst", nodeID)
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/net/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ type Transport interface {
// Send sends msg to the node with ID dest.
Send(ctx context.Context, dest t.NodeID, msg *messagepb.Message) error

// Connect establishes (in parallel) network connections to the provided nodes.
// Connect initiates the establishing of network connections to the provided nodes.
// When Connect returns, the connections might not yet have been established though (see WaitFor).
Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddress)

// WaitFor waits until at least n connections (including the potentially virtual connection to self)
// have been established and returns.
WaitFor(n int)

// CloseOldConnections closes connections to the nodes that are not needed.
CloseOldConnections(newNodes map[t.NodeID]t.NodeAddress)
}
3 changes: 2 additions & 1 deletion pkg/systems/smr/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package smr
import (
"context"

"github.com/filecoin-project/mir/pkg/net"
"github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors"

Expand All @@ -16,7 +17,6 @@ import (
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net"
libp2pnet "github.com/filecoin-project/mir/pkg/net/libp2p"
t "github.com/filecoin-project/mir/pkg/types"
)
Expand Down Expand Up @@ -57,6 +57,7 @@ func (sys *System) Start(ctx context.Context) error {
return errors.Wrap(err, "could not start network transport")
}
sys.transport.Connect(ctx, sys.initialMembership)
sys.transport.WaitFor(len(sys.initialMembership))
return nil
}

Expand Down

0 comments on commit 8dd4564

Please sign in to comment.