-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix several streaming issues #106
Conversation
Update conformance tests to add a new test exercising client side streaming, which exposed several issues in streaming call implementations. The first issue only affected client streaming (it stopped attempting to read a response from the server one the send side closed). The second issue resulted from not calling close on the channel after the completion message was received, which lead to hangs consuming from resultChannel() (it would never complete). After this fix, both examples (for java and javalite) were updated and fixed to correctly exit when finished. Additionally, several cleanups were made to the API (since the current API for client streaming was non-functional - it would only return the initial Headers result and not the message or completion result). The API updates are as follows: com.connectrpc.BidirectionalStreamInterface Removed: * close() * Use sendClose() instead. This may have confused callers that the close() method would close both send and receive sides of the connection when it was only closing the send side. com.connectrpc.ClientOnlyStreamInterface Added: * sendClose() * This shouldn't typically need to be called as receiveAndClose() already closes the send side of the stream. * isSendClosed() Changed: * receiveAndClose() * Changed to return a ResponseMessage instead of a StreamResult. This allows callers to easily get access to the response as if they were calling a unary response. Previously, the StreamResult would only return the first result retrieved by the call, which typically was a Headers result (leaving callers unable to access the Message or Completion contents). Removed: * close() * Replaced with sendClose(). com.connectrpc.ServerOnlyStreamInterface Added: * receiveClose() * isReceiveClosed() Removed: * close() * This closed both the send and receive side of the stream (unlike close() in other interfaces which just closed the send side). If needed, callers should invoke receiveClose() instead (although this isn't necessary in normal use). * send() * Callers should invoke sendAndClose() instead. Otherwise, reading results from resultChannel() will hang since the send side of the stream should be closed before reading responses. com.connectrpc.StreamResult: * Removed the 'error' field from the base StreamResult class. It was never set by Headers or Message StreamResult types and only used on the Complete type. This should make it easier for callers to handle Headers/Message result types since they don't need to worry about additional error handling.
@@ -754,6 +750,94 @@ class Conformance( | |||
assertThat(countDownLatch.count).isZero() | |||
} | |||
|
|||
@Test | |||
fun clientStreaming(): Unit = runBlocking { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New test for client streaming (used to find several issues with streaming calls and API definitions).
val error: Throwable?, | ||
) | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should really revisit the APIs around streaming calls to make this sort of code unnecessary. For now however, this will at least allow us to easily consume all of the data from a server/bidi streaming response and perform assertions on headers, messages, trailers, and errors.
@@ -9,10 +9,17 @@ plugins: | |||
- plugin: java | |||
out: generated-google-java/build/generated/sources/bufgen | |||
protoc_path: .tmp/bin/protoc | |||
- plugin: kotlin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generate kotlin code so our examples don't look so verbose.
try { | ||
connectStreaming(elizaServiceClient) | ||
} finally { | ||
okHttpClient.dispatcher.executorService.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is necessary for the client to cleanly shut down (the dispatcher thread pool uses non-daemon threads, so we need to shut it down manually for the app to shut down cleanly.
} | ||
} | ||
|
||
private suspend fun connectStreaming(elizaServiceClient: ElizaServiceClient) { | ||
val stream = elizaServiceClient.converse() | ||
withContext(Dispatchers.IO) { | ||
// Add the message the user is sending to the views. | ||
stream.send(ConverseRequest.newBuilder().setSentence("hello").build()) | ||
stream.send(converseRequest { sentence = "hello" }) | ||
stream.sendClose() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't close the bidi side of the stream, we won't ever complete.
if (connectErr != null) { | ||
throw connectErr | ||
} | ||
throw ConnectError(code = result.code, metadata = result.trailers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this to at least show how errors need to be handled today in the current API. In a follow up, I'll see if this can be improved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just print the error details as a response?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to illustrate how you might want to handle errors in real code (instead of printing code/cause/trailers).
I think it also points out opportunities to make it even easier for consumers - if the Code is ever not OK, we should always have a ConnectError (even if it wraps another cause).
) : ResponseMessage<Output>(code, headers, trailers) | ||
) : ResponseMessage<Output>(code, headers, trailers) { | ||
override fun toString(): String { | ||
return "Success{message=$message,code=$code,headers=$headers,trailers=$trailers}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added for easier debugging.
@@ -211,6 +211,9 @@ class ProtocolClient( | |||
} | |||
} | |||
channel.send(result) | |||
if (isComplete) { | |||
channel.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a major issue - not closing the channel would lead to hangs in streaming clients trying to read results.
@@ -61,7 +61,7 @@ internal fun OkHttpClient.initializeStream( | |||
} | |||
val callRequest = builder.build() | |||
val call = newCall(callRequest) | |||
call.enqueue(ResponseCallback(onResult, isSendClosed)) | |||
call.enqueue(ResponseCallback(onResult, isReceiveClosed)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the other big error for client streaming calls - this was using isSendClosed
instead of isReceiveClosed
, so we would stop trying to read messages from a stream once the send side was closed.
conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt
Outdated
Show resolved
Hide resolved
conformance/google-java/src/test/kotlin/com/connectrpc/conformance/Conformance.kt
Outdated
Show resolved
Hide resolved
if (connectErr != null) { | ||
throw connectErr | ||
} | ||
throw ConnectError(code = result.code, metadata = result.trailers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just print the error details as a response?
Co-authored-by: Michael Rebello <me@michaelrebello.com>
Update conformance tests to add a new test exercising client side streaming, which exposed several issues in streaming call implementations.
The first issue only affected client streaming (it stopped attempting to read a response from the server once the send side closed - it should have stopped only if the receive side closed).
The second issue resulted from not calling close on the channel after the completion message was received, which lead to hangs consuming from
resultChannel()
(it would never complete). After this fix, both examples (for java and javalite) were updated and fixed to correctly exit when finished.Additionally, several cleanups were made to the API (since the current API for client streaming was non-functional - it would only return the initial Headers result and not the message or completion result).
This should help resolve reported streaming issues like #100.
API Updates
com.connectrpc.BidirectionalStreamInterface
Removed
close()
sendClose()
instead. This may have confused callers that the close() method would close both send and receive sides of the connection when it was only closing the send side.com.connectrpc.ClientOnlyStreamInterface
Added
sendClose()
isSendClosed()
Changed
receiveAndClose()
Removed
close()
sendClose()
.com.connectrpc.ServerOnlyStreamInterface
Added
receiveClose()
isReceiveClosed()
Removed
close()
receiveClose()
instead (although this isn't necessary in normal use).send()
sendAndClose()
instead. Otherwise, reading results fromresultChannel()
will hang since the send side of the stream should be closed before reading responses.com.connectrpc.StreamResult
Removed
error
field from the baseStreamResult
class. It was never used by theHeaders
orMessage
subclasses and only used on theComplete
type. This should make it easier for callers to useHeaders
andMessage
types since they don't need to worry about handlingerror
.