Skip to content

Commit

Permalink
Merge pull request quic-go#3290 from lucas-clemente/fix-receive-strea…
Browse files Browse the repository at this point in the history
…m-mutex

don't unlock the receive stream mutex for copying from STREAM frames
  • Loading branch information
marten-seemann authored Oct 19, 2021
2 parents bb8d484 + 2c8b939 commit b935a54
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 3 deletions.
88 changes: 88 additions & 0 deletions integrationtests/self/cancelation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,4 +631,92 @@ var _ = Describe("Stream Cancelations", func() {
Expect(server.Close()).To(Succeed())
})
})

It("doesn't run into any errors when streams are canceled all the time", func() {
const maxIncomingStreams = 1000
server, err := quic.ListenAddr(
"localhost:0",
getTLSConfig(),
getQuicConfig(&quic.Config{MaxIncomingStreams: maxIncomingStreams, MaxIdleTimeout: 10 * time.Second}),
)
Expect(err).ToNot(HaveOccurred())

var wg sync.WaitGroup
wg.Add(2 * 4 * maxIncomingStreams)
handleStream := func(str quic.Stream) {
str.SetDeadline(time.Now().Add(time.Second))
go func() {
defer wg.Done()
if rand.Int31()%2 == 0 {
defer GinkgoRecover()
io.ReadAll(str)
}
}()
go func() {
defer wg.Done()
if rand.Int31()%2 == 0 {
str.Write([]byte("foobar"))
if rand.Int31()%2 == 0 {
str.Close()
}
}
}()
go func() {
defer wg.Done()
// Make sure we at least send out *something* for the last stream,
// otherwise the peer might never receive this anything for this stream.
if rand.Int31()%2 == 0 || str.StreamID() == 4*(maxIncomingStreams-1) {
str.CancelWrite(1234)
}
}()
go func() {
defer wg.Done()
if rand.Int31()%2 == 0 {
str.CancelRead(1234)
}
}()
}

serverRunning := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(serverRunning)
conn, err := server.Accept(context.Background())
Expect(err).ToNot(HaveOccurred())
for {
str, err := conn.AcceptStream(context.Background())
if err != nil {
// Make sure the session is closed regularly.
Expect(err).To(BeAssignableToTypeOf(&quic.ApplicationError{}))
return
}
handleStream(str)
}
}()

sess, err := quic.DialAddr(
fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
getTLSClientConfig(),
getQuicConfig(&quic.Config{}),
)
Expect(err).ToNot(HaveOccurred())

for i := 0; i < maxIncomingStreams; i++ {
str, err := sess.OpenStreamSync(context.Background())
Expect(err).ToNot(HaveOccurred())
handleStream(str)
}

// We don't expect to accept any stream here.
// We're just making sure the session stays open and there's no error.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, err = sess.AcceptStream(ctx)
Expect(err).To(MatchError(context.DeadlineExceeded))

wg.Wait()

Expect(sess.CloseWithError(0, "")).To(Succeed())
Eventually(serverRunning).Should(BeClosed())
})
})
3 changes: 0 additions & 3 deletions receive_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,10 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
}

s.mutex.Unlock()

m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
s.readPosInFrame += m
bytesRead += m

s.mutex.Lock()
// when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
if !s.resetRemotely {
s.flowController.AddBytesRead(protocol.ByteCount(m))
Expand Down

0 comments on commit b935a54

Please sign in to comment.