You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am new to both Kotlin and Vert.x, but it appears to me that the current design of WriteStream.toChannel() may easily lead to the client either hanging (suspending) indefinitely or terminating prematurely. I suggest to improve the API and documentation so that clients have predictable and flexible control over when and how the data transfer coroutine terminates.
Hear me out.
Preface
WriteStream.toChannel() provides a SendChannel facade to a vertx WriteStream by starting a coroutine that simply reads the channel as long as possible, forwarding each message to the wrapped stream. The coroutine can terminate in three ways:
Case 1: normal termination: after sending messages, client closes SendChannel normally; after which the coroutine still has to fetch and write each sent message into the WriteStream
Case 2: abnormal termination at destination: writing into the stream fails for whatever reason (e.g. IOException from AsyncFile)
Case 3: abnormal termination at source: client closes SendChannel with an exception
Motivation
Forgetting Case 3 for now, we can see that the client has little control over when the coroutine actually finishes its job.
In Case 1, when the client closes the channel, some messages might not have been written yet. If, say, the client uses toChannel() to write to an AsyncFile, then they would reasonably want to know when the file has been fully committed to disk; so they might want to call asynchFile.closeAwait(). However, it must only be called once the coroutine has finished forwarding all messages from the channel to the stream. The client currently has no way of knowing when this point has been reached; it may be too soon to close the file right away when closing the channel. (Even worse, now the coroutine automatically calls stream.end(), which translates to asynchFile.close(), making a future call of asynchFile.closeAwait() impossible).
In Case 2, the coroutine stops after encountering the failure; the client, however, is not properly notified. The coroutine does call channel.close(ex) with the exception from the stream, which does indeed allow any futuresend() attempts to fail with the same exception. However, if the client has already suspended its previoussend() attempt due to the channel being full, then it will not be resumed by closing the channel, and the client will hang forever. (Note that ReceiveChannel.cancel() would resume the suspended client, but it would not propagate the exception; so while slightly nicer in letting the client know about the abnormal termination, it still does not inform the client about what went wrong.)
Suggested improvement
One possible solution would be to add something like an optional/default argument completionHandler: Handler<AsyncResult<Void>> that could be used to asynchronously indicate when the coroutine terminates, and whether it encountered an exception.
If, for normal termination, stream.end() is replaced with a successful completion of this handler, then Case 1 is solved neatly: the client can await the handler and then closeAwait() the AsyncFile safe in the knowledge that all data has been passed to it. If the user provides no completion handler, the default handler may keep the current behaviour of calling stream.end().
In Case 2, the coroutine shall fail the completion handler with the exception received from the stream in addition to closing (or, even better, cancel()-ling) the channel. The client, awaiting the completion handler, would receive this exception, which gives it the chance to (a) print/log it, (b) propagate it so that its own coroutine scope is cancelled, along with the suspended sender (if any). Note that this still requires detailed documentation on the API and careful implementation on the client side; especially if the coroutine implementation uses channel.close(ex) instead of the idiomatic cancel(), clients must be reminded to await the completion handler concurrently to sending (not after sending).
What do you think? Does this make sense?
The text was updated successfully, but these errors were encountered:
I am new to both Kotlin and Vert.x, but it appears to me that the current design of
WriteStream.toChannel()
may easily lead to the client either hanging (suspending) indefinitely or terminating prematurely. I suggest to improve the API and documentation so that clients have predictable and flexible control over when and how the data transfer coroutine terminates.Hear me out.
WriteStream.toChannel()
provides aSendChannel
facade to a vertxWriteStream
by starting a coroutine that simply reads the channel as long as possible, forwarding each message to the wrapped stream. The coroutine can terminate in three ways:SendChannel
normally; after which the coroutine still has to fetch and write each sent message into theWriteStream
IOException
fromAsyncFile
)SendChannel
with an exceptionForgetting Case 3 for now, we can see that the client has little control over when the coroutine actually finishes its job.
In Case 1, when the client closes the channel, some messages might not have been written yet. If, say, the client uses
toChannel()
to write to anAsyncFile
, then they would reasonably want to know when the file has been fully committed to disk; so they might want to callasynchFile.closeAwait()
. However, it must only be called once the coroutine has finished forwarding all messages from the channel to the stream. The client currently has no way of knowing when this point has been reached; it may be too soon to close the file right away when closing the channel. (Even worse, now the coroutine automatically callsstream.end()
, which translates toasynchFile.close()
, making a future call ofasynchFile.closeAwait()
impossible).In Case 2, the coroutine stops after encountering the failure; the client, however, is not properly notified. The coroutine does call
channel.close(ex)
with the exception from the stream, which does indeed allow any futuresend()
attempts to fail with the same exception. However, if the client has already suspended its previoussend()
attempt due to the channel being full, then it will not be resumed by closing the channel, and the client will hang forever. (Note thatReceiveChannel.cancel()
would resume the suspended client, but it would not propagate the exception; so while slightly nicer in letting the client know about the abnormal termination, it still does not inform the client about what went wrong.)One possible solution would be to add something like an optional/default argument
completionHandler: Handler<AsyncResult<Void>>
that could be used to asynchronously indicate when the coroutine terminates, and whether it encountered an exception.If, for normal termination,
stream.end()
is replaced with a successful completion of this handler, then Case 1 is solved neatly: the client can await the handler and thencloseAwait()
theAsyncFile
safe in the knowledge that all data has been passed to it. If the user provides no completion handler, the default handler may keep the current behaviour of callingstream.end()
.In Case 2, the coroutine shall fail the completion handler with the exception received from the stream in addition to closing (or, even better,
cancel()
-ling) the channel. The client, awaiting the completion handler, would receive this exception, which gives it the chance to (a) print/log it, (b) propagate it so that its own coroutine scope is cancelled, along with the suspended sender (if any). Note that this still requires detailed documentation on the API and careful implementation on the client side; especially if the coroutine implementation useschannel.close(ex)
instead of the idiomaticcancel()
, clients must be reminded to await the completion handler concurrently to sending (not after sending).What do you think? Does this make sense?
The text was updated successfully, but these errors were encountered: