Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add waiting for establishing network connections #267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/deploytest/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ func (fl *FakeLink) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddres
}
}

// WaitFor returns immediately; the argument n is ignored.
// It does not need to wait for anything, since the Connect() function already waits for all the connections.
// TODO: Technically this does not properly implement the semantics, as calling WaitFor without having called Connect
// should block. Fix this.
func (fl *FakeLink) WaitFor(n int) {
}

// Stop closes the fl.DoneC channel.
// Closing an already closed channel panics in Go, so Stop must be called at most once per FakeLink.
// NOTE(review): presumably DoneC signals the link's goroutines to terminate; confirm against its readers.
func (fl *FakeLink) Stop() {
close(fl.DoneC)
}
4 changes: 4 additions & 0 deletions pkg/deploytest/simtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (m *simTransportModule) Connect(ctx context.Context, nodes map[t.NodeID]t.N
go m.handleOutChan(ctx, m.SimTransport.Simulation.Spawn())
}

// WaitFor returns immediately, since the simulated transport does not need to wait for anything.
// The argument n is ignored.
func (m *simTransportModule) WaitFor(n int) {
}

func (m *simTransportModule) ApplyEvents(ctx context.Context, eventList *events.EventList) error {
_, err := modules.ApplyEventsSequentially(eventList, func(e *eventpb.Event) (*events.EventList, error) {
return events.EmptyList(), m.applyEvent(ctx, e)
Expand Down
5 changes: 5 additions & 0 deletions pkg/net/grpc/grpctransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ func (gt *Transport) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddre
wg.Wait()
}

// WaitFor currently returns immediately without waiting for anything; the argument n is ignored.
func (gt *Transport) WaitFor(n int) {
// TODO: We return immediately here, as the Connect() function already waits for all connections to be established.
// This is not right and should be done as in the libp2p transport.
}

// Establishes a connection to a single node at address addrString.
func (gt *Transport) connectToNode(ctx context.Context, addrString string) (GrpcTransport_ListenClient, error) {

Expand Down
37 changes: 35 additions & 2 deletions pkg/net/libp2p/libp2p_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
maxRetries = 20
noLoggingErrorAttempts = 2
PermanentAddrTTL = math.MaxInt64 - iota
connWaitPollInterval = 100 * time.Millisecond
)

type TransportMessage struct {
Expand All @@ -52,6 +53,10 @@ type connInfo struct {
Stream network.Stream
}

// isConnected reports whether ci holds a stream and the host h
// considers the peer identified by ci.AddrInfo.ID to be connected.
func (ci *connInfo) isConnected(h host.Host) bool {
	// Without a stream there is no usable connection; skip the network query entirely.
	if ci.Stream == nil {
		return false
	}
	return h.Network().Connectedness(ci.AddrInfo.ID) == network.Connected
}

type newStream struct {
NodeID types.NodeID
Stream network.Stream
Expand Down Expand Up @@ -158,6 +163,35 @@ func (t *Transport) Connect(ctx context.Context, nodes map[types.NodeID]types.No
t.connect(ctx, maputil.GetKeys(parsedAddrInfo))
}

// WaitFor polls the current connection state and returns when at least n connections have been established.
// The count n includes the connection to self, so WaitFor returns as soon as n-1 entries of t.conns are connected.
// Between checks it sleeps for connWaitPollInterval.
// WaitFor takes no context and has no timeout: it blocks until the condition is met.
func (t *Transport) WaitFor(n int) {
for {
// Count the connected peers under the read lock, releasing it before sleeping.
t.connsLock.RLock()
numConnections := 0
for _, ci := range t.conns {
if ci.isConnected(t.host) {
numConnections++
}
}
t.connsLock.RUnlock()

// We subtract one, as we always assume to be connected to ourselves
// and the connection to self does not appear in t.conns.
// Consequently, for n <= 1 the condition holds on the first check.
if numConnections >= n-1 {
return
}
time.Sleep(connWaitPollInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busy waiting? 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

polling ;-)

(This is just an ad-hoc implementation that we needed quickly, it can be done properly together with the next overhaul of the transport module that will need to happen at some point anyway.)

}
}

// ConnectSync is a convenience method that triggers the connection process
// and waits for n connections to be established before returning.
// Equivalent to calling Connect(ctx, nodes) followed by WaitFor(n).
// The context ctx is passed only to Connect; WaitFor takes no context,
// so the waiting phase is not bounded by ctx.
func (t *Transport) ConnectSync(ctx context.Context, nodes map[types.NodeID]types.NodeAddress, n int) {
t.Connect(ctx, nodes)
t.WaitFor(n)
}

func (t *Transport) Send(ctx context.Context, dest types.NodeID, msg *messagepb.Message) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -280,8 +314,7 @@ func (t *Transport) connectToNode(ctx context.Context, nodeID types.NodeID) {
return
}

if conn.Stream != nil &&
t.host.Network().Connectedness(conn.AddrInfo.ID) == network.Connected {
if conn.isConnected(t.host) {
t.connsLock.Unlock()
t.logger.Log(logging.LevelInfo, "connection to node already exists", "src", t.ownID, "dst", nodeID)
return
Expand Down
7 changes: 6 additions & 1 deletion pkg/net/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ type Transport interface {
// Send sends msg to the node with ID dest.
Send(ctx context.Context, dest t.NodeID, msg *messagepb.Message) error

// Connect establishes (in parallel) network connections to the provided nodes.
// Connect initiates the establishing of network connections to the provided nodes.
// When Connect returns, the connections might not yet have been established though (see WaitFor).
Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddress)

// WaitFor waits until at least n connections (including the potentially virtual connection to self)
// have been established and returns.
WaitFor(n int)

// CloseOldConnections closes connections to the nodes that are not needed anymore.
CloseOldConnections(newNodes map[t.NodeID]t.NodeAddress)
}
4 changes: 3 additions & 1 deletion pkg/systems/smr/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors"

"github.com/filecoin-project/mir/pkg/net"

"github.com/filecoin-project/mir/pkg/checkpoint"

"github.com/filecoin-project/mir/pkg/availability/batchdb/fakebatchdb"
Expand All @@ -16,7 +18,6 @@ import (
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net"
libp2pnet "github.com/filecoin-project/mir/pkg/net/libp2p"
t "github.com/filecoin-project/mir/pkg/types"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ func (sys *System) Start(ctx context.Context) error {
return errors.Wrap(err, "could not start network transport")
}
sys.transport.Connect(ctx, sys.initialMembership)
sys.transport.WaitFor(len(sys.initialMembership))
return nil
}

Expand Down