Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.9.x]: Confirm we can read a response that completed before RST_STREAM (#6293) #6914

Merged
merged 1 commit into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
*/
package okhttp3.internal.http2

import java.io.EOFException
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.ArrayDeque
import okhttp3.Headers
import okhttp3.internal.EMPTY_HEADERS
import okhttp3.internal.assertThreadDoesntHoldLock
Expand All @@ -32,6 +27,11 @@ import okio.BufferedSource
import okio.Sink
import okio.Source
import okio.Timeout
import java.io.EOFException
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.ArrayDeque

/** A logical bidirectional stream. */
@Suppress("NAME_SHADOWING")
Expand Down Expand Up @@ -61,7 +61,7 @@ class Http2Stream internal constructor(
var writeBytesMaximum: Long = connection.peerSettings.initialWindowSize.toLong()
internal set

/** Received headers yet to be [taken][takeHeaders], or [read][FramingSource.read]. */
/** Received headers yet to be [taken][takeHeaders]. */
private val headersQueue = ArrayDeque<Headers>()

/** True if response headers have been sent or received. */
Expand Down Expand Up @@ -154,13 +154,13 @@ class Http2Stream internal constructor(
*/
@Synchronized @Throws(IOException::class)
fun trailers(): Headers {
if (source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) {
return source.trailers ?: EMPTY_HEADERS
}
if (errorCode != null) {
throw errorException ?: StreamResetException(errorCode!!)
}
check(source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) {
"too early; can't read the trailers yet"
}
return source.trailers ?: EMPTY_HEADERS
throw IllegalStateException("too early; can't read the trailers yet")
}

/**
Expand Down Expand Up @@ -276,10 +276,7 @@ class Http2Stream internal constructor(
this.source.receive(source, length.toLong())
}

/**
* Accept headers from the network and store them until the client calls [takeHeaders], or
* [FramingSource.read] them.
*/
/** Accept headers from the network and store them until the client calls [takeHeaders]. */
fun receiveHeaders(headers: Headers, inFinished: Boolean) {
this@Http2Stream.assertThreadDoesntHoldLock()

Expand Down Expand Up @@ -560,7 +557,7 @@ class Http2Stream internal constructor(
checkOutNotClosed() // Kick out if the stream was reset or closed while waiting.
toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
writeBytesTotal += toWrite
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size
}

writeTimeout.enter()
Expand All @@ -578,6 +575,7 @@ class Http2Stream internal constructor(
synchronized(this@Http2Stream) {
checkOutNotClosed()
}
// TODO(jwilson): flush the connection?!
while (sendBuffer.size > 0L) {
emitFrame(false)
connection.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,105 @@ public final class Http2ConnectionTest {
assertThat(synStream.headerBlock).isEqualTo(headerEntries("a", "artichaut"));
}

/** A server RST_STREAM shouldn't prevent the client from consuming the response body. */
@Test public void serverResponseBodyRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().data(true, 3, new Buffer().writeUtf8("robot"), 5);
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), false);
connection.writePingAndAwaitPong();
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
BufferedSource source = Okio.buffer(stream.getSource());
assertThat(source.readUtf8(5)).isEqualTo("robot");
stream.getSink().close();
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

/** A server RST_STREAM shouldn't prevent the client from consuming trailers. */
@Test public void serverTrailersRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().headers(true, 3, headerEntries("z", "zebra"));
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), true);
connection.writePingAndAwaitPong();
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
stream.getSink().close();
assertThat(stream.trailers()).isEqualTo(Headers.of("z", "zebra"));
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

/**
* A server RST_STREAM shouldn't prevent the client from consuming the response body, even if it
* follows a truncated request body.
*/
@Test public void clientRequestBodyServerResponseBodyRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().data(true, 3, new Buffer().writeUtf8("robot"), 5);
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), true);
connection.writePingAndAwaitPong();
BufferedSink sink = Okio.buffer(stream.getSink());
sink.writeUtf8("abc");
try {
sink.close();
fail();
} catch (StreamResetException expected) {
assertThat(expected.errorCode).isEqualTo(ErrorCode.NO_ERROR);
}
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
BufferedSource source = Okio.buffer(stream.getSource());
assertThat(source.readUtf8(5)).isEqualTo("robot");
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

@Test public void serverWritesTrailersWithData() throws Exception {
// We buffer some outbound data and headers and confirm that the END_STREAM flag comes with the
// headers (and not with the data).
Expand Down