Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Queue new API + docs #30

Merged
merged 38 commits into from
Mar 17, 2020
Merged

Queue new API + docs #30

merged 38 commits into from
Mar 17, 2020

Conversation

nomisRev
Copy link
Member

This PR exposes the new APIs from Queue, peek and try that do not block.
It also includes additional docs and tests for the new APIs, and a small restructuring.

It seems I have incorrectly merged ConcurrentQueue with CancelableQueue tho.
Any1 any idea how I keep the git history correctly? I'd happily re-open a new PR.

@nomisRev nomisRev requested a review from a team March 5, 2020 12:57
@aballano aballano added this to the 0.10.x milestone Mar 5, 2020
@aballano
Copy link
Member

aballano commented Mar 6, 2020

Any1 any idea how I keep the git history correctly? I'd happily re-open a new PR.

If you mean within this PR it's already done when you squash, just edit the message from here. If you mean from the master history then I don't think so because we would need to rewrite history by force-pushing, which would mean anyone with the branch might have conflicts.

@nomisRev
Copy link
Member Author

nomisRev commented Mar 7, 2020

@aballano that's not what I meant.
It looks like I renamed ConcurrentQueue to CancellableQueue, but I removed CancellableQueue and moved/refactored ConcurrentQueue.

Not sure how I can reflect that change properly in git. I retried with intermediate commits but got the same history.

Not critical, so ready for review. :)

Copy link
Member

@1Jajen1 1Jajen1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't say all that much about the code. Looks good though ^^

There are however a few nits on the documentation, which other than those reads quite well 👍

arrow-fx/src/main/kotlin/arrow/fx/Queue.kt Outdated Show resolved Hide resolved
arrow-fx/src/main/kotlin/arrow/fx/Queue.kt Outdated Show resolved Hide resolved
arrow-fx/src/main/kotlin/arrow/fx/Queue.kt Outdated Show resolved Hide resolved
arrow-fx/src/main/kotlin/arrow/fx/Queue.kt Outdated Show resolved Hide resolved
@1Jajen1
Copy link
Member

1Jajen1 commented Mar 7, 2020

Oh btw is it possible to support bulk operations like flush or offerAll? Those could be useful if you don't want to start fibers for each value to add/remove.

@nomisRev
Copy link
Member Author

nomisRev commented Mar 7, 2020

Yes, we definitely can but I haven't attempted to implement them yet.

@nomisRev
Copy link
Member Author

nomisRev commented Mar 7, 2020

@1Jajen1 what behavior would you expect for the different backpressure strategies? I think the one described below would be quite easy to add.

offerAll(la: Iterable): Kind<F, Unit>

  • Unbounded#offerAll: always adds all immediately
  • Bounded#offerAll: blocks until all are added, adds every element as soon as soon as capacity available. This way you can add more than the capacity size.
  • Dropping#offerAll: adds all in order, respecting capacity
  • Sliding#offerAll: adds all in order, respecting capacity

tryOfferAll(la: Iterable): Kind<F, Boolean>

Works same as other try operators.

flushAll(): Kind<F, List>

For all strategies returns what is currently in Queue. takeAll and peekAll might be better named for these operations.

Copy link
Member Author

@nomisRev nomisRev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doc suggestions!!

@1Jajen1
Copy link
Member

1Jajen1 commented Mar 7, 2020

I'd expect it to work as if we'd repeat offer so that: offerAll(xs) == xs.traverse_ { a -> offer(a) } or similar, which includes all the blocking/dropping etc.

Guessing from how the strategies currently work what you suggested should be the same right?

The same cannot really hold for flush it would be more like a recursive tryTake until it reaches None, so flush should never block.

@nomisRev
Copy link
Member Author

nomisRev commented Mar 7, 2020

Right, that's exactly what I was thinking.

So no need to implement anything low level, I'll add these as derived methods to Queue with tests to verify what I mentioned above.

@1Jajen1
Copy link
Member

1Jajen1 commented Mar 7, 2020

Right, that's exactly what I was thinking.

So no need to implement anything low level, I'll add these as derived methods to Queue with tests to verify what I mentioned above.

That depends, are these methods supposed to be atomic? Or better put does this hold: offerAll(xs).fork().followedBy(offer(x).fork()) => Queue(x + xs) or Queue(xs + x)?

Your name suggestions (missed them the first time) also make sense: takeAll peekAll both sound good to me.

@nomisRev
Copy link
Member Author

nomisRev commented Mar 7, 2020

xs.traverse_ { a -> offer(a) } is not atomic so no.

Shouldn't be too hard to implement atomically.

Copy link
Member Author

