Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid ValueError effect in varargs race/one #520

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 94 additions & 46 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
retFuture.fail(fut2.error)
else:
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
fut1.addCallback(cb)
fut2.addCallback(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
Expand Down Expand Up @@ -1086,12 +1086,14 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] {.
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete()
reset(nfuts)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)

for fut in nfuts:
if not(fut.finished()):
Expand Down Expand Up @@ -1148,13 +1150,14 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.
if not(retFuture.finished()):
inc(finishedFutures)
if finishedFutures == totalFutures:
retFuture.complete(nfuts)
retFuture.complete(move(nfuts))

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for fut in nfuts.mitems():
if not(fut.finished()):
fut.removeCallback(cb)
reset(nfuts)

for fut in nfuts:
if not(fut.finished()):
Expand All @@ -1168,124 +1171,169 @@ proc allFinished*[F: SomeFuture](futs: varargs[F]): Future[seq[F]] {.

return retFuture

proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[F]("chronos.one()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

template oneImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture

# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs

var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: F
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if cast[FutureBase](nfuts[i]) != rfut:
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)

for fut in nfuts:
when declared(fut0):
fut0.addCallback(cb)
for fut in futs:
fut.addCallback(cb)

retFuture.cancelCallback = cancellation
return retFuture

proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
proc one*[F: SomeFuture](fut0: F, futs: varargs[F]): Future[F] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[F]("chronos.one()")
if fut0.finished():
retFuture.complete(fut0)
return retFuture

oneImpl

proc one*[F: SomeFuture](futs: openArray[F]): Future[F] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed FutureBase,
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished FutureBase.
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
let retFuture = newFuture[F]("chronos.one()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

oneImpl

template raceImpl =
# If one of the Future[T] already finished we return it as result
for fut in futs:
if fut.finished():
retFuture.complete(fut)
return retFuture

# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs
# Because we can't capture openArray/varargs in closures we need to create copy.
var nfuts =
when declared(fut0):
@[fut0] & @futs
else:
@futs

var cb: proc(udata: pointer) {.gcsafe, raises: [].}
cb = proc(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
if cast[pointer](nfuts[i]) != udata:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
res = move(nfuts[i])
retFuture.complete(res)
reset(nfuts)
reset(cb)

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)
reset(nfuts)
reset(cb)

for fut in nfuts:
when declared(fut0):
fut0.addCallback(cb, cast[pointer](fut0))
for fut in futs:
fut.addCallback(cb, cast[pointer](fut))

retFuture.cancelCallback = cancellation

return retFuture

proc race*[T](futs: varargs[Future[T]]): Future[FutureBase] {.
proc race*(fut0: FutureBase, futs: varargs[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete and return finished FutureBase,
## when one of the given futures will be completed, failed or canceled.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")
if fut0.finished:
retFuture.complete(fut0)
return retFuture

raceImpl

proc race*(futs: openArray[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## Returns a future which will complete and return finished FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
## If the argument is empty, the returned future FAILS immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
let retFuture = newFuture[FutureBase]("chronos.race()")

if len(futs) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return retFuture

raceImpl

proc race*[T, E](futs: varargs[InternalRaisesFuture[T, E]]): Future[FutureBase] {.
proc race*(futs: openArray[SomeFuture]): Future[FutureBase] {.
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
## If the argument is empty, the returned future COMPLETES immediately.
## If the argument is empty, the returned future FAILS immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
race(futs.mapIt(FutureBase(it)))

Expand Down
31 changes: 25 additions & 6 deletions tests/testfut.nim
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ suite "Future[T] behavior test suite":
not(fut2.failed())
fut2.read() == f21

asyncTest "one() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
one(f).cancelSoon()

await checkraises()

asyncTest "or() test":
proc client1() {.async.} =
await sleepAsync(200.milliseconds)
Expand Down Expand Up @@ -1220,7 +1228,10 @@ suite "Future[T] behavior test suite":

test "location test":
# WARNING: This test is very sensitive to line numbers and module name.
template start(): int =
instantiationInfo().line

const first = start()
proc macroFuture() {.async.} =
let someVar {.used.} = 5 # LINE POSITION 1
let someOtherVar {.used.} = 4
Expand Down Expand Up @@ -1258,12 +1269,12 @@ suite "Future[T] behavior test suite":
(loc.procedure == procedure)

check:
chk(loc10, "testfut.nim", 1225, "macroFuture")
chk(loc11, "testfut.nim", 1228, "")
chk(loc20, "testfut.nim", 1237, "template")
chk(loc21, "testfut.nim", 1240, "")
chk(loc30, "testfut.nim", 1234, "procedure")
chk(loc31, "testfut.nim", 1241, "")
chk(loc10, "testfut.nim", first + 2, "macroFuture")
chk(loc11, "testfut.nim", first + 5, "")
chk(loc20, "testfut.nim", first + 14, "template")
chk(loc21, "testfut.nim", first + 17, "")
chk(loc30, "testfut.nim", first + 11, "procedure")
chk(loc31, "testfut.nim", first + 18, "")

asyncTest "withTimeout(fut) should wait cancellation test":
proc futureNeverEnds(): Future[void] =
Expand Down Expand Up @@ -1507,6 +1518,14 @@ suite "Future[T] behavior test suite":
f2.finished()
f3.finished()

asyncTest "race() exception effect":
proc checkraises() {.async: (raises: [CancelledError]).} =
let f = Future[void].Raising([CancelledError]).init()
f.complete()
race(f).cancelSoon()

await checkraises()

test "Unsigned integer overflow test":
check:
0xFFFF_FFFF_FFFF_FFFF'u64 + 1'u64 == 0'u64
Expand Down
Loading