Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:

- name: Set up a PureScript toolchain
uses: purescript-contrib/setup-purescript@main
with:
purs-tidy: "latest"

- name: Cache PureScript dependencies
uses: actions/cache@v2
Expand All @@ -32,3 +34,6 @@ jobs:

- name: Run tests
run: spago test --no-install

- name: Check formatting
run: purs-tidy check src test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
!.gitignore
!.github
!.editorconfig
!.tidyrc.json

output
generated-docs
Expand Down
10 changes: 10 additions & 0 deletions .tidyrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"importSort": "source",
"importWrap": "source",
"indent": 2,
"operatorsFile": null,
"ribbon": 1,
"typeArrowPlacement": "first",
"unicode": "never",
"width": null
}
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ New features:
Bugfixes:

Other improvements:
- Added `purs-tidy` formatter (#13 by @thomashoneyman)

## [v2.0.0](https://github.com/purescript-contrib/purescript-concurrent-queues/releases/tag/v2.0.0) - 2021-02-26

Expand Down
44 changes: 22 additions & 22 deletions src/Concurrent/BoundedQueue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -24,52 +24,52 @@ import Effect.Aff.AVar as AVar
import Partial.Unsafe (unsafePartial)

-- | Creates a new `BoundedQueue` with the given capacity,
new ∷ ∀ a. Int Aff (BoundedQueue a)
new :: forall a. Int -> Aff (BoundedQueue a)
new size = do
contents replicateA size AVar.empty
readPos AVar.new 0
writePos AVar.new 0
contents <- replicateA size AVar.empty
readPos <- AVar.new 0
writePos <- AVar.new 0
pure (BoundedQueue { size, contents, readPos, writePos })

-- | Writes an element to the given queue. Will block if the queue is full until
-- | someone reads from it.
write ∷ ∀ a. BoundedQueue a a Aff Unit
write :: forall a. BoundedQueue a -> a -> Aff Unit
write (BoundedQueue q) a = do
w AVar.take q.writePos
w <- AVar.take q.writePos
AVar.put a (unsafePartial unsafeIndex q.contents w)
AVar.put ((w + 1) `mod` q.size) q.writePos

-- | Reads an element from the given queue, will block if the queue is empty,
-- | until someone writes to it.
read ∷ ∀ a. BoundedQueue a Aff a
read :: forall a. BoundedQueue a -> Aff a
read (BoundedQueue q) = do
r AVar.take q.readPos
v AVar.take (unsafePartial unsafeIndex q.contents r)
r <- AVar.take q.readPos
v <- AVar.take (unsafePartial unsafeIndex q.contents r)
AVar.put ((r + 1) `mod` q.size) q.readPos
pure v

-- | Checks whether the given queue is empty. Never blocks.
isEmpty ∷ ∀ a. BoundedQueue a Aff Boolean
isEmpty :: forall a. BoundedQueue a -> Aff Boolean
isEmpty (BoundedQueue q) = do
AVar.tryRead q.readPos >>= case _ of
Nothing pure true
Just r AVar.tryRead (unsafePartial unsafeIndex q.contents r) <#> case _ of
Nothing true
Just _ false
Nothing -> pure true
Just r -> AVar.tryRead (unsafePartial unsafeIndex q.contents r) <#> case _ of
Nothing -> true
Just _ -> false

-- | Attempts to read an element from the given queue. If the queue is empty,
-- | returns `Nothing`.
-- |
-- | *Careful!* If other readers are blocked on the queue `tryRead` will also
-- | block.
tryRead ∷ ∀ a. BoundedQueue a Aff (Maybe a)
tryRead :: forall a. BoundedQueue a -> Aff (Maybe a)
tryRead (BoundedQueue q) = do
r AVar.take q.readPos
r <- AVar.take q.readPos
AVar.tryTake (unsafePartial unsafeIndex q.contents r) >>= case _ of
Just v do
Just v -> do
AVar.put ((r + 1) `mod` q.size) q.readPos
pure (Just v)
Nothing do
Nothing -> do
AVar.put r q.readPos
pure Nothing

Expand All @@ -78,11 +78,11 @@ tryRead (BoundedQueue q) = do
-- |
-- | *Careful!* If other writers are blocked on the queue `tryWrite` will also
-- | block.
tryWrite ∷ ∀ a. BoundedQueue a a Aff Boolean
tryWrite :: forall a. BoundedQueue a -> a -> Aff Boolean
tryWrite (BoundedQueue q) a = do
w AVar.take q.writePos
AVar.tryPut a (unsafePartial unsafeIndex q.contents w) >>= if _
then do
w <- AVar.take q.writePos
AVar.tryPut a (unsafePartial unsafeIndex q.contents w) >>=
if _ then do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on this style?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm personally fine with it!

AVar.put ((w + 1) `mod` q.size) q.writePos
pure true
else do
Expand Down
26 changes: 13 additions & 13 deletions src/Concurrent/BoundedQueue/Internal.purs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module Concurrent.BoundedQueue.Internal
( BoundedQueue(..)
) where
import Effect.AVar (AVar)
newtype BoundedQueue a =
BoundedQueue
{ size Int
, contents Array (AVar a)
, readPos AVar Int
, writePos AVar Int
}
module Concurrent.BoundedQueue.Internal
( BoundedQueue(..)
) where

import Effect.AVar (AVar)

newtype BoundedQueue a =
BoundedQueue
{ size :: Int
, contents :: Array (AVar a)
, readPos :: AVar Int
, writePos :: AVar Int
}
137 changes: 68 additions & 69 deletions src/Concurrent/BoundedQueue/Sync.purs
Original file line number Diff line number Diff line change
@@ -1,69 +1,68 @@
module Concurrent.BoundedQueue.Sync
( new
, isEmpty
, tryRead
, tryWrite
, module Export
)
where

import Prelude

import Concurrent.BoundedQueue.Internal (BoundedQueue(..))
import Concurrent.BoundedQueue.Internal (BoundedQueue) as Export
import Data.Array (unsafeIndex)
import Data.Maybe (Maybe(..))
import Data.Unfoldable (replicateA)
import Effect (Effect)
import Effect.AVar as AVarEff
import Partial.Unsafe (unsafePartial)

-- | Synchronously creates a new `BoundedQueue` with the given capacity.
new ∷ ∀ a. Int → Effect (BoundedQueue a)
new size = do
contents ← replicateA size AVarEff.empty
readPos ← AVarEff.new 0
writePos ← AVarEff.new 0
pure (BoundedQueue { size, contents, readPos, writePos })

-- | Synchronously checks whether the given queue is empty. Never blocks.
isEmpty ∷ ∀ a. BoundedQueue a → Effect Boolean
isEmpty (BoundedQueue q) = do
AVarEff.tryRead q.readPos >>= case _ of
Nothing → pure true
Just r → AVarEff.tryRead (unsafePartial unsafeIndex q.contents r) <#>
case _ of
Nothing → true
Just _ → false

-- | Synchronously attempts to read an element from the given queue. If the
-- | queue is empty, or there is a concurrent reader, returns `Nothing`.
tryRead ∷ ∀ a. BoundedQueue a -> Effect (Maybe a)
tryRead (BoundedQueue q) = do
mr ← AVarEff.tryTake q.readPos
case mr of
Just r → do
AVarEff.tryTake (unsafePartial unsafeIndex q.contents r) >>= case _ of
Just v → do
_ <- AVarEff.tryPut ((r + 1) `mod` q.size) q.readPos
pure (Just v)
Nothing → do
_ <- AVarEff.tryPut r q.readPos
pure Nothing
Nothing → pure Nothing

-- | Attempts to write an element into the given queue. If the queue is full,
-- | or there is a concurrent writer, returns `false` otherwise `true`.
tryWrite ∷ ∀ a. BoundedQueue a → a → Effect Boolean
tryWrite (BoundedQueue q) a = do
mw ← AVarEff.tryTake q.writePos
case mw of
Just w → do
AVarEff.tryPut a (unsafePartial unsafeIndex q.contents w) >>= if _
then do
_ ← AVarEff.tryPut ((w + 1) `mod` q.size) q.writePos
pure true
else do
_ ← AVarEff.tryPut w q.writePos
pure false
Nothing → pure false
module Concurrent.BoundedQueue.Sync
( new
, isEmpty
, tryRead
, tryWrite
, module Export
) where

import Prelude

import Concurrent.BoundedQueue.Internal (BoundedQueue(..))
import Concurrent.BoundedQueue.Internal (BoundedQueue) as Export
import Data.Array (unsafeIndex)
import Data.Maybe (Maybe(..))
import Data.Unfoldable (replicateA)
import Effect (Effect)
import Effect.AVar as AVarEff
import Partial.Unsafe (unsafePartial)

-- | Synchronously creates a new `BoundedQueue` with the given capacity.
new :: forall a. Int -> Effect (BoundedQueue a)
new size = do
contents <- replicateA size AVarEff.empty
readPos <- AVarEff.new 0
writePos <- AVarEff.new 0
pure (BoundedQueue { size, contents, readPos, writePos })

-- | Synchronously checks whether the given queue is empty. Never blocks.
isEmpty :: forall a. BoundedQueue a -> Effect Boolean
isEmpty (BoundedQueue q) = do
AVarEff.tryRead q.readPos >>= case _ of
Nothing -> pure true
Just r -> AVarEff.tryRead (unsafePartial unsafeIndex q.contents r) <#>
case _ of
Nothing -> true
Just _ -> false

-- | Synchronously attempts to read an element from the given queue. If the
-- | queue is empty, or there is a concurrent reader, returns `Nothing`.
tryRead :: forall a. BoundedQueue a -> Effect (Maybe a)
tryRead (BoundedQueue q) = do
mr <- AVarEff.tryTake q.readPos
case mr of
Just r -> do
AVarEff.tryTake (unsafePartial unsafeIndex q.contents r) >>= case _ of
Just v -> do
_ <- AVarEff.tryPut ((r + 1) `mod` q.size) q.readPos
pure (Just v)
Nothing -> do
_ <- AVarEff.tryPut r q.readPos
pure Nothing
Nothing -> pure Nothing

-- | Attempts to write an element into the given queue. If the queue is full,
-- | or there is a concurrent writer, returns `false` otherwise `true`.
tryWrite :: forall a. BoundedQueue a -> a -> Effect Boolean
tryWrite (BoundedQueue q) a = do
mw <- AVarEff.tryTake q.writePos
case mw of
Just w -> do
AVarEff.tryPut a (unsafePartial unsafeIndex q.contents w) >>=
if _ then do
_ <- AVarEff.tryPut ((w + 1) `mod` q.size) q.writePos
pure true
else do
_ <- AVarEff.tryPut w q.writePos
pure false
Nothing -> pure false
32 changes: 16 additions & 16 deletions src/Concurrent/Queue.purs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,35 @@ import Effect.Aff.AVar as AVar

-- | An unbounded Queue fit for concurrent access.
newtype Queue a = Queue
{ readEnd AVar (Stream a)
, writeEnd AVar (Stream a)
{ readEnd :: AVar (Stream a)
, writeEnd :: AVar (Stream a)
}

type Stream a = AVar (QItem a)
data QItem a = QItem a (Stream a)

-- | Creates a new `Queue`.
new ∷ ∀ a. Aff (Queue a)
new :: forall a. Aff (Queue a)
new = do
hole AVar.empty
readEnd AVar.new hole
writeEnd AVar.new hole
hole <- AVar.empty
readEnd <- AVar.new hole
writeEnd <- AVar.new hole
pure (Queue { readEnd, writeEnd })

-- | Writes a new value into the queue
write ∷ ∀ a. Queue a a Aff Unit
write :: forall a. Queue a -> a -> Aff Unit
write (Queue q) a = do
newHole AVar.empty
oldHole AVar.take q.writeEnd
newHole <- AVar.empty
oldHole <- AVar.take q.writeEnd
AVar.put (QItem a newHole) oldHole
AVar.put newHole q.writeEnd

-- | Reads a value from the queue. Blocks if the queue is empty, and resumes
-- | when it has been written to.
read ∷ ∀ a. Queue a Aff a
read :: forall a. Queue a -> Aff a
read (Queue q) = do
readEnd AVar.take q.readEnd
QItem a newRead AVar.read readEnd
readEnd <- AVar.take q.readEnd
QItem a newRead <- AVar.read readEnd
AVar.put newRead q.readEnd
pure a

Expand All @@ -53,13 +53,13 @@ read (Queue q) = do
-- |
-- | *CAREFUL!* This will block if other readers are blocked on the
-- | queue.
tryRead ∷ ∀ a. Queue a Aff (Maybe a)
tryRead :: forall a. Queue a -> Aff (Maybe a)
tryRead (Queue q) = do
readEnd AVar.take q.readEnd
readEnd <- AVar.take q.readEnd
AVar.tryRead readEnd >>= case _ of
Just (QItem a newRead) do
Just (QItem a newRead) -> do
AVar.put newRead q.readEnd
pure (Just a)
Nothing do
Nothing -> do
AVar.put readEnd q.readEnd
pure Nothing
Loading