-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplemented Channel
in terms of Queue
#2856
Reimplemented Channel
in terms of Queue
#2856
Conversation
Working on this PR uncovered a bug in Cats Effect 3.4.0-RC1 (fixed in typelevel/cats-effect#3180), so this PR is based on a snapshot. If it's accepted, we'll do an RC2. |
// allocate once | ||
@inline private final def closed[A]: Either[Closed, A] = _closed | ||
private[this] final val _closed: Either[Closed, Nothing] = Left(Closed) | ||
private final val rightUnit: Either[Closed, Unit] = Right(()) | ||
private final val rightTrue: Either[Closed, Boolean] = Right(true) | ||
private final val rightFalse: Either[Closed, Boolean] = Right(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, why dump all of these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can bring them back. I think I chopped them out a while ago.
closedR.complete(()).map { | ||
case false => Left(Channel.Closed) | ||
case true => Right(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also was going to suggest you use the cached left, but I see they're gone 😅
closedR.complete(()).map { | |
case false => Left(Channel.Closed) | |
case true => Right(()) | |
} | |
closedR.complete(()).ifF(Either.unit, Left(Channel.Closed)) |
def sendAll: Pipe[F, A, Nothing] = | ||
_.evalMapChunk(send(_)) | ||
.takeWhile(_.isRight) | ||
.onComplete(Stream.exec(close.void)) | ||
.drain |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was reminded of #2890.
// you can do this more efficiently, just proves a point | ||
Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).flatMap(Stream.chunk(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// you can do this more efficiently, just proves a point | |
Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).flatMap(Stream.chunk(_)) | |
// you can do this more efficiently, just proves a point | |
Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).unchunks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh nice. Is that new?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was one of my first PRs to FS2 :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've honestly wanted that convenience function for so long. I spend my whole life doing it.
|
Wow. How many elements were you trying to stick into that queue?? |
Co-authored-by: Arman Bilge <armanbilge@gmail.com>
Probably this? fs2/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala Lines 46 to 48 in ab256c3
fs2/core/shared/src/main/scala/fs2/concurrent/Topic.scala Lines 160 to 162 in 9ac3e43
|
So it seems like anyone who was using |
Yes. They shouldn't do that. :-P I mean, we can special case large bounds, but genuinely this just isn't a case which should be supported. It only worked before by coincidence. |
This reverts commit 3ff9ee2.
This is being caused by typelevel/cats-effect#3187 |
Getting better! Not quite better across the board yet, but doing really well. Because of how things are optimized, I'm pretty sure that the new implementation gets better the faster the producers run relative to the consumer, though the numbers don't seem to intuitively bear that out. Still fiddling with things, and the upstream Before
After
|
…and a little better:
|
@SystemFw I think this is ready for another look. Your Depends on a hash snapshot for the time being, hence why this is marked as draft. |
Updated with benchmark results. The simple cases got faster with the consolidated
|
Fiddled around some more with the performance cases. I took a flier at porting one of the most dramatic performance deltas from the Cats Effect
I suspect this is still a worthwhile change for a few reasons (e.g. the GC tends to do a lot better with contiguous array structures rather than linked-list equivalents, and this is not well measured in microbenchmarks), and I have some ideas for how to improve it a bit further, but I think we can probably foreclose seeing the kinds of huge performance leaps in |
mm, I don't want to be extremely gatekeeper-y, but this is the type of worst case scenario I feared: changing something that works, complicating the implementation (double refs, complex sentinel logic, Btw, what do you mean by linked list here? If it the fact that the current impl is based on List? That was benchmarked against a couple of alternatives, but it doesn't seem fundamental one way or another |
That's the benchmark results of List vs Vector: #2751 (comment) In |
I did my final set of experiments! Tldr, I'm closing this PR. So my last set of experiments looked at GC pressure (using The results were the exact opposite. The implementation on this PR imposes about double the heap pressure, GC count, GC time, and survivor count. I genuinely have no intuition for why, but at this point I think we can safely conclude that the performance of this implementation is somewhere between "worse" and "much worse" than the performance of the old version. Full output attached.
As an aside, I have separately verified that the performance improvements in CE's All in all, I think we can conclude a few things:
I'll open a new PR bringing over the tests from this PR. :-) |
Cherry-picked additional tests from #2856
Do not merge. Depends on a snapshot of Cats Effect 3.4 (well, technically could be implemented without the snapshot, but it's better to wait for. The snapshot in question includes the (still unmerged) improvements totryTakeN
)Queue
on anyAsync[F]
(typelevel/cats-effect#2885 and typelevel/cats-effect#2914), but critically does not include an implementation of typelevel/cats-effect#2890, which is unquestionably the main bottleneck here:Before:
After:
To me, the fact that this is even close to the hand-rolled version is pretty cool, and a slightly less naive
stream
would probably put it almost on-par even without the optimizedtryTakeN
. Once we get the latter though this should be head and shoulders above the prior implementation.Closes #2852