Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Commit

Permalink
Queue new API + docs (#30)
Browse files Browse the repository at this point in the history
* Merge new Queue internal impl with Queue

* Add some additional tests for new behavior

* Add more tests

* Add Concurrent syntax for Queue

* Fix compileJmhKotlin

* Fix documentation

* Update Queue tests

* Add takeAll & peekAll

* Add tryOfferAll

* Add offerAll

* First round of PR review

* Apply suggestions from doc lang review

Co-Authored-By: Alberto Ballano <aballano@users.noreply.github.com>
Co-Authored-By: Jannis <overesch.jannis@gmail.com>

* Fix offerAll cancelation

* Add docs to Dequeue

* Remove `shutdown` from `Queue` API

* Remove LinkedMap and use LinkedHashMap instead

* Refactor API to be Iterable instead of Collection

* Add tests to check all strategies

* Add examples queue strategies

* Fix unresolved A Dequeue docs

* Fix Dequeue docs some more

* Fix Sliding KDoc example

* Apply suggestions from doc review

Co-Authored-By: danieh <danimontoya_86@hotmail.com>

* Code review, remove Shutdown references

Co-authored-by: Rachel M. Carmena <rachelcarmena@users.noreply.github.com>
Co-authored-by: Alberto Ballano <aballano@users.noreply.github.com>
Co-authored-by: Jannis <overesch.jannis@gmail.com>
Co-authored-by: danieh <danimontoya_86@hotmail.com>
  • Loading branch information
5 people committed Mar 17, 2020
1 parent f7fd98f commit 2c0cdbe
Show file tree
Hide file tree
Showing 15 changed files with 1,438 additions and 880 deletions.
9 changes: 1 addition & 8 deletions arrow-benchmarks-fx/src/jmh/kotlin/arrow/benchmarks/Queue.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package arrow.benchmarks

import arrow.fx.ConcurrentQueue
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.IOOf
Expand All @@ -9,7 +8,6 @@ import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.functor.unit
import arrow.fx.extensions.io.monad.flatMap
import arrow.fx.fix
import arrow.fx.internal.CancellableQueue
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.CompilerControl
import org.openjdk.jmh.annotations.Fork
Expand All @@ -34,12 +32,10 @@ open class Queue {
var size: Int = 0

var ConcurQueue by Delegates.notNull<Queue<ForIO, Int>>()
var CancelQueue by Delegates.notNull<Queue<ForIO, Int>>()

@Setup(Level.Trial)
fun createQueues(): Unit {
ConcurQueue = ConcurrentQueue.empty<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
CancelQueue = CancellableQueue.empty<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
ConcurQueue = Queue.unbounded<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
}

fun <A> IOOf<A>.repeat(n: Int): IO<A> =
Expand All @@ -52,7 +48,4 @@ open class Queue {

@Benchmark
fun concurrentQueue(): Unit = loop(ConcurQueue)

@Benchmark
fun cancellableQueue(): Unit = loop(CancelQueue)
}
49 changes: 0 additions & 49 deletions arrow-docs/docs/queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,52 +162,3 @@ fun main(args: Array<String>) {
println(result.unsafeRunSync())
}
```

### Shutting down

A `Queue` also has the ability to be `shutdown`, interrupting any future calls to `take` or `offer` with a
`QueueShutdown` error and cancelling any suspended fibers waiting to `take` or `offer`.

```kotlin:ank:playground
import arrow.fx.*
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
fun main(args: Array<String>) {
val result =
//sampleStart
IO.fx {
val q = !Queue.bounded<ForIO, Int>(10, IO.concurrent())
val t = !q.take().fork()
!q.shutdown()
!t.join() // Attempting to `join` after `shutdown` results in a `QueueShutdown` error
}
//sampleEnd
println(result.attempt().unsafeRunSync())
}
```

Consumers of the `Queue` can also track the event of a shutdown by calling `awaitShutdown` to receive a suspended
`Concurrent<F>` that will resume once the `Queue` has been shutdown.

```kotlin:ank:playground
import arrow.fx.*
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
fun main(args: Array<String>) {
val result =
//sampleStart
IO.fx {
val q = !Queue.bounded<ForIO, Int>(10, IO.concurrent())
val onShutdown = !q.awaitShutdown().fork()
!q.offer(42)
val fortyTwo = !q.take()
!q.shutdown()
!onShutdown.join()
fortyTwo
}
//sampleEnd
println(result.unsafeRunSync())
}
```
20 changes: 20 additions & 0 deletions arrow-fx-test/src/main/kotlin/arrow/fx/test/laws/Law.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package arrow.fx.test.laws

import arrow.core.extensions.either.eq.eq
import arrow.fx.IO
import arrow.fx.IOOf
import arrow.fx.fix
import arrow.fx.extensions.io.applicative.applicative
import arrow.fx.extensions.io.concurrent.waitFor
import arrow.fx.typeclasses.Duration
import arrow.fx.typeclasses.seconds
import arrow.core.test.generators.tuple2
import arrow.core.test.generators.tuple3
import arrow.core.test.generators.tuple4
Expand All @@ -14,6 +22,18 @@ import io.kotlintest.shouldNot
fun <A> A.equalUnderTheLaw(b: A, eq: Eq<A>): Boolean =
shouldBeEq(b, eq).let { true }

fun <A> IOOf<A>.equalUnderTheLaw(b: IOOf<A>, EQA: Eq<A> = Eq.any(), timeout: Duration = 5.seconds): Boolean =
(this should object : Matcher<IOOf<A>> {
override fun test(value: IOOf<A>): Result =
arrow.core.Either.eq(Eq.any(), EQA).run {
IO.applicative().mapN(value.fix().attempt(), b.fix().attempt()) { (a, b) ->
Result(a.eqv(b), "Expected: $b but found: $a", "$b and $a should be equal")
}
.waitFor(timeout)
.unsafeRunSync()
}
}).let { true }

fun <A> A.shouldBeEq(b: A, eq: Eq<A>): Unit = this should matchUnderEq(eq, b)

fun <A> A.shouldNotBeEq(b: A, eq: Eq<A>): Unit = this shouldNot matchUnderEq(eq, b)
Expand Down
219 changes: 0 additions & 219 deletions arrow-fx/src/main/kotlin/arrow/fx/ConcurrentQueue.kt

This file was deleted.

Loading

0 comments on commit 2c0cdbe

Please sign in to comment.