Skip to content

Commit

Permalink
Avoid ValueError effect in varargs race/one (#520)
Browse files Browse the repository at this point in the history
We can check at compile-time that at least one parameter is passed

* clean up closure environment explicitly in some callbacks to release
memory earlier
  • Loading branch information
arnetheduck authored Mar 6, 2024
1 parent f6c7ecf commit 03d8247
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 52 deletions.
140 changes: 94 additions & 46 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
retFuture.fail(fut2.error)
else:
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
fut1.addCallback(cb)
fut2.addCallback(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
Expand Down Expand Up @@ -1086,12 +1086,14 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] {.
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete()
reset(nfuts)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)

for fut in nfuts:
if not(fut.finished()):
Expand Down Expand Up @@ -1148,13 +1150,14 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
if not(retFuture.finished()):
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete(nfuts)
retFuture.complete(move(nfuts))

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for fut in nfuts.mitems():
if not(fut.finished()):
fut.removeCallback(cb)
reset(nfuts)

for fut in nfuts:
if not(fut.finished()):
Expand All @@ -1168,124 +1171,169 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.

return retFuture

proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[F]("chronos.one()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

template oneImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture

# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs

var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: F
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if cast[FutureBase](nfuts[i]) != rfut:
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)

for fut in nfuts:
when declared(fut0):
fut0.addCallback(cb)
for fut in futs:
fut.addCallback(cb)

retFuture.cancelCallback = cancellation
return retFuture

proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
proc one*[F: SomeFuture](fut0: F, futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[F]("chronos.one()")
if fut0.finished():
retFuture.complete(fut0)
return retFuture

oneImpl

proc one*[F: SomeFuture](futs: openArray[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed FutureBase,
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished FutureBase.
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
let retFuture = newFuture[F]("chronos.one()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

oneImpl

template raceImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture

# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
# Because we can't capture openArray/varargs in closures we need to create copy.
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs

var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)

for fut in nfuts:
when declared(fut0):
fut0.addCallback(cb, cast[pointer](fut0))
for fut in futs:
fut.addCallback(cb, cast[pointer](fut))

retFuture.cancelCallback = cancellation

return retFuture

proc race*[T](futs: varargs[Future[T]]): Future[FutureBase] {.
proc race*(fut0: FutureBase, futs: varargs[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return finished FutureBase,
## when one of the given futures will be completed, failed or canceled.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
if fut0.finished:
retFuture.complete(fut0)
return retFuture

raceImpl

proc race*(futs: openArray[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## Returns a future which will complete and return finished FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
## If the argument is empty, the returned future FAILS immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

raceImpl

proc race*[T, E](futs: varargs[InternalRaisesFuture[T, E]]): Future[FutureBase] {.
proc race*(futs: openArray[SomeFuture]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
## If the argument is empty, the returned future FAILS immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))

Expand Down
31 changes: 25 additions & 6 deletions tests/testfut.nim
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ suite "Future[T] behavior test suite":
not(fut2.failed())
fut2.read() == f21

asyncTest "one() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
one(f).cancelSoon()

await checkraises()

asyncTest "or() test":
proc client1() {.async.} =
await sleepAsync(200.milliseconds)
Expand Down Expand Up @@ -1220,7 +1228,10 @@ suite "Future[T] behavior test suite":

test "location test":
# WARNING: This test is very sensitive to line numbers and module name.
template start(): int =
instantiationInfo().line

const first = start()
proc macroFuture() {.async.} =
let someVar {.used.} = 5 # LINE POSITION 1
let someOtherVar {.used.} = 4
Expand Down Expand Up @@ -1258,12 +1269,12 @@ suite "Future[T] behavior test suite":
(loc.procedure == procedure)

check:
chk(loc10, "testfut.nim", 1225, "macroFuture")
chk(loc11, "testfut.nim", 1228, "")
chk(loc20, "testfut.nim", 1237, "template")
chk(loc21, "testfut.nim", 1240, "")
chk(loc30, "testfut.nim", 1234, "procedure")
chk(loc31, "testfut.nim", 1241, "")
chk(loc10, "testfut.nim", first + 2, "macroFuture")
chk(loc11, "testfut.nim", first + 5, "")
chk(loc20, "testfut.nim", first + 14, "template")
chk(loc21, "testfut.nim", first + 17, "")
chk(loc30, "testfut.nim", first + 11, "procedure")
chk(loc31, "testfut.nim", first + 18, "")

asyncTest "withTimeout(fut) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
Expand Down Expand Up @@ -1507,6 +1518,14 @@ suite "Future[T] behavior test suite":
f2.finished()
f3.finished()

asyncTest "race() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
race(f).cancelSoon()

await checkraises()

test "Unsigned integer overflow test":
check:
0xFFFF_FFFF_FFFF_FFFF'u64 + 1'u64 == 0'u64
Expand Down

0 comments on commit 03d8247

Please sign in to comment.