Skip to content

Commit

Permalink
async: minor refactorings (#15354)
Browse files Browse the repository at this point in the history
  • Loading branch information
Araq authored Sep 18, 2020
1 parent e56d50d commit 2671efa
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 66 deletions.
29 changes: 14 additions & 15 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ when defined(windows) or defined(nimdoc):

var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: owned PDispatcher) =
proc setGlobalDispatcher*(disp: sink PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
Expand Down Expand Up @@ -1217,10 +1217,12 @@ else:
withData(selector, fd.int, fdData):
case event
of Event.Read:
shallowCopy(curList, fdData.readList)
#shallowCopy(curList, fdData.readList)
curList = move fdData.readList
fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
of Event.Write:
shallowCopy(curList, fdData.writeList)
#shallowCopy(curList, fdData.writeList)
curList = move fdData.writeList
fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
else:
assert false, "Cannot process callbacks for " & $event
Expand All @@ -1232,8 +1234,7 @@ else:
for cb in curList:
if eventsExtinguished:
newList.add(cb)
continue
if not cb(fd):
elif not cb(fd):
# Callback wants to be called again.
newList.add(cb)
# This callback has returned with EAGAIN, so we don't need to
Expand All @@ -1259,15 +1260,15 @@ else:
result.readCbListCount = -1
result.writeCbListCount = -1

template processCustomCallbacks(ident: untyped) =
proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
# Process pending custom event callbacks. Custom events are
# {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
# There can be only one callback registered with one descriptor,
# so there is no need to iterate over list.
var curList: seq[Callback]

withData(p.selector, ident.int, adata) do:
shallowCopy(curList, adata.readList)
withData(p.selector, fd.int, adata) do:
curList = move adata.readList
adata.readList = newSeqOfCap[Callback](InitCallbackListSize)

let newLength = len(curList)
Expand All @@ -1277,7 +1278,7 @@ else:
if not cb(fd.AsyncFD):
newList.add(cb)

withData(p.selector, ident.int, adata) do:
withData(p.selector, fd.int, adata) do:
# descriptor still present in queue.
adata.readList = newList & adata.readList
if len(adata.readList) == 0:
Expand Down Expand Up @@ -1308,10 +1309,6 @@ else:

proc runOnce(timeout = 500): bool =
let p = getGlobalDispatcher()
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}

if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")
Expand Down Expand Up @@ -1346,9 +1343,11 @@ else:
result = true

when ioselSupportedPlatform:
const customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
if (customSet * events) != {}:
isCustomEvent = true
processCustomCallbacks(fd)
processCustomCallbacks(p, fd)
result = true

# because state `data` can be modified in callback we need to update
Expand Down Expand Up @@ -1612,7 +1611,7 @@ proc drain*(timeout = 500) =
var curTimeout = timeout
let start = now()
while hasPendingOperations():
discard runOnce(curTimeout)
discard runOnce(curTimeout)
curTimeout -= (now() - start).inMilliseconds.int
if curTimeout < 0:
break
Expand Down
34 changes: 8 additions & 26 deletions lib/pure/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,33 +157,15 @@ proc checkFinished[T](future: Future[T]) =
raise err

proc call(callbacks: var CallbackList) =
when not defined(nimV2):
# strictly speaking a little code duplication here, but we strive
# to minimize regressions and I'm not sure I got the 'nimV2' logic
# right:
var current = callbacks
while true:
if not current.function.isNil:
callSoon(current.function)

if current.next.isNil:
break
else:
current = current.next[]
else:
var currentFunc = unown callbacks.function
var currentNext = unown callbacks.next

while true:
if not currentFunc.isNil:
callSoon(currentFunc)

if currentNext.isNil:
break
else:
currentFunc = currentNext.function
currentNext = unown currentNext.next
var current = callbacks
while true:
if not current.function.isNil:
callSoon(current.function)

if current.next.isNil:
break
else:
current = current.next[]
# callback will be called only once, let GC collect them now
callbacks.next = nil
callbacks.function = nil
Expand Down
44 changes: 22 additions & 22 deletions lib/pure/asyncstreams.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ proc `callback=`*[T](future: FutureStream[T],
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
proc named() = cb(future)
future.cb = named
if future.queue.len > 0 or future.finished:
callSoon(future.cb)

Expand Down Expand Up @@ -90,27 +91,26 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
var newCb =
proc (fs: FutureStream[T]) =
# Exit early if `resFut` is already complete. (See #8994).
if resFut.finished: return

# We don't want this callback called again.
#future.cb = nil

# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()

resFut.complete(res)

# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
proc newCb(fs: FutureStream[T]) =
# Exit early if `resFut` is already complete. (See #8994).
if resFut.finished: return

# We don't want this callback called again.
#future.cb = nil

# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()

resFut.complete(res)

# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()

if future.queue.len > 0 or future.finished:
newCb(future)
Expand Down
4 changes: 2 additions & 2 deletions lib/pure/ioselects/ioselectors_epoll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
var value = addr(s.getData(fdi))
var value = addr(s.fds[fdi].data)
body

template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
Expand All @@ -523,7 +523,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
var value = addr(s.getData(fdi))
var value = addr(s.fds[fdi].data)
body1
else:
body2
Expand Down
2 changes: 1 addition & 1 deletion lib/pure/selectors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ else:
IOSelectorsException* = object of CatchableError

ReadyKey* = object
fd* : int
fd*: int
events*: set[Event]
errorCode*: OSErrorCode

Expand Down

0 comments on commit 2671efa

Please sign in to comment.