Skip to content

Commit

Permalink
channels: fixes 'sink' annotations; fixes #19105
Browse files Browse the repository at this point in the history
  • Loading branch information
Araq committed Nov 11, 2021
1 parent a2fcc44 commit 8bd3e3b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
3 changes: 1 addition & 2 deletions tests/tchannels_simple.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ while true:
if tried:
messages.add move(msg)
break

messages.add "Pretend I'm doing useful work..."
# For this example, sleep in order not to flood stdout with the above
# message.
Expand All @@ -55,7 +55,6 @@ while true:
worker2.joinThread()

# Clean up the channel.
chan.close()
doAssert messages[^1] == "Another message"
doAssert messages.len >= 2

Expand Down
31 changes: 17 additions & 14 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ template isEmptyUnbuf(chan: ChannelRaw): bool =
# ChannelRaw kinds
# ----------------------------------------------------------------------------------

func isUnbuffered(chan: ChannelRaw): bool =
proc isUnbuffered(chan: ChannelRaw): bool =
chan.size - 1 == 0

# ChannelRaw status and properties
Expand Down Expand Up @@ -191,7 +191,7 @@ proc freeChannel(chan: ChannelRaw) =
# MPMC Channels (Multi-Producer Multi-Consumer)
# ----------------------------------------------------------------------------------

proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
proc sendUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
if nonBlocking and chan.isFullUnbuf:
return false

Expand All @@ -215,7 +215,7 @@ proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBloc
signal(chan.notEmptyCond)
result = true

proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
proc sendMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
assert not chan.isNil
assert not data.isNil

Expand Down Expand Up @@ -337,7 +337,7 @@ proc `=`*[T](dest: var Channel[T], src: Channel[T]) =
`=destroy`(dest)
dest.d = src.d

proc channelSend[T](chan: Channel[T], data: sink T, size: int, nonBlocking: bool): bool {.inline.} =
proc channelSend[T](chan: Channel[T], data: T, size: int, nonBlocking: bool): bool {.inline.} =
## Send item to the channel (FIFO queue)
## (Insert at last)
sendMpmc(chan.d, data.unsafeAddr, size, nonBlocking)
Expand All @@ -347,7 +347,7 @@ proc channelReceive[T](chan: Channel[T], data: ptr T, size: int, nonBlocking: bo
## (Remove the first item)
recvMpmc(chan.d, data, size, nonBlocking)

func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
proc trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
## Sends item to the channel(non blocking).
var data = src.extract
result = channelSend(c, data, sizeof(data), true)
Expand All @@ -358,36 +358,39 @@ template trySend*[T](c: Channel[T], src: T): bool =
## Helper templates for `trySend`.
trySend(c, isolate(src))

func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
proc tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
## Receives item from the channel(non blocking).
channelReceive(c, dst.addr, sizeof(dst), true)

func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
proc send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
## Sends item to the channel(blocking).
var data = src.extract
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
discard channelSend(c, data, sizeof(data), false)
wasMoved(data)

template send*[T](c: var Channel[T]; src: T) =
## Helper templates for `send`.
send(c, isolate(src))

func recv*[T](c: Channel[T], dst: var T) {.inline.} =
proc recv*[T](c: Channel[T], dst: var T) {.inline.} =
## Receives item from the channel(blocking).
discard channelReceive(c, dst.addr, sizeof(dst), false)

func recvIso*[T](c: Channel[T]): Isolated[T] {.inline.} =
proc recvIso*[T](c: Channel[T]): Isolated[T] {.inline.} =
var dst: T
discard channelReceive(c, dst.addr, sizeof(dst), false)
result = isolate(dst)

func open*[T](c: Channel[T]) {.inline.} =
store(c.d.closed, false, moRelaxed)
when false:
proc open*[T](c: Channel[T]) {.inline.} =
store(c.d.closed, false, moRelaxed)

func close*[T](c: Channel[T]) {.inline.} =
store(c.d.closed, true, moRelaxed)
proc close*[T](c: Channel[T]) {.inline.} =
store(c.d.closed, true, moRelaxed)

func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)
proc peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)

proc newChannel*[T](elements = 30): Channel[T] =
assert elements >= 1, "Elements must be positive!"
Expand Down

0 comments on commit 8bd3e3b

Please sign in to comment.