Skip to content

Commit

Permalink
Test read/write deadlines
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed May 16, 2023
1 parent 3bce855 commit 336830e
Showing 1 changed file with 49 additions and 15 deletions.
64 changes: 49 additions & 15 deletions p2p/test/transport/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package transport_integration

import (
"context"
"regexp"
"strings"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestStreamDeadlines(t *testing.T) {
func TestNewStreamDeadlines(t *testing.T) {
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebSocket") || strings.Contains(tc.Name, "Yamux") {
Expand All @@ -36,24 +35,59 @@ func TestStreamDeadlines(t *testing.T) {
}
}

func TestDialDeadlines(t *testing.T) {
portMatcher := regexp.MustCompile(`(tcp|udp)\/\d+`)
func TestReadWriteDeadlines(t *testing.T) {
// Send a lot of data so that writes have to flush (can't just buffer it all)
sendBuf := make([]byte, 1<<20)

for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
listener := tc.HostGenerator(t, TransportTestCaseOpts{})
dialer := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})

listenerAddrString := listener.Addrs()[0].String()
// Replace the actual port with one that won't work
listenerAddr := multiaddr.StringCast(portMatcher.ReplaceAllString(listenerAddrString, "$1/2"))
require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{
ID: listener.ID(),
Addrs: listener.Addrs(),
}))

dialer.Peerstore().AddAddr(listener.ID(), listenerAddr, peerstore.PermanentAddrTTL)
// This simply stalls
listener.SetStreamHandler("/stall/1", func(s network.Stream) {
time.Sleep(60 * time.Second)
s.Close()
})

ctx, cancel := context.WithCancel(context.Background())
cancel()
// Use cancelled context
_, err := dialer.Network().DialPeer(ctx, listener.ID())
require.ErrorIs(t, err, context.Canceled)
t.Run("ReadDeadline", func(t *testing.T) {
s, err := dialer.NewStream(context.Background(), listener.ID(), "/stall/1")
require.NoError(t, err)
defer s.Close()

start := time.Now()
// Set a deadline
s.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
buf := make([]byte, 1)
_, err = s.Read(buf)
require.Error(t, err)
require.Contains(t, err.Error(), "deadline")
require.Less(t, time.Since(start), 100*time.Millisecond)
})

t.Run("WriteDeadline", func(t *testing.T) {
s, err := dialer.NewStream(context.Background(), listener.ID(), "/stall/1")
require.NoError(t, err)
defer s.Close()

// Set a deadline
s.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
start := time.Now()
_, err = s.Write(sendBuf)
require.Error(t, err)
require.Contains(t, err.Error(), "deadline")
require.Less(t, time.Since(start), 100*time.Millisecond)

if strings.Contains(tc.Name, "mplex") {
// FIXME: mplex stalls on close, so we reset so we don't spend an extra 5s waiting for nothing
s.Reset()
}
})
})
}
}

0 comments on commit 336830e

Please sign in to comment.