This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

["Discussion"] Stream TerminalOps #245

Closed
nomisRev opened this issue Aug 6, 2020 · 4 comments · Fixed by #260
Assignees
Labels
discussion Discussion topic for improvement of Arrow enhancement New feature or request help wanted Extra attention is needed

Comments

@nomisRev
Member

nomisRev commented Aug 6, 2020

What version are you currently using?

0.11.0-SNAPSHOT

What would you like to see?

A less redundant solution than TerminalOps. We currently duplicate some operations in TerminalOps and in the Stream API.
Both offer the same behavior, with slightly different signatures. See #244

E.g. s.foldChunks / s.compile().foldChunks, s.fold / s.compile().fold, etc.

Discussed proposal with @pablisco on Slack.

Currently some operations such as fold, foldChunks, etc are defined both on Stream<A> and TerminalOps<A>.

fun <A, B> Stream<A>.fold(b: B, f: (A, B) -> B): Stream<B>
suspend fun <A, B> TerminalOps<A>.fold(b: B, f: (A, B) -> B): B

The first signature is not accurately typed: it does not return a stream of B values but a single value B wrapped in a Stream, so exposing it as suspend fun <A, B> Stream<A>.fold(b: B, f: (A, B) -> B): B would be more accurate.

This would allow us to remove TerminalOps and TerminalResourceOps, and to remove the operators duplicated across the three.
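The difference between the two shapes can be sketched with a toy model. This is only an illustration under stated assumptions: `MiniStream`, `foldLazy`, and `runSuspend` are hypothetical stand-ins invented for this example and are not part of Arrow Fx. The lazy variant only describes a computation, while the `suspend` variant returns the value directly.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for Stream<A>: a lazy description that pushes elements to a callback.
class MiniStream<A>(val collect: suspend (suspend (A) -> Unit) -> Unit)

// Shape 1: returns another (one-element) stream, so nothing runs until that stream is consumed.
fun <A, B> MiniStream<A>.foldLazy(b: B, f: (A, B) -> B): MiniStream<B> =
    MiniStream { emit ->
        var acc = b
        collect { a -> acc = f(a, acc) }
        emit(acc)
    }

// Shape 2: a suspend function that runs the stream and returns the folded value itself.
suspend fun <A, B> MiniStream<A>.fold(b: B, f: (A, B) -> B): B {
    var acc = b
    collect { a -> acc = f(a, acc) }
    return acc
}

// Hypothetical stdlib-only runner. It assumes the block completes without really suspending.
fun <T> runSuspend(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; use a real coroutine runner" }.getOrThrow()
}

fun main() {
    var effects = 0
    val numbers = MiniStream<Int> { emit ->
        for (i in 1..3) { effects++; emit(i) }
    }

    numbers.foldLazy(0) { a, acc -> a + acc }
    println(effects) // 0: the lazy fold only built a description

    val sum = runSuspend { numbers.fold(0) { a, acc -> a + acc } }
    println(sum)     // 6
    println(effects) // 3
}
```

In this model the lazy shape is callable from a non-suspending context but has no effect on its own, which is the mismatch described above.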

@nomisRev nomisRev added enhancement New feature or request help wanted Extra attention is needed discussion Discussion topic for improvement of Arrow labels Aug 6, 2020
@pablisco
Contributor

pablisco commented Aug 6, 2020

One other possibility could be to mark terminal operations with namespacing:

suspend fun <A> Stream<A>.drain(): Unit
suspend fun <A, B> Stream<A>.drainFold(b: B, f: (A, B) -> B): B
suspend fun <A> Stream<A>.drainEach(f: (A) -> Unit): Unit
suspend fun <A> Stream<A>.drainToList(): List<A>
suspend fun <A, B> Stream<A>.drainToList(f: (A) -> B): List<B>
suspend fun <A> Stream<A>.drainToSet(): Set<A>
suspend fun <A, B> Stream<A>.drainToSet(f: (A) -> B): Set<B>

etc.

I guess the caveat of having to do compile().terminalOp() is that it is arguably easy to forget the terminal operation, especially for newcomers to the API who come from other components like Sequence from the stdlib or Flow from kotlinx.coroutines. With those, it's quite common to call toList() as the terminal operation of a sequence. The potential downside of this is that it's not particularly explicit that we are eagerly processing the stream and blocking the thread.

Having a namespace with prefixes would mean that when the user types fold, both options show up in the IDE's autocomplete.
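For readers less familiar with the stdlib behavior referred to above, here is a small sketch using only `kotlin.sequences`. The helper name `lazyPipeline` is made up for this example. It shows that a pipeline without a terminal operation does no work, and that `toList()` is what triggers evaluation.

```kotlin
// Builds a lazy pipeline that records every element it visits.
fun lazyPipeline(log: MutableList<String>): Sequence<Int> =
    sequenceOf(1, 2, 3)
        .onEach { log += "visited $it" }
        .map { it * 2 }

fun main() {
    val log = mutableListOf<String>()
    val doubled = lazyPipeline(log)

    // No terminal operation has been called yet, so nothing was visited.
    println(log.size) // 0

    // toList() is the terminal operation: it eagerly runs the whole pipeline.
    println(doubled.toList()) // [2, 4, 6]
    println(log.size)         // 3
}
```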

@nomisRev
Member Author

nomisRev commented Aug 6, 2020

@pablisco I am not sure I 100% follow your comment. Perhaps fold wasn't the best example.
If we use toList as an example, we find the following signatures:

fun <A> Stream<A>.toList(): Stream<List<A>>
suspend fun <A> TerminalOps<A>.toList(): List<A>
suspend fun <A> ResourceTerminalOps<A>.toList(): Resource<List<A>>

This exposes the same functionality in 3 different ways, and we can cover the same behavior with a single signature:

-fun <A> Stream<A>.toList(): Stream<List<A>>
-suspend fun <A> TerminalOps<A>.toList(): List<A>
-suspend fun <A> ResourceTerminalOps<A>.toList(): Resource<List<A>>
+suspend fun <A> Stream<A>.toList(): List<A>

I believe this will be less confusing, and it would also be more aligned with the standard library's Sequence (and also KotlinX's Flow).

A summary of the problems with the original signatures:

  • fun <A> Stream<A>.toList(): Stream<List<A>>: doesn't correctly model its behavior, since it doesn't really return a Stream; rather, it returns Stream.just, i.e. a single-element Stream, which is more accurately described as suspend () -> A.
  • suspend fun <A> TerminalOps<A>.toList(): List<A>: models the behavior correctly but is defined on an intermediate class, which makes the API more complex and dramatically increases the API surface.
  • suspend fun <A> ResourceTerminalOps<A>.toList(): Resource<List<A>>: dramatically increases the API surface only to expose duplicated APIs.

@1Jajen1
Member

1Jajen1 commented Aug 6, 2020

This might be naive since I have not used Stream a lot, but isn't every operation Stream<A> -> B terminal and eager by type already? If so, what is the point of the indirection with TerminalOps?

This means we'd most likely just need one primitive which is a fold over all results/chunks a stream produces.
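A rough sketch of that idea, assuming a toy `ListStream` type whose only primitive is a suspending `fold`. `ListStream` and `runSuspend` are hypothetical names for this example and do not reflect how Arrow Fx implements Stream. The other terminal operations are then derived from the single primitive.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for Stream<A>; fold is the only primitive terminal operation.
class ListStream<A>(private val items: List<A>) {
    suspend fun <B> fold(b: B, f: (A, B) -> B): B {
        var acc = b
        for (item in items) acc = f(item, acc)
        return acc
    }
}

// Every other terminal operation is expressed through fold.
suspend fun <A> ListStream<A>.toList(): List<A> =
    fold(emptyList<A>()) { a, acc -> acc + a }

suspend fun <A> ListStream<A>.toSet(): Set<A> =
    fold(emptySet<A>()) { a, acc -> acc + a }

suspend fun <A> ListStream<A>.drain(): Unit =
    fold(Unit) { _, _ -> Unit }

// Hypothetical stdlib-only runner. It assumes the block completes without really suspending.
fun <T> runSuspend(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; use a real coroutine runner" }.getOrThrow()
}

fun main() {
    val s = ListStream(listOf(1, 2, 2, 3))
    println(runSuspend { s.toList() }) // [1, 2, 2, 3]
    println(runSuspend { s.toSet() })  // [1, 2, 3]
    runSuspend { s.drain() }           // runs the stream and discards every element
}
```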

@nomisRev
Member Author

nomisRev commented Aug 6, 2020

You're absolutely correct @1Jajen1, that's the reason for this ticket.

In practice this is not always the case though, since most libraries expose both Stream<A> -> Stream<List<A>> and Stream<A> -> List<A>. The two behave quite differently, which can result in some gotchas. E.g. Stream<A> -> Stream<List<A>> can be safely called from a non-suspending context but doesn't do anything, yet you might expect Stream.toList() to collect into a List.

Since our Stream is a DSL around suspend, users can easily use s.effectMap { otherStream.toList() } with a suspend fun toList(), instead of s.flatMap { otherStream.toList() }, so there is no strong need for us to expose both IMO.
There is likewise no strong case for TerminalOps; it was included in the original design, which was inspired by FS2. In FS2, compilation is used to go from Stream<F, A> to F[_].

EDIT: I wanted to bring this up for discussion for those already trying Arrow Fx Streams before doing the refactor.


7 participants