Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Small Queue improvements #235

Merged
merged 6 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal interface Publish<A> {
* Evaluates to `false` if element was not published.
* Evaluates to `true` if element was published successfully.
*/
suspend fun tryPublish(a: A): Boolean
fun tryPublish(a: A): Boolean
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was it suspend?

Copy link
Member Author

@nomisRev nomisRev Jul 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the underlying implementation was still relying on suspension, which has now been rewritten to work without suspension.

EDIT: it was sharing code with suspend fun publish.

}

internal interface Subscribe<A, Selector> {
Expand Down Expand Up @@ -563,7 +563,14 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<

val state = atomic(initial)

suspend fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, _) = loop(ps1) { Unit }
Pair(ps2, result)
}

suspend fun <X> modify(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, action) = loop(ps1) { Unit }
Expand Down Expand Up @@ -681,7 +688,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
ps.copy(queue = strategy.publish(i, ps.queue))

override suspend fun publish(a: I) =
update { ps ->
modify { ps ->
if (strategy.accepts(a, ps.queue)) {
val ps1 = publish_(a, ps)
Pair(ps1, suspend { Unit })
Expand All @@ -698,17 +705,17 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}
}

override suspend fun tryPublish(a: I): Boolean =
override fun tryPublish(a: I): Boolean =
update { ps ->
if (!strategy.accepts(a, ps.queue)) Pair(ps, suspend { false })
if (!strategy.accepts(a, ps.queue)) Pair(ps, false)
else {
val ps1 = publish_(a, ps)
Pair(ps1, suspend { true })
Pair(ps1, true)
}
}

override suspend fun get(selector: S): O =
update { ps ->
modify { ps ->
val (ps, option) = tryGet_(selector, ps)
when (option) {
None -> {
Expand All @@ -726,7 +733,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}

private suspend fun streamingGet(token: Token, selector: S) =
update<O> { ps ->
modify<O> { ps ->
val (ps, option) = tryGet_(selector, ps)
when (option) {
is Some -> Pair(ps, suspend { option.t })
Expand All @@ -743,19 +750,19 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
}

override suspend fun tryGet(selector: S): Option<O> =
update { ps ->
modify { ps ->
val (ps1, result) = tryGet_(selector, ps)
Pair(ps1, suspend { result })
}

override suspend fun subscribe(selector: S): Boolean =
update { ps ->
modify { ps ->
val (queue, success) = strategy.subscribe(selector, ps.queue)
Pair(ps.copy(queue = queue), suspend { success })
}

override suspend fun unsubscribe(selector: S): Unit =
update { ps ->
modify { ps ->
Pair(ps.copy(queue = strategy.unsubscribe(selector, ps.queue)), suspend { Unit })
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface Enqueue<A> {
*
* @param a `A` to enqueue
*/
suspend fun offer1(a: A): Boolean
fun tryOffer1(a: A): Boolean
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JorgeCastilloPrz breaking change in Arrow Fx Coroutine Stream's Queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was fast!! thank youu!! 🎉

}

/** Provides the ability to dequeue individual elements from a `Queue`. */
Expand Down Expand Up @@ -97,7 +97,7 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
fun <B> imap(f: (A) -> B, g: (B) -> A): Queue<B> =
object : Queue<B> {
override suspend fun enqueue1(a: B) = enqueue1(g(a))
override suspend fun offer1(a: B): Boolean = offer1(g(a))
override fun tryOffer1(a: B): Boolean = tryOffer1(g(a))
override suspend fun dequeue1(): B = f(this@Queue.dequeue1())
override suspend fun tryDequeue1(): Option<B> = this@Queue.tryDequeue1().map(f)
override suspend fun dequeueChunk1(maxSize: Int): Chunk<B> = this@Queue.dequeueChunk1(maxSize).map(f)
Expand All @@ -111,51 +111,78 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
companion object {

/** Creates a queue from the supplied strategy. */
private suspend fun <S, A> fromStrategy(strategy: PubSub.Strategy<A, Chunk<A>, S, Int>): Queue<A> {
val pubSub = PubSub.from(strategy)
private fun <S, A> fromStrategy(strategy: PubSub.Strategy<A, Chunk<A>, S, Int>): Queue<A> {
val pubSub = PubSub.unsafe(strategy)
return DefaultQueue(pubSub)
}

/** Creates a queue from the supplied strategy. */
private suspend fun <S, A> fromStrategyNoneTerminated(strategy: PubSub.Strategy<Option<A>, Option<Chunk<A>>, S, Int>): NoneTerminatedQueue<A> {
val pubSub = PubSub.from(strategy)
private fun <S, A> fromStrategyNoneTerminated(strategy: PubSub.Strategy<Option<A>, Option<Chunk<A>>, S, Int>): NoneTerminatedQueue<A> {
val pubSub = PubSub.unsafe(strategy)
return DefaultNoneTerminatedQueue(pubSub)
}

/** Creates a FIFO queue with no size bound. */
suspend fun <A> unbounded(): Queue<A> =
fromStrategy(Strategy.fifo())

fun <A> unsafeUnbounded(): Queue<A> =
fromStrategy(Strategy.fifo())

/** Creates an unbounded FIFO queue that distributed always at max `fairSize` elements to any subscriber. */
suspend fun <A> fairUnbounded(fairSize: Int): Queue<A> =
fromStrategy(Strategy.fifo<A>().transformSelector { size, _ -> min(size, fairSize) })

fun <A> unsafeFairUnbounded(fairSize: Int): Queue<A> =
fromStrategy(Strategy.fifo<A>().transformSelector { size, _ -> min(size, fairSize) })

/** Creates a FIFO queue with the specified size bound. */
suspend fun <A> bounded(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo(maxSize))

fun <A> unsafeBounded(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo(maxSize))

/** Creates a FILO queue with the specified size bound. */
suspend fun <A> boundedLife(maxSize: Int): Queue<A> =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JorgeCastilloPrz Second breaking change

suspend fun <A> boundedLifo(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedLifo(maxSize))

fun <A> unsafeBoundedLifo(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedLifo(maxSize))

/** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */
suspend fun <A> circularBuffer(maxSize: Int): Queue<A> =
fromStrategy(Strategy.circularBuffer(maxSize))

fun <A> unsafeCircularBuffer(maxSize: Int): Queue<A> =
fromStrategy(Strategy.circularBuffer(maxSize))

/** Created a bounded queue that distributed always at max `fairSize` elements to any subscriber. */
suspend fun <A> fairBounded(maxSize: Int, fairSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo<A>(maxSize).transformSelector { size, _ -> min(size, fairSize) })

fun <A> unsafeFairBounded(maxSize: Int, fairSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo<A>(maxSize).transformSelector { size, _ -> min(size, fairSize) })

/** Creates a queue which allows at most a single element to be enqueued at any time. */
suspend fun <A> synchronous(): Queue<A> =
fromStrategy(Strategy.synchronous())

fun <A> unsafeSynchronous(): Queue<A> =
fromStrategy(Strategy.synchronous())

/** Like [synchronous], except that any enqueue of `None` will never block and cancels any dequeue operation. */
suspend fun <A> synchronousNoneTerminated(): NoneTerminatedQueue<A> {
val strategy = Strategy.synchronous<A>()
val pubSub = PubSub.Strategy.closeNowOption(strategy)
return fromStrategyNoneTerminated(pubSub)
}

fun <A> unsafeSynchronousNoneTerminated(): NoneTerminatedQueue<A> {
val strategy = Strategy.synchronous<A>()
val pubSub = PubSub.Strategy.closeNowOption(strategy)
return fromStrategyNoneTerminated(pubSub)
}
}
}

Expand Down Expand Up @@ -276,7 +303,7 @@ internal class DefaultQueue<A>(private val pubSub: PubSub<A, Chunk<A>, Int>) : Q
override suspend fun enqueue1(a: A) =
pubSub.publish(a)

override suspend fun offer1(a: A): Boolean =
override fun tryOffer1(a: A): Boolean =
pubSub.tryPublish(a)

override suspend fun dequeue1(): A =
Expand Down Expand Up @@ -309,7 +336,7 @@ internal class DefaultNoneTerminatedQueue<A>(
override suspend fun enqueue1(a: Option<A>) =
pubSub.publish(a)

override suspend fun offer1(a: Option<A>): Boolean =
override fun tryOffer1(a: Option<A>): Boolean =
pubSub.tryPublish(a)

override suspend fun dequeue1(): Option<A> =
Expand Down