diff --git a/.github/actions/go-test-setup/action.yml b/.github/actions/go-test-setup/action.yml index c7e4d11ac5..cd7b96dd2e 100644 --- a/.github/actions/go-test-setup/action.yml +++ b/.github/actions/go-test-setup/action.yml @@ -8,4 +8,5 @@ runs: - name: Run nocover tests. These are tests that require the coverage analysis to be off # See https://github.com/protocol/.github/issues/460 shell: bash # This matches only tests with "NoCover" in their test name to avoid running all tests again. - run: go test -tags nocover -run NoCover -v ./... + run: GOLOG_LOG_LEVEL=debug go test -v -run TestTransportWebRTC ./... + diff --git a/.github/workflows/go-test-config.json b/.github/workflows/go-test-config.json new file mode 100644 index 0000000000..77a15bb875 --- /dev/null +++ b/.github/workflows/go-test-config.json @@ -0,0 +1,3 @@ +{ + "skipOSes": ["windows", "linux", "ubuntu"] +} diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index dff23d984f..7d6a791ffc 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -324,7 +324,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection, side string) errC := make(chan error, 1) var once sync.Once pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - fmt.Println(side, "connection state: ", state) + fmt.Printf("%p: %s connection state: %v\n", pc, side, state) switch state { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index e0cc5ce6d4..01f6719a30 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -797,3 +797,85 @@ func TestMaxInFlightRequests(t *testing.T) { require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes") require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure") } + +func TestTransportWebRTC_ManyConnections(t *testing.T) { + tr, listeningPeer := getTransport(t) + listenMultiaddr := ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct") + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + defer listener.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + N := 1000 + errs := make(chan error, N) + + // exits on listener close + go func() { + for i := 0; i < N; i++ { + lconn, err := listener.Accept() + if err != nil { + return + } + go func() { + stream, err := lconn.AcceptStream() + if err != nil { + return + } + stream.Write([]byte{1, 2, 3, 4}) + buf := make([]byte, 10) + stream.Read(buf) + lconn.Close() + }() + } + }() + + dialAndReceiveData := func() { + var err error + defer func() { + errs <- err + }() + tr1, _ := getTransport(t) + conn, err := tr1.Dial(ctx, listener.Multiaddr(), listeningPeer) + if !assert.NoError(t, err) { + return + } + defer conn.Close() + // create a stream + stream, err := conn.OpenStream(context.Background()) + if !assert.NoError(t, err) { + return + } + stream.SetReadDeadline(time.Now().Add(30 * time.Second)) + buf := make([]byte, 10) + n, err := stream.Read(buf) + if !assert.NoError(t, err) { + return + } + if !assert.Equal(t, 4, n) { + return + } + } + + go func() { + var cnt atomic.Int32 + sem := make(chan struct{}, 2) + for i := 0; i < N; i++ { + sem <- struct{}{} + go func() { + dialAndReceiveData() + fmt.Println("completed", cnt.Add(1)) + <-sem + }() + } + }() + + for i := 0; i < N; i++ { + err := <-errs + if err != nil { + t.Fatal(err) + return + } + } +}