Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

race: simulatenous NewStream blocks on one side despite successful connection #2589

Open
Tracked by #3005 ...
dennis-tra opened this issue Sep 29, 2023 · 9 comments
Open
Tracked by #3005 ...
Assignees
Labels
kind/bug A bug in existing code (including security flaws)

Comments

@dennis-tra
Copy link
Contributor

dennis-tra commented Sep 29, 2023

Simultaneously calling NewStream blocks on one side until the dial timeout was reached. Minimum repro example:

package main

import (
	"context"
	"fmt"
	"sync"
	"testing"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/stretchr/testify/require"
)

func newHost(t *testing.T) host.Host {
	listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")

	h, err := libp2p.New(listenAddr)
	require.NoError(t, err)

	t.Cleanup(func() {
		if err = h.Close(); err != nil {
			t.Logf("unexpected error when closing host: %s", err)
		}
	})
	return h
}

func TestBlock(t *testing.T) {
	for i := 0; i < 100; i++ {
		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
			h1 := newHost(t)
			h2 := newHost(t)

			h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
			h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)

			var wg sync.WaitGroup
			wg.Add(2)
			go func() {
				defer wg.Done()
				_, _ = h1.NewStream(context.Background(), h2.ID(), "/any/protocol")
			}()

			go func() {
				defer wg.Done()
				_, _ = h2.NewStream(context.Background(), h1.ID(), "/any/protocol")
			}()

			wg.Wait()
		})
	}
}

I cannot reproduce it on my Mac, but on my Linux server. I need to run the test a few times because it's only sometimes happening. This is an example output:

$ go test -v -run ./...
=== RUN   TestBlock
=== RUN   TestBlock/[0]
=== RUN   TestBlock/[1]
...
=== RUN   TestBlock/[98]
=== RUN   TestBlock/[99]
--- PASS: TestBlock (11.99s)
    --- PASS: TestBlock/[0] (0.03s)
    --- PASS: TestBlock/[1] (0.02s)
    ...
    --- PASS: TestBlock/[11] (0.02s)
    --- PASS: TestBlock/[12] (5.02s) <--------------------
    --- PASS: TestBlock/[13] (0.02s)
    ...
    --- PASS: TestBlock/[21] (0.02s)
    --- PASS: TestBlock/[22] (5.02s) <--------------------
    --- PASS: TestBlock/[23] (0.02s)
    ...
    --- PASS: TestBlock/[97] (0.02s)
    --- PASS: TestBlock/[98] (0.02s)
    --- PASS: TestBlock/[99] (0.02s)
PASS
ok  	github.com/dennis-tra/newstream-block	12.012s

Here's a repository with the minimum repro example: https://github.com/dennis-tra/newstream-block
If you go back a few commits, you can see my debugging attempts.

In the blocking case I saw that both hosts are listening on a TCP port but one host stalls at maDial. At the same time that stalled host accepts the dial attempt from the other host. So, there is a successful connection, but the NewStream/Connect call blocks until the dial attempt has timed out. When the dial has timed out, NewStream seems to find the successful connection that the other host established, open a stream, and return it as everything worked fine.

@iand
Copy link
Contributor

iand commented Sep 29, 2023

Just out of interest can you reproduce it when every host is given a unique and deterministic port, e.g. by using the loop count in newHost

@dennis-tra
Copy link
Contributor Author

Just tried it in this branch: https://github.com/dennis-tra/newstream-block/tree/port

No change - it still blocks sometimes.

@marten-seemann
Copy link
Contributor

I'm having trouble reproducing this locally. Could you try using an older libp2p version? We made some changes to these code paths recently.

cc @sukunrt

dennis-tra added a commit to libp2p/go-libp2p-kad-dht that referenced this issue Sep 29, 2023
By skipping connectivity checks we reduce the chances of simultaneously opening a stream that will block connection establishment.

Context: libp2p/go-libp2p#2589
@dennis-tra
Copy link
Contributor Author

dennis-tra commented Sep 29, 2023

I tried it with v0.30.0, and it's also happening with that version. Haven't tried older versions yet.

Note again, that I also cannot reproduce it on my Mac. I can only semi-reliably reproduce it on my linux box.

@sukunrt
Copy link
Member

sukunrt commented Sep 30, 2023

Some racy behaviour with tcp simultaneous connect and reuseport with an Accept on the same socket as the reused port.

In the event that the dial times out the sequence of tcp conn establishment packets is:

1. A         -> B: SYN
2. BListener -> A: SYN-ACK
3. BDialer   -> A: SYN
4. A         -> B: ACK
5. A         -> B: Duplicate ACK for the SYN from B Dialer

The connection from A to B is established and is received in the Accept loop after step 4.
But the BDialer keeps on sending SYN packets because it expects the remote to send a SYN which it never does only ACKing the received SYN packet.

packet capture

Here the failure case happens for addresses /ip4/127.0.0.1/tcp/40797
/ip4/127.0.0.1/tcp/41331

I'll debug more how mac behaviour defers here. This is on go-libp2p v0.26, before all the swarm changes.
Note: Most of these NewStream calls would be erroring on master as we do not support tcp simultaneous connect any more.

@dennis-tra dennis-tra changed the title bug: simulatenous NewStream blocks on one side despite successful connection race: simulatenous NewStream blocks on one side despite successful connection Sep 30, 2023
@sukunrt
Copy link
Member

sukunrt commented Sep 30, 2023

I wish I could tell you to use quic. But looks like there's a problem with simultaneous udp messages on linux :(
I'm getting this error sometimes:

=== RUN   TestBlock/[16]
fail: failed to dial: failed to dial 12D3KooWM8LoGTwRMZtq5zXpRVxUX9smFLEs9zfir5sqGSgjJec2: all dials failed
  * [/ip4/127.0.0.1/udp/33017/quic-v1] INTERNAL_ERROR (local): write udp4 127.0.0.1:46972->127.0.0.1:33017: sendmsg: operation not permitted

If I add a 10ms delay to one of the goroutines this error goes away. Tried this on two different linux machines.

@marten-seemann any idea why this happens?

The patch if you want to test this:

diff --git a/main_test.go b/main_test.go
index 42df802..2b1e29f 100644
--- a/main_test.go
+++ b/main_test.go
@@ -12,7 +12,7 @@ import (
 )
 
 func newHost(t *testing.T) host.Host {
-	listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")
+	listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1")
 
 	h, err := libp2p.New(listenAddr)
 	require.NoError(t, err)
@@ -38,12 +38,18 @@ func TestBlock(t *testing.T) {
 			wg.Add(2)
 			go func() {
 				defer wg.Done()
-				_, _ = h1.NewStream(context.Background(), h2.ID(), "/any/protocol")
+				_, err := h1.NewStream(context.Background(), h2.ID(), "/any/protocol")
+				if err != nil {
+					fmt.Println("fail:", err)
+				}
 			}()
 
 			go func() {
 				defer wg.Done()
-				_, _ = h2.NewStream(context.Background(), h1.ID(), "/any/protocol")
+				_, err := h2.NewStream(context.Background(), h1.ID(), "/any/protocol")
+				if err != nil {
+					fmt.Println("fail:", err)
+				}
 			}()
 
 			wg.Wait()

@sukunrt sukunrt added the kind/bug A bug in existing code (including security flaws) label Oct 1, 2023
@sukunrt sukunrt self-assigned this Oct 1, 2023
@marten-seemann
Copy link
Contributor

If I add a 10ms delay to one of the goroutines this error goes away. Tried this on two different linux machines.

@marten-seemann any idea why this happens?

This seems to be a standard library bug. I opened golang/go#63322 with a minimal example.

@dennis-tra
Copy link
Contributor Author

dennis-tra commented Oct 2, 2023

In the TCP case, is there anything we can do in the B->A case when B waits for SYN packets from A? I would have expected that B will stop waiting for SYNs if the connection from A->B succeeds. Btw, the NewStream calls succeed on my machine. They just take 5s longer. I guess further up the stack, the connection from A->B is identified and then used for the new stream. Haven't looked deeply into that.

Snippet
package main

import (
	"context"
	"fmt"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/stretchr/testify/require"
	"sync"
	"testing"
)

func newHost(t *testing.T) host.Host {
	listenAddr := libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/0"))

	h, err := libp2p.New(listenAddr)
	require.NoError(t, err)

	t.Cleanup(func() {
		if err = h.Close(); err != nil {
			t.Logf("unexpected error when closing host: %s", err)
		}
	})
	return h
}

func TestBlock(t *testing.T) {
	for i := 0; i < 100; i++ {
		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
			h1 := newHost(t)
			h2 := newHost(t)

			h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
			h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)

			h1.SetStreamHandler("/any/protocol", func(stream network.Stream) {})
			h2.SetStreamHandler("/any/protocol", func(stream network.Stream) {})

			var wg sync.WaitGroup
			wg.Add(2)
			go func() {
				defer wg.Done()
				_, err := h1.NewStream(context.Background(), h2.ID(), "/any/protocol")
				require.NoError(t, err)
			}()

			go func() {
				defer wg.Done()
				_, err := h2.NewStream(context.Background(), h1.ID(), "/any/protocol")
				require.NoError(t, err)
			}()

			wg.Wait()
		})
	}
}

@sukunrt
Copy link
Member

sukunrt commented Oct 2, 2023

I would have expected that B will stop waiting for SYNs if the connection from A->B succeeds.

Same. I'm not sure what's happening in the kernel here.

Btw, the NewStream calls succeed on my machine.

The call succeeds because of this line: https://github.com/libp2p/go-libp2p/blob/master/p2p/net/swarm/dial_worker.go#L376

Here the dial_worker_loop before exiting checks if somehow a connection has appeared, which in this case it has. This check exists because a concurrent dial might add different addresses to the peerstore and trigger the dial, this helps check if any of the newer addresses have succeeded.

For the solution, we need to ensure that a dial call also considers any incoming connections that have been established in the meantime.

I'd like to implement some variant of this logic: #1603 (comment)

The dial worker loop might be simplified by uncoupling the dial result from the specific dial call. Maybe by having an Add(ma.Multiaddr) function that joins a new dial job, and then Get() <-chan network.Conn function that returns a channel with all successfully established functions (maybe pass a filter function to Get)

either this or to setup a notification of incoming connections here: https://github.com/libp2p/go-libp2p/blob/master/p2p/net/swarm/dial_sync.go#L61
making it something like.

select {
	case res := <-resch:
		return res.conn, res.err
	case conn := <-incommingConns[peer]:
		return conn, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/bug A bug in existing code (including security flaws)
Projects
None yet
Development

No branches or pull requests

4 participants