Skip to content

Commit

Permalink
Add join() operation to wait for future completion. (#525)
Browse files Browse the repository at this point in the history
* Add `join()` operation to wait for future completion without cancelling it when `join()` got cancelled.

* Start using join() operation.
  • Loading branch information
cheatfate authored Apr 3, 2024
1 parent 402914f commit 2d85229
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 64 deletions.
18 changes: 1 addition & 17 deletions chronos/apps/http/httpserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1187,23 +1187,7 @@ proc closeWait*(server: HttpServerRef) {.async: (raises: []).} =
proc join*(server: HttpServerRef): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until HTTP server will not be closed.
var retFuture = newFuture[void]("http.server.join")

proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
server.lifetime.removeCallback(continuation, cast[pointer](retFuture))

if server.state == ServerClosed:
retFuture.complete()
else:
server.lifetime.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation

retFuture
server.lifetime.join()

proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
## Create new MultiPartReader interface for specific request.
Expand Down
33 changes: 33 additions & 0 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,39 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
else:
wait(fut, timeout.milliseconds())

proc join*(future: FutureBase): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
##
## This primitive helps to carefully monitor ``future`` state, in case of
## cancellation ``join`` operation it will not going to cancel ``future``.
##
## If ``future`` is already completed - ``join`` will return completed
## future immediately.
let retFuture = newFuture[void]("chronos.join()")

proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
future.removeCallback(continuation, cast[pointer](retFuture))

if not(future.finished()):
future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
else:
retFuture.complete()

retFuture

proc join*(future: SomeFuture): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
##
## This primitive helps to carefully monitor ``future`` state, in case of
## cancellation ``join`` operation it will not going to cancel ``future``.
join(FutureBase(future))

when defined(windows):
import ../osdefs

Expand Down
19 changes: 1 addition & 18 deletions chronos/streams/asyncstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -836,24 +836,7 @@ proc join*(rw: AsyncStreamRW): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Get Future[void] which will be completed when stream become finished or
## closed.
when rw is AsyncStreamReader:
var retFuture = newFuture[void]("async.stream.reader.join")
else:
var retFuture = newFuture[void]("async.stream.writer.join")

proc continuation(udata: pointer) {.gcsafe, raises:[].} =
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe, raises:[].} =
rw.future.removeCallback(continuation, cast[pointer](retFuture))

if not(rw.future.finished()):
rw.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
else:
retFuture.complete()

return retFuture
rw.future.join()

proc close*(rw: AsyncStreamRW) =
## Close and frees resources of stream ``rw``.
Expand Down
16 changes: 1 addition & 15 deletions chronos/transports/datagram.nim
Original file line number Diff line number Diff line change
Expand Up @@ -827,21 +827,7 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback,
proc join*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until the transport ``transp`` will be closed.
let retFuture = newFuture[void]("datagram.transport.join")

proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()

proc cancel(udata: pointer) {.gcsafe.} =
transp.future.removeCallback(continuation, cast[pointer](retFuture))

if not(transp.future.finished()):
transp.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()

return retFuture
transp.future.join()

proc closed*(transp: DatagramTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
Expand Down
15 changes: 1 addition & 14 deletions chronos/transports/stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1780,20 +1780,7 @@ proc stop*(server: StreamServer) {.raises: [TransportOsError].} =
proc join*(server: StreamServer): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Waits until ``server`` is not closed.
var retFuture = newFuture[void]("stream.transport.server.join")

proc continuation(udata: pointer) =
retFuture.complete()

proc cancel(udata: pointer) =
server.loopFuture.removeCallback(continuation, cast[pointer](retFuture))

if not(server.loopFuture.finished()):
server.loopFuture.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
server.loopFuture.join()

proc connect*(address: TransportAddress,
bufferSize = DefaultStreamBufferSize,
Expand Down
106 changes: 106 additions & 0 deletions tests/testfut.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,112 @@ suite "Future[T] behavior test suite":
future1.cancelled() == true
future2.cancelled() == true

asyncTest "join() test":
proc joinFoo0(future: FutureBase) {.async.} =
await join(future)

proc joinFoo1(future: Future[void]) {.async.} =
await join(future)

proc joinFoo2(future: Future[void]) {.
async: (raises: [CancelledError]).} =
await join(future)

let
future0 = newFuture[void]()
future1 = newFuture[void]()
future2 = Future[void].Raising([CancelledError]).init()

let
resfut0 = joinFoo0(future0)
resfut1 = joinFoo1(future1)
resfut2 = joinFoo2(future2)

check:
resfut0.finished() == false
resfut1.finished() == false
resfut2.finished() == false

future0.complete()
future1.complete()
future2.complete()

let res =
try:
await noCancel allFutures(resfut0, resfut1, resfut2).wait(1.seconds)
true
except AsyncTimeoutError:
false

check:
res == true
resfut0.finished() == true
resfut1.finished() == true
resfut2.finished() == true
future0.finished() == true
future1.finished() == true
future2.finished() == true

asyncTest "join() cancellation test":
proc joinFoo0(future: FutureBase) {.async.} =
await join(future)

proc joinFoo1(future: Future[void]) {.async.} =
await join(future)

proc joinFoo2(future: Future[void]) {.
async: (raises: [CancelledError]).} =
await join(future)

let
future0 = newFuture[void]()
future1 = newFuture[void]()
future2 = Future[void].Raising([CancelledError]).init()

let
resfut0 = joinFoo0(future0)
resfut1 = joinFoo1(future1)
resfut2 = joinFoo2(future2)

check:
resfut0.finished() == false
resfut1.finished() == false
resfut2.finished() == false

let
cancelfut0 = cancelAndWait(resfut0)
cancelfut1 = cancelAndWait(resfut1)
cancelfut2 = cancelAndWait(resfut2)

let res =
try:
await noCancel allFutures(cancelfut0, cancelfut1,
cancelfut2).wait(1.seconds)
true
except AsyncTimeoutError:
false

check:
res == true
cancelfut0.finished() == true
cancelfut1.finished() == true
cancelfut2.finished() == true
resfut0.cancelled() == true
resfut1.cancelled() == true
resfut2.cancelled() == true
future0.finished() == false
future1.finished() == false
future2.finished() == false

future0.complete()
future1.complete()
future2.complete()

check:
future0.finished() == true
future1.finished() == true
future2.finished() == true

test "Sink with literals":
# https://github.com/nim-lang/Nim/issues/22175
let fut = newFuture[string]()
Expand Down

0 comments on commit 2d85229

Please sign in to comment.