Skip to content

Commit

Permalink
fix(rt): propagate crt stream errors to response body consumer (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd committed Mar 15, 2022
1 parent 5a5b93b commit e0c21ee
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ internal abstract class AbstractBufferedReadChannel(
val success = _closed.compareAndSet(null, ClosedSentinel(cause))
if (!success) return false

segments.close()
segments.close(cause)

readOp.getAndSet(null)?.let { cont ->
if (cause != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package aws.sdk.kotlin.runtime.http.engine.crt
import aws.sdk.kotlin.crt.http.*
import aws.sdk.kotlin.crt.io.byteArrayBuffer
import aws.sdk.kotlin.runtime.testing.runSuspendTest
import aws.smithy.kotlin.runtime.ClientException
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.HttpStatusCode
import io.kotest.matchers.string.shouldContain
import kotlinx.coroutines.launch
import kotlin.test.*

Expand Down Expand Up @@ -135,4 +137,33 @@ class SdkStreamResponseHandlerTest {

assertEquals(data, respChan.readRemaining().decodeToString())
}

@Test
fun testStreamError(): Unit = runSuspendTest {
val handler = SdkStreamResponseHandler(mockConn)
val stream = MockHttpStream(200)
val data = "foo bar"
val socketClosedEc = 1051
launch {
val headers = listOf(
HttpHeader("Content-Length", "${data.length}")
)
handler.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.blockType, headers)
handler.onResponseHeadersDone(stream, HttpHeaderBlock.MAIN.blockType)
handler.onResponseBody(stream, byteArrayBuffer("foo".encodeToByteArray()))
handler.onResponseComplete(stream, socketClosedEc)
}

// should be signalled as soon as headers are available
val resp = handler.waitForResponse()
assertEquals(HttpStatusCode.OK, resp.status)

assertEquals(data.length.toLong(), resp.body.contentLength)
val respChan = (resp.body as HttpBody.Streaming).readFrom()

assertTrue(respChan.isClosedForWrite)
assertFailsWith<ClientException> {
respChan.readRemaining()
}.message.shouldContain("CrtHttpEngine::response failed: ec=$socketClosedEc")
}
}

0 comments on commit e0c21ee

Please sign in to comment.