diff --git a/p2p/test/transport/deadline_test.go b/p2p/test/transport/deadline_test.go index 5548c6eea9..3f79f1af70 100644 --- a/p2p/test/transport/deadline_test.go +++ b/p2p/test/transport/deadline_test.go @@ -2,17 +2,16 @@ package transport_integration import ( "context" - "regexp" "strings" "testing" + "time" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) -func TestStreamDeadlines(t *testing.T) { +func TestNewStreamDeadlines(t *testing.T) { for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { if strings.Contains(tc.Name, "WebSocket") || strings.Contains(tc.Name, "Yamux") { @@ -36,24 +35,59 @@ func TestStreamDeadlines(t *testing.T) { } } -func TestDialDeadlines(t *testing.T) { - portMatcher := regexp.MustCompile(`(tcp|udp)\/\d+`) +func TestReadWriteDeadlines(t *testing.T) { + // Send a lot of data so that writes have to flush (can't just buffer it all) + sendBuf := make([]byte, 1<<20) + for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { listener := tc.HostGenerator(t, TransportTestCaseOpts{}) dialer := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) - listenerAddrString := listener.Addrs()[0].String() - // Replace the actual port with one that won't work - listenerAddr := multiaddr.StringCast(portMatcher.ReplaceAllString(listenerAddrString, "$1/2")) + require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{ + ID: listener.ID(), + Addrs: listener.Addrs(), + })) - dialer.Peerstore().AddAddr(listener.ID(), listenerAddr, peerstore.PermanentAddrTTL) + // This simply stalls + listener.SetStreamHandler("/stall/1", func(s network.Stream) { + time.Sleep(60 * time.Second) + s.Close() + }) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - // Use cancelled context - _, err := dialer.Network().DialPeer(ctx, listener.ID()) - require.ErrorIs(t, err, context.Canceled) + t.Run("ReadDeadline", func(t *testing.T) { + s, err := dialer.NewStream(context.Background(), listener.ID(), "/stall/1") + require.NoError(t, err) + defer s.Close() + + start := time.Now() + // Set a deadline + s.SetReadDeadline(time.Now().Add(10 * time.Millisecond)) + buf := make([]byte, 1) + _, err = s.Read(buf) + require.Error(t, err) + require.Contains(t, err.Error(), "deadline") + require.Less(t, time.Since(start), 100*time.Millisecond) + }) + + t.Run("WriteDeadline", func(t *testing.T) { + s, err := dialer.NewStream(context.Background(), listener.ID(), "/stall/1") + require.NoError(t, err) + defer s.Close() + + // Set a deadline + s.SetWriteDeadline(time.Now().Add(10 * time.Millisecond)) + start := time.Now() + _, err = s.Write(sendBuf) + require.Error(t, err) + require.Contains(t, err.Error(), "deadline") + require.Less(t, time.Since(start), 100*time.Millisecond) + + if strings.Contains(tc.Name, "mplex") { + // FIXME: mplex stalls on close, so we reset so we don't spend an extra 5s waiting for nothing + s.Reset() + } + }) }) } }