Skip to content

Commit

Permalink
Add waiting for establishing network connections
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Pavlovic <matopavlovic@gmail.com>
  • Loading branch information
matejpavlovic committed Oct 10, 2022
1 parent 65f15da commit c8c62d9
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 4 deletions.
7 changes: 7 additions & 0 deletions pkg/deploytest/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ func (fl *FakeLink) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddres
}
}

// WaitFor is a no-op for the fake link and returns right away, regardless of n.
// Waiting is unnecessary here because Connect() already blocks until all connections are in place.
// TODO: This does not fully honor the intended semantics: a call to WaitFor that was not preceded
// by a call to Connect should block, but currently returns immediately. Fix this.
func (fl *FakeLink) WaitFor(n int) {}

// Stop closes the link's DoneC channel.
// Since closing an already closed channel panics in Go, Stop must not be called more than once.
func (fl *FakeLink) Stop() {
	close(fl.DoneC)
}
4 changes: 4 additions & 0 deletions pkg/deploytest/simtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (m *simTransportModule) Connect(ctx context.Context, nodes map[t.NodeID]t.N
go m.handleOutChan(ctx, m.SimTransport.Simulation.Spawn())
}

// WaitFor is a no-op: the simulated transport has nothing to wait for,
// so the call returns immediately regardless of n.
func (m *simTransportModule) WaitFor(n int) {}

func (m *simTransportModule) ApplyEvents(ctx context.Context, eventList *events.EventList) error {
_, err := modules.ApplyEventsSequentially(eventList, func(e *eventpb.Event) (*events.EventList, error) {
return events.EmptyList(), m.applyEvent(ctx, e)
Expand Down
5 changes: 5 additions & 0 deletions pkg/net/grpc/grpctransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ func (gt *Transport) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddre
wg.Wait()
}

// WaitFor currently returns immediately, regardless of n.
// TODO: Returning right away relies on the Connect() function already waiting for all connections
// to be established. This is not right and should be done as in the libp2p transport.
func (gt *Transport) WaitFor(n int) {}

// Establishes a connection to a single node at address addrString.
func (gt *Transport) connectToNode(ctx context.Context, addrString string) (GrpcTransport_ListenClient, error) {

Expand Down
37 changes: 35 additions & 2 deletions pkg/net/libp2p/libp2p_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
maxRetries = 20
noLoggingErrorAttempts = 2
PermanentAddrTTL = math.MaxInt64 - iota
connWaitPollInterval = 100 * time.Millisecond
)

type TransportMessage struct {
Expand All @@ -52,6 +53,10 @@ type connInfo struct {
Stream network.Stream
}

// isConnected reports whether this connection is usable: a stream must have been assigned
// and the network of host h must report the peer as connected.
func (ci *connInfo) isConnected(h host.Host) bool {
	// Without a stream there is no point in asking the host about the peer.
	if ci.Stream == nil {
		return false
	}

	peerState := h.Network().Connectedness(ci.AddrInfo.ID)
	return peerState == network.Connected
}

type newStream struct {
NodeID types.NodeID
Stream network.Stream
Expand Down Expand Up @@ -158,6 +163,35 @@ func (t *Transport) Connect(ctx context.Context, nodes map[types.NodeID]types.No
t.connect(ctx, maputil.GetKeys(parsedAddrInfo))
}

// WaitFor blocks until at least n connections have been established.
// It repeatedly inspects the current connection state, sleeping for connWaitPollInterval
// between two consecutive checks. The connection to self counts towards n (see below).
// There is no timeout: if the required number of connections is never reached, WaitFor never returns.
func (t *Transport) WaitFor(n int) {
	// We always assume to be connected to ourselves, and that connection
	// does not appear in t.conns. Thus, only n-1 tracked connections are required.
	required := n - 1

	// established counts, under the read lock, the tracked connections that are currently up.
	established := func() int {
		t.connsLock.RLock()
		count := 0
		for _, conn := range t.conns {
			if conn.isConnected(t.host) {
				count++
			}
		}
		t.connsLock.RUnlock()
		return count
	}

	// The lock is not held while sleeping, so connections can be added between polls.
	for established() < required {
		time.Sleep(connWaitPollInterval)
	}
}

// ConnectSync combines Connect and WaitFor in a single call:
// it first starts connecting to the given nodes and then blocks
// until n connections (counted the same way as in WaitFor) have been established.
func (t *Transport) ConnectSync(ctx context.Context, nodes map[types.NodeID]types.NodeAddress, n int) {
	// Kick off the connection process.
	t.Connect(ctx, nodes)

	// Block until enough connections are up.
	t.WaitFor(n)
}

func (t *Transport) Send(ctx context.Context, dest types.NodeID, msg *messagepb.Message) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -280,8 +314,7 @@ func (t *Transport) connectToNode(ctx context.Context, nodeID types.NodeID) {
return
}

if conn.Stream != nil &&
t.host.Network().Connectedness(conn.AddrInfo.ID) == network.Connected {
if conn.isConnected(t.host) {
t.connsLock.Unlock()
t.logger.Log(logging.LevelInfo, "connection to node already exists", "src", t.ownID, "dst", nodeID)
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/net/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ type Transport interface {
// Send sends msg to the node with ID dest.
Send(ctx context.Context, dest t.NodeID, msg *messagepb.Message) error

// Connect establishes (in parallel) network connections to the provided nodes.
// Connect initiates the establishing of network connections to the provided nodes.
// When Connect returns, the connections might not yet have been established though (see WaitFor).
Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddress)

// WaitFor waits until at least n connections (including the potentially virtual connection to self)
// have been established and returns.
WaitFor(n int)

// CloseOldConnections closes connections to the nodes that are no longer needed.
CloseOldConnections(newNodes map[t.NodeID]t.NodeAddress)
}
4 changes: 3 additions & 1 deletion pkg/systems/smr/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors"

"github.com/filecoin-project/mir/pkg/net"

"github.com/filecoin-project/mir/pkg/checkpoint"

"github.com/filecoin-project/mir/pkg/availability/batchdb/fakebatchdb"
Expand All @@ -16,7 +18,6 @@ import (
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net"
libp2pnet "github.com/filecoin-project/mir/pkg/net/libp2p"
t "github.com/filecoin-project/mir/pkg/types"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ func (sys *System) Start(ctx context.Context) error {
return errors.Wrap(err, "could not start network transport")
}
sys.transport.Connect(ctx, sys.initialMembership)
sys.transport.WaitFor(len(sys.initialMembership))
return nil
}

Expand Down

0 comments on commit c8c62d9

Please sign in to comment.