@nomisRev nomisRev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the extensive review @1Jajen1!!! 🙏 👏 I'll be addressing the PR comments today, and I'll improve the test suite based on your suggestions!

Comment on lines 93 to 107
"$label - empty queue takeAll is empty" {
forAll(Gen.positiveIntegers()) { n ->
IO.fx {
val q = !queue(n)
!q.takeAll()
}.equalUnderTheLaw(IO.just(emptyList()), EQ())
}
}

"$label - empty queue peekAll is empty" {
forAll(Gen.positiveIntegers()) { n ->
IO.fx {
val q = !queue(n)
!q.peekAll()
}.equalUnderTheLaw(IO.just(emptyList()), EQ())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh cool Github marks the range of your comments "Comment on lines +93 to +107" 😍

But this only makes sense if you can verify that it generated at least one empty queue (you'd have to generate lists of ints in that case).

All these queues are empty, I'll update for more clarity but n here is the capacity of the Queue.

arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt Outdated Show resolved Hide resolved
}
}

"$label - can take and offer at capacity" {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we're testing here is that you can tryOfferAll 2 items for a dropping queue with capacity 1 and 1 taker.

Without the taker being registered, tryOfferAll would have to drop one of the items offered and thus tryOfferAll would fail and return false.

Co-Authored-By: Alberto Ballano <aballano@users.noreply.github.com>
Co-Authored-By: Jannis <overesch.jannis@gmail.com>
@@ -67,13 +67,13 @@ interface Dequeue<F, A> {
fun peekAll(): Kind<F, List<A>>
}

/** A polymoprhic effect typeclass that allows [Dequeue]'ing values from a [Queue]. */
/** A polymorphic effect typeclass that allows [Enqueue]'ing values from a [Queue]. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** A polymorphic effect typeclass that allows [Enqueue]'ing values from a [Queue]. */
/** A polymorphic effect typeclass that allows [Enqueue]'ing values to a [Queue]. */

Missed that on the first read through

@nomisRev
Copy link
Member Author

nomisRev commented Mar 11, 2020

@1Jajen1 after some discussion with @raulraja I decided we should not expose shutdown at this point.

It's not a pattern we follow in the other concurrency primitives, nor is it a pattern that we actively support. Which is exceptions for control flow, in this case cancelation flow.

On top of that Arrow Fx offers automatic cancelation support, and powerful combinators to prevent cancelation breaking and leaking to occur. So we should stay close to that.

Additionally, we can always come back to this and add shutdown. The other way around is more difficult.

EDIT: not all tests descriptions you gave were clear to me, but this should now cover all behavior I believe. Please also review the docs again, I added some more documentation. cc\ @aballano.

@aballano
Copy link
Member

aballano commented Mar 12, 2020

Saw a test failing in between commits, might be flaky

 arrow.fx.QueueTest > BoundedQueue - takeAll returns emptyList with waiting suspended takers FAILED
     arrow.fx.internal.TimeoutException: Duration(amount=5, timeUnit=SECONDS)

Copy link
Contributor

@danimontoya danimontoya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nits 🎉 😄

arrow-fx/src/main/kotlin/arrow/fx/Queue.kt Outdated Show resolved Hide resolved
Comment on lines +483 to +484
if (take != null) later { take(value) }.map { true }
else just(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (take != null) later { take(value) }.map { true }
else just(true)
take?.let { later { take(value) }.map { true } } ?: just(true)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't find ?: clearer than if-else expression here.

* [Surplus]: Contains a queue of values and two maps of registered id & offer/shutdown callbacks waiting to
* offer once there is room (if the queue is bounded, dropping or sliding).
*
* [Shutdown]: Holds no values, an offer or take in Shutdown state creates a QueueShutdown error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this case missing? and QueueShutdown ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed shutdown support, thanks for noticing!

Copy link
Member

@aballano aballano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave a look at the tests and they look good IMO

@@ -14,6 +22,18 @@ import io.kotlintest.shouldNot
fun <A> A.equalUnderTheLaw(b: A, eq: Eq<A>): Boolean =
shouldBeEq(b, eq).let { true }

fun <A> IOOf<A>.equalUnderTheLaw(b: IOOf<A>, EQA: Eq<A> = Eq.any(), timeout: Duration = 5.seconds): Boolean =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this explicit equalUnderTheLaw? Cannot we test like before using the one above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's for a proper assertion message. The one implement through IO.toString is not very meaningful.

#30 (comment)

@aballano aballano merged commit 2c0cdbe into master Mar 17, 2020
@aballano aballano deleted the sv-queue-part-2 branch March 17, 2020 11:47
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants