Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Commit

Permalink
Small Queue improvements (#235)
Browse files Browse the repository at this point in the history
* Remove suspend from tryOffer

* Add unsafe constructors for Queue

* Queue - refactor offer1 to tryOffer1

* Remove first in last out strategy from Queue

Co-authored-by: danieh <daniel.montoya@47deg.com>
Co-authored-by: Alberto Ballano <aballano@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 30, 2020
1 parent 92f437d commit 78f8e34
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal interface Publish<A> {
* Evaluates to `false` if element was not published.
* Evaluates to `true` if element was published successfully.
*/
suspend fun tryPublish(a: A): Boolean
fun tryPublish(a: A): Boolean
}

internal interface Subscribe<A, Selector> {
Expand Down Expand Up @@ -563,7 +563,14 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<

val state = atomic(initial)

suspend fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, _) = loop(ps1) { Unit }
Pair(ps2, result)
}

suspend fun <X> modify(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, action) = loop(ps1) { Unit }
Expand Down Expand Up @@ -681,7 +688,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
ps.copy(queue = strategy.publish(i, ps.queue))

override suspend fun publish(a: I) =
update { ps ->
modify { ps ->
if (strategy.accepts(a, ps.queue)) {
val ps1 = publish_(a, ps)
Pair(ps1, suspend { Unit })
Expand All @@ -698,17 +705,17 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}
}

override suspend fun tryPublish(a: I): Boolean =
override fun tryPublish(a: I): Boolean =
update { ps ->
if (!strategy.accepts(a, ps.queue)) Pair(ps, suspend { false })
if (!strategy.accepts(a, ps.queue)) Pair(ps, false)
else {
val ps1 = publish_(a, ps)
Pair(ps1, suspend { true })
Pair(ps1, true)
}
}

override suspend fun get(selector: S): O =
update { ps ->
modify { ps ->
val (ps, option) = tryGet_(selector, ps)
when (option) {
None -> {
Expand All @@ -726,7 +733,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}

private suspend fun streamingGet(token: Token, selector: S) =
update<O> { ps ->
modify<O> { ps ->
val (ps, option) = tryGet_(selector, ps)
when (option) {
is Some -> Pair(ps, suspend { option.t })
Expand All @@ -743,19 +750,19 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}

override suspend fun tryGet(selector: S): Option<O> =
update { ps ->
modify { ps ->
val (ps1, result) = tryGet_(selector, ps)
Pair(ps1, suspend { result })
}

override suspend fun subscribe(selector: S): Boolean =
update { ps ->
modify { ps ->
val (queue, success) = strategy.subscribe(selector, ps.queue)
Pair(ps.copy(queue = queue), suspend { success })
}

override suspend fun unsubscribe(selector: S): Unit =
update { ps ->
modify { ps ->
Pair(ps.copy(queue = strategy.unsubscribe(selector, ps.queue)), suspend { Unit })
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface Enqueue<A> {
*
* @param a `A` to enqueue
*/
suspend fun offer1(a: A): Boolean
fun tryOffer1(a: A): Boolean
}

/** Provides the ability to dequeue individual elements from a `Queue`. */
Expand Down Expand Up @@ -97,7 +97,7 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
fun <B> imap(f: (A) -> B, g: (B) -> A): Queue<B> =
object : Queue<B> {
override suspend fun enqueue1(a: B) = enqueue1(g(a))
override suspend fun offer1(a: B): Boolean = offer1(g(a))
override fun tryOffer1(a: B): Boolean = tryOffer1(g(a))
override suspend fun dequeue1(): B = f(this@Queue.dequeue1())
override suspend fun tryDequeue1(): Option<B> = this@Queue.tryDequeue1().map(f)
override suspend fun dequeueChunk1(maxSize: Int): Chunk<B> = this@Queue.dequeueChunk1(maxSize).map(f)
Expand All @@ -111,51 +111,71 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
companion object {

/** Creates a queue from the supplied strategy. */
private suspend fun <S, A> fromStrategy(strategy: PubSub.Strategy<A, Chunk<A>, S, Int>): Queue<A> {
val pubSub = PubSub.from(strategy)
private fun <S, A> fromStrategy(strategy: PubSub.Strategy<A, Chunk<A>, S, Int>): Queue<A> {
val pubSub = PubSub.unsafe(strategy)
return DefaultQueue(pubSub)
}

/** Creates a queue from the supplied strategy. */
private suspend fun <S, A> fromStrategyNoneTerminated(strategy: PubSub.Strategy<Option<A>, Option<Chunk<A>>, S, Int>): NoneTerminatedQueue<A> {
val pubSub = PubSub.from(strategy)
private fun <S, A> fromStrategyNoneTerminated(strategy: PubSub.Strategy<Option<A>, Option<Chunk<A>>, S, Int>): NoneTerminatedQueue<A> {
val pubSub = PubSub.unsafe(strategy)
return DefaultNoneTerminatedQueue(pubSub)
}

/** Creates a FIFO queue with no size bound. */
suspend fun <A> unbounded(): Queue<A> =
fromStrategy(Strategy.fifo())

fun <A> unsafeUnbounded(): Queue<A> =
fromStrategy(Strategy.fifo())

/** Creates an unbounded FIFO queue that distributed always at max `fairSize` elements to any subscriber. */
suspend fun <A> fairUnbounded(fairSize: Int): Queue<A> =
fromStrategy(Strategy.fifo<A>().transformSelector { size, _ -> min(size, fairSize) })

fun <A> unsafeFairUnbounded(fairSize: Int): Queue<A> =
fromStrategy(Strategy.fifo<A>().transformSelector { size, _ -> min(size, fairSize) })

/** Creates a FIFO queue with the specified size bound. */
suspend fun <A> bounded(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo(maxSize))

/** Creates a FILO queue with the specified size bound. */
suspend fun <A> boundedLife(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedLifo(maxSize))
fun <A> unsafeBounded(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo(maxSize))

/** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */
suspend fun <A> circularBuffer(maxSize: Int): Queue<A> =
fromStrategy(Strategy.circularBuffer(maxSize))

fun <A> unsafeCircularBuffer(maxSize: Int): Queue<A> =
fromStrategy(Strategy.circularBuffer(maxSize))

/** Created a bounded queue that distributed always at max `fairSize` elements to any subscriber. */
suspend fun <A> fairBounded(maxSize: Int, fairSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo<A>(maxSize).transformSelector { size, _ -> min(size, fairSize) })

fun <A> unsafeFairBounded(maxSize: Int, fairSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo<A>(maxSize).transformSelector { size, _ -> min(size, fairSize) })

/** Creates a queue which allows at most a single element to be enqueued at any time. */
suspend fun <A> synchronous(): Queue<A> =
fromStrategy(Strategy.synchronous())

fun <A> unsafeSynchronous(): Queue<A> =
fromStrategy(Strategy.synchronous())

/** Like [synchronous], except that any enqueue of `None` will never block and cancels any dequeue operation. */
suspend fun <A> synchronousNoneTerminated(): NoneTerminatedQueue<A> {
val strategy = Strategy.synchronous<A>()
val pubSub = PubSub.Strategy.closeNowOption(strategy)
return fromStrategyNoneTerminated(pubSub)
}

fun <A> unsafeSynchronousNoneTerminated(): NoneTerminatedQueue<A> {
val strategy = Strategy.synchronous<A>()
val pubSub = PubSub.Strategy.closeNowOption(strategy)
return fromStrategyNoneTerminated(pubSub)
}
}
}

Expand Down Expand Up @@ -276,7 +296,7 @@ internal class DefaultQueue<A>(private val pubSub: PubSub<A, Chunk<A>, Int>) : Q
override suspend fun enqueue1(a: A) =
pubSub.publish(a)

override suspend fun offer1(a: A): Boolean =
override fun tryOffer1(a: A): Boolean =
pubSub.tryPublish(a)

override suspend fun dequeue1(): A =
Expand Down Expand Up @@ -309,7 +329,7 @@ internal class DefaultNoneTerminatedQueue<A>(
override suspend fun enqueue1(a: Option<A>) =
pubSub.publish(a)

override suspend fun offer1(a: Option<A>): Boolean =
override fun tryOffer1(a: Option<A>): Boolean =
pubSub.tryPublish(a)

override suspend fun dequeue1(): Option<A> =
Expand Down

0 comments on commit 78f8e34

Please sign in to comment.