Skip to content

Commit aabf4b4

Browse files
committed
fix(slotqueue): asyncSpawns futures correctly
- asyncSpawns `run` and worker `dispatch` in slotqueue. - removes usage of `then` from slotqueue.
1 parent 7c804b0 commit aabf4b4

File tree

3 files changed

+33
-36
lines changed

3 files changed

+33
-36
lines changed

Diff for: codex/sales.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
491491
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
492492
sales.processSlot(item, done)
493493

494-
asyncSpawn slotQueue.start()
494+
slotQueue.start()
495495

496496
proc onAvailabilityAdded(availability: Availability) {.async.} =
497497
await sales.onAvailabilityAdded(availability)

Diff for: codex/sales/slotqueue.nim

+25-26
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import ../rng
1010
import ../utils
1111
import ../contracts/requests
1212
import ../utils/asyncheapqueue
13-
import ../utils/then
1413
import ../utils/trackedfutures
1514

1615
logScope:
@@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void =
333332

334333
proc dispatch(self: SlotQueue,
335334
worker: SlotQueueWorker,
336-
item: SlotQueueItem) {.async.} =
335+
item: SlotQueueItem) {.async: (raises: []).} =
337336
logScope:
338337
requestId = item.requestId
339338
slotIndex = item.slotIndex
@@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) =
380379

381380
trace "all 'seen' flags cleared"
382381

383-
proc start*(self: SlotQueue) {.async.} =
384-
if self.running:
385-
return
386-
387-
trace "starting slot queue"
388-
389-
self.running = true
390-
391-
# must be called in `start` to avoid sideeffects in `new`
392-
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
393-
394-
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
395-
# task, a new worker will be pushed to the queue
396-
for i in 0..<self.maxWorkers:
397-
if err =? self.addWorker().errorOption:
398-
error "start: error adding new worker", error = err.msg
382+
proc run(self: SlotQueue) {.async: (raises: []).} =
399383

400384
while self.running:
401385
try:
@@ -405,8 +389,8 @@ proc start*(self: SlotQueue) {.async.} =
405389
# block until unpaused is true/fired, ie wait for queue to be unpaused
406390
await self.unpaused.wait()
407391

408-
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
409-
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
392+
let worker = await self.workers.popFirst() # if workers saturated, wait here for new workers
393+
let item = await self.queue.pop() # if queue empty, wait here for new items
410394

411395
logScope:
412396
reqId = item.requestId
@@ -434,19 +418,34 @@ proc start*(self: SlotQueue) {.async.} =
434418

435419
trace "processing item"
436420

437-
self.dispatch(worker, item)
438-
.track(self)
439-
.catch(proc (e: ref CatchableError) =
440-
error "Unknown error dispatching worker", error = e.msg
441-
)
421+
asyncSpawn self.dispatch(worker, item).track(self)
442422

443423
await sleepAsync(1.millis) # poll
444424
except CancelledError:
445425
trace "slot queue cancelled"
446-
return
426+
break
447427
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
448428
warn "slot queue error encountered during processing", error = e.msg
449429

430+
proc start*(self: SlotQueue) =
431+
if self.running:
432+
return
433+
434+
trace "starting slot queue"
435+
436+
self.running = true
437+
438+
# must be called in `start` to avoid sideeffects in `new`
439+
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
440+
441+
# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
442+
# task, a new worker will be pushed to the queue
443+
for i in 0..<self.maxWorkers:
444+
if err =? self.addWorker().errorOption:
445+
error "start: error adding new worker", error = err.msg
446+
447+
asyncSpawn self.run().track(self)
448+
450449
proc stop*(self: SlotQueue) {.async.} =
451450
if not self.running:
452451
return

Diff for: tests/codex/sales/testslotqueue.nim

+7-9
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ suite "Slot queue start/stop":
2727
check not queue.running
2828

2929
test "can call start multiple times, and when already running":
30-
asyncSpawn queue.start()
31-
asyncSpawn queue.start()
30+
queue.start()
31+
queue.start()
3232
check queue.running
3333

3434
test "can call stop when alrady stopped":
3535
await queue.stop()
3636
check not queue.running
3737

3838
test "can call stop when running":
39-
asyncSpawn queue.start()
39+
queue.start()
4040
await queue.stop()
4141
check not queue.running
4242

4343
test "can call stop multiple times":
44-
asyncSpawn queue.start()
44+
queue.start()
4545
await queue.stop()
4646
await queue.stop()
4747
check not queue.running
@@ -62,8 +62,6 @@ suite "Slot queue workers":
6262
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
6363
queue.onProcessSlot = onProcessSlot
6464

65-
proc startQueue = asyncSpawn queue.start()
66-
6765
teardown:
6866
await queue.stop()
6967

@@ -79,7 +77,7 @@ suite "Slot queue workers":
7977
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)
8078

8179
test "does not surpass max workers":
82-
startQueue()
80+
queue.start()
8381
let item1 = SlotQueueItem.example
8482
let item2 = SlotQueueItem.example
8583
let item3 = SlotQueueItem.example
@@ -97,7 +95,7 @@ suite "Slot queue workers":
9795

9896
queue.onProcessSlot = processSlot
9997

100-
startQueue()
98+
queue.start()
10199
let item1 = SlotQueueItem.example
102100
let item2 = SlotQueueItem.example
103101
let item3 = SlotQueueItem.example
@@ -122,7 +120,7 @@ suite "Slot queue":
122120
onProcessSlotCalled = true
123121
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
124122
done.complete()
125-
asyncSpawn queue.start()
123+
queue.start()
126124

127125
setup:
128126
onProcessSlotCalled = false

0 commit comments

Comments
 (0)