From 38220412bc0fa4d0cf1d8c6d75930126217d1626 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 13:15:56 +0200 Subject: [PATCH 1/4] Remove suspend from tryOffer --- .../fx/coroutines/stream/concurrent/PubSub.kt | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/PubSub.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/PubSub.kt index 73d1a01c5..b7f64a8c8 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/PubSub.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/PubSub.kt @@ -32,7 +32,7 @@ internal interface Publish { * Evaluates to `false` if element was not published. * Evaluates to `true` if element was published successfully. */ - suspend fun tryPublish(a: A): Boolean + fun tryPublish(a: A): Boolean } internal interface Subscribe { @@ -563,7 +563,14 @@ internal class DefaultPubSub(private val strategy: PubSub.Strategy< val state = atomic(initial) - suspend fun update(f: (PubSub.PubSubState) -> Pair, suspend () -> X>): X = + fun update(f: (PubSub.PubSubState) -> Pair, X>): X = + state.modify { ps -> + val (ps1, result) = f(ps) + val (ps2, _) = loop(ps1) { Unit } + Pair(ps2, result) + } + + suspend fun modify(f: (PubSub.PubSubState) -> Pair, suspend () -> X>): X = state.modify { ps -> val (ps1, result) = f(ps) val (ps2, action) = loop(ps1) { Unit } @@ -681,7 +688,7 @@ internal class DefaultPubSub(private val strategy: PubSub.Strategy< ps.copy(queue = strategy.publish(i, ps.queue)) override suspend fun publish(a: I) = - update { ps -> + modify { ps -> if (strategy.accepts(a, ps.queue)) { val ps1 = publish_(a, ps) Pair(ps1, suspend { Unit }) @@ -698,17 +705,17 @@ internal class DefaultPubSub(private val strategy: PubSub.Strategy< } } - override suspend fun tryPublish(a: I): Boolean = + override fun tryPublish(a: I): Boolean = update { ps -> - if (!strategy.accepts(a, ps.queue)) Pair(ps, suspend { false }) + if (!strategy.accepts(a, ps.queue)) Pair(ps, false) else { val ps1 = publish_(a, ps) - Pair(ps1, suspend { true }) + Pair(ps1, true) } } override suspend fun get(selector: S): O = - update { ps -> + modify { ps -> val (ps, option) = tryGet_(selector, ps) when (option) { None -> { @@ -726,7 +733,7 @@ internal class DefaultPubSub(private val strategy: PubSub.Strategy< } private suspend fun streamingGet(token: Token, selector: S) = - update { ps -> + modify { ps -> val (ps, option) = tryGet_(selector, ps) when (option) { is Some -> Pair(ps, suspend { option.t }) @@ -743,19 +750,19 @@ internal class DefaultPubSub(private val strategy: PubSub.Strategy< } override suspend fun tryGet(selector: S): Option = - update { ps -> + modify { ps -> val (ps1, result) = tryGet_(selector, ps) Pair(ps1, suspend { result }) } override suspend fun subscribe(selector: S): Boolean = - update { ps -> + modify { ps -> val (queue, success) = strategy.subscribe(selector, ps.queue) Pair(ps.copy(queue = queue), suspend { success }) } override suspend fun unsubscribe(selector: S): Unit = - update { ps -> + modify { ps -> Pair(ps.copy(queue = strategy.unsubscribe(selector, ps.queue)), suspend { Unit }) } } From 44cda54a55c29212d7810a86e636a10093392834 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 14:02:22 +0200 Subject: [PATCH 2/4] Add unsafe constructors for Queue --- .../fx/coroutines/stream/concurrent/Queue.kt | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt index 4bff1d205..a3166cdf9 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt @@ -111,14 +111,14 @@ interface Queue : Enqueue, Dequeue1, Dequeue { companion object { /** Creates a queue from the supplied strategy. */ - private suspend fun fromStrategy(strategy: PubSub.Strategy, S, Int>): Queue { - val pubSub = PubSub.from(strategy) + private fun fromStrategy(strategy: PubSub.Strategy, S, Int>): Queue { + val pubSub = PubSub.unsafe(strategy) return DefaultQueue(pubSub) } /** Creates a queue from the supplied strategy. */ - private suspend fun fromStrategyNoneTerminated(strategy: PubSub.Strategy, Option>, S, Int>): NoneTerminatedQueue { - val pubSub = PubSub.from(strategy) + private fun fromStrategyNoneTerminated(strategy: PubSub.Strategy, Option>, S, Int>): NoneTerminatedQueue { + val pubSub = PubSub.unsafe(strategy) return DefaultNoneTerminatedQueue(pubSub) } @@ -126,36 +126,63 @@ interface Queue : Enqueue, Dequeue1, Dequeue { suspend fun unbounded(): Queue = fromStrategy(Strategy.fifo()) + fun unsafeUnbounded(): Queue = + fromStrategy(Strategy.fifo()) + /** Creates an unbounded FIFO queue that distributed always at max `fairSize` elements to any subscriber. */ suspend fun fairUnbounded(fairSize: Int): Queue = fromStrategy(Strategy.fifo().transformSelector { size, _ -> min(size, fairSize) }) + fun unsafeFairUnbounded(fairSize: Int): Queue = + fromStrategy(Strategy.fifo().transformSelector { size, _ -> min(size, fairSize) }) + /** Creates a FIFO queue with the specified size bound. */ suspend fun bounded(maxSize: Int): Queue = fromStrategy(Strategy.boundedFifo(maxSize)) + fun unsafeBounded(maxSize: Int): Queue = + fromStrategy(Strategy.boundedFifo(maxSize)) + /** Creates a FILO queue with the specified size bound. */ - suspend fun boundedLife(maxSize: Int): Queue = + suspend fun boundedLifo(maxSize: Int): Queue = + fromStrategy(Strategy.boundedLifo(maxSize)) + + fun unsafeBoundedLifo(maxSize: Int): Queue = fromStrategy(Strategy.boundedLifo(maxSize)) /** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */ suspend fun circularBuffer(maxSize: Int): Queue = fromStrategy(Strategy.circularBuffer(maxSize)) + fun unsafeCircularBuffer(maxSize: Int): Queue = + fromStrategy(Strategy.circularBuffer(maxSize)) + /** Created a bounded queue that distributed always at max `fairSize` elements to any subscriber. */ suspend fun fairBounded(maxSize: Int, fairSize: Int): Queue = fromStrategy(Strategy.boundedFifo(maxSize).transformSelector { size, _ -> min(size, fairSize) }) + fun unsafeFairBounded(maxSize: Int, fairSize: Int): Queue = + fromStrategy(Strategy.boundedFifo(maxSize).transformSelector { size, _ -> min(size, fairSize) }) + /** Creates a queue which allows at most a single element to be enqueued at any time. */ suspend fun synchronous(): Queue = fromStrategy(Strategy.synchronous()) + fun unsafeSynchronous(): Queue = + fromStrategy(Strategy.synchronous()) + /** Like [synchronous], except that any enqueue of `None` will never block and cancels any dequeue operation. */ suspend fun synchronousNoneTerminated(): NoneTerminatedQueue { val strategy = Strategy.synchronous() val pubSub = PubSub.Strategy.closeNowOption(strategy) return fromStrategyNoneTerminated(pubSub) } + + fun unsafeSynchronousNoneTerminated(): NoneTerminatedQueue { + val strategy = Strategy.synchronous() + val pubSub = PubSub.Strategy.closeNowOption(strategy) + return fromStrategyNoneTerminated(pubSub) + } } } From 9cc31a76dabee2dbdb011e25174ad6fbeb863243 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 14:03:53 +0200 Subject: [PATCH 3/4] Queue - refactor offer1 to tryOffer1 --- .../kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt index a3166cdf9..9ecc8bfa6 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt @@ -39,7 +39,7 @@ interface Enqueue { * * @param a `A` to enqueue */ - suspend fun offer1(a: A): Boolean + fun tryOffer1(a: A): Boolean } /** Provides the ability to dequeue individual elements from a `Queue`. */ @@ -97,7 +97,7 @@ interface Queue : Enqueue, Dequeue1, Dequeue { fun imap(f: (A) -> B, g: (B) -> A): Queue = object : Queue { override suspend fun enqueue1(a: B) = enqueue1(g(a)) - override suspend fun offer1(a: B): Boolean = offer1(g(a)) + override fun tryOffer1(a: B): Boolean = tryOffer1(g(a)) override suspend fun dequeue1(): B = f(this@Queue.dequeue1()) override suspend fun tryDequeue1(): Option = this@Queue.tryDequeue1().map(f) override suspend fun dequeueChunk1(maxSize: Int): Chunk = this@Queue.dequeueChunk1(maxSize).map(f) @@ -303,7 +303,7 @@ internal class DefaultQueue(private val pubSub: PubSub, Int>) : Q override suspend fun enqueue1(a: A) = pubSub.publish(a) - override suspend fun offer1(a: A): Boolean = + override fun tryOffer1(a: A): Boolean = pubSub.tryPublish(a) override suspend fun dequeue1(): A = @@ -336,7 +336,7 @@ internal class DefaultNoneTerminatedQueue( override suspend fun enqueue1(a: Option) = pubSub.publish(a) - override suspend fun offer1(a: Option): Boolean = + override fun tryOffer1(a: Option): Boolean = pubSub.tryPublish(a) override suspend fun dequeue1(): Option = From 3f892cf28c7037cfef68fc7d74b6aa1412808c8e Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 14:05:30 +0200 Subject: [PATCH 4/4] Remove first in last out strategy from Queue --- .../kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt index 9ecc8bfa6..9df91f9bd 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt @@ -143,13 +143,6 @@ interface Queue : Enqueue, Dequeue1, Dequeue { fun unsafeBounded(maxSize: Int): Queue = fromStrategy(Strategy.boundedFifo(maxSize)) - /** Creates a FILO queue with the specified size bound. */ - suspend fun boundedLifo(maxSize: Int): Queue = - fromStrategy(Strategy.boundedLifo(maxSize)) - - fun unsafeBoundedLifo(maxSize: Int): Queue = - fromStrategy(Strategy.boundedLifo(maxSize)) - /** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */ suspend fun circularBuffer(maxSize: Int): Queue = fromStrategy(Strategy.circularBuffer(maxSize))