
Use bounded rather than synchronous in parJoin #3027

Closed · 1 commit

Conversation

@djspiewak (Member)

Copy/pasting some of my explanation from this Discord discussion:

The problem with synchronous is that it essentially eliminates parallel execution once you fill up to maxOpen. As a reminder, even with synchronous you will still have maxOpen chunks in memory, since the streams have all run at least one step; it's just that those chunks sit in the flatMap on the Deferred rather than directly in the Channel. But the loss of parallel execution is what I'm focused on right now. Imagine the downstream runs strictly slower than the upstreams. You will see the following stages, in order:

  1. Nothing has happened yet
  2. Upstreams start running in parallel (parallelism factor = maxOpen)
  3. Upstreams all produce their first chunk and block
  4. Downstream consumes a single chunk
  5. One upstream begins executing on its second chunk
  6. Downstream consumes a second chunk
  7. One other upstream begins executing on its second chunk
  8. etc etc…

In other words, due to synchronous, parJoin is not actually parallel in a lot of cases! It is parallel in resource consumption, but it is not parallel in execution.
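The serialization described above can be seen with a plain-JDK analogy (this is not fs2's implementation, just an illustration of rendezvous semantics): a SynchronousQueue has no buffer, so every put blocks until a matching take, and each consumer take unblocks exactly one blocked producer.

```scala
import java.util.concurrent.SynchronousQueue

object RendezvousDemo extends App {
  // A synchronous queue is a pure rendezvous: put blocks until a take.
  val chan = new SynchronousQueue[Int]()

  // Four "upstreams" each produce one value, then block in put.
  val producers = (1 to 4).map { i =>
    val t = new Thread(() => chan.put(i))
    t.start()
    t
  }

  // Each take by the "downstream" releases exactly one producer, so the
  // producers end up paced one at a time by the consumer -- which is the
  // serialization of execution described in the stages above.
  (1 to 4).foreach(_ => println(chan.take()))
  producers.foreach(_.join())
}
```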

bounded fixes this because it allows the downstream to consume up to maxOpen chunks at once, which in turn unblocks up to maxOpen of the upstreams. With bounded(maxOpen), you get a sequence that looks more like the following:

  1. Nothing has happened yet
  2. Upstreams start running in parallel (parallelism factor = maxOpen)
  3. Upstreams all produce their first chunk and proceed to their second ones
  4. Downstream consumes up to maxOpen chunks
  5. More upstreams produce chunks, maybe getting blocked because they're running too fast, maybe not

Maximum parallelism factor is maxOpen + 1 (all upstreams plus the downstream). Maximum memory footprint is maxOpen * 2 chunks (up to maxOpen buffered in the channel, plus one in flight per upstream). At steady state, if all sides proceed at the same rate, the parallelism will equal the maximum. If the downstream is slower than the upstreams, the parallelism converges to maxOpen (whereas in the synchronous implementation, a slow downstream makes the parallelism converge to 1).
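The patch itself amounts to roughly a one-line change in how parJoin constructs its output channel. A hedged sketch (identifier names are approximate; see the actual diff for the real code):

```scala
import cats.effect.Concurrent
import fs2.Chunk
import fs2.concurrent.Channel

// parJoin funnels all upstream output through a single Channel that the
// downstream reads from. The change swaps the channel's capacity.
def outputChannel[F2[_]: Concurrent, O](maxOpen: Int): F2[Channel[F2, Chunk[O]]] =
  // was: Channel.synchronous[F2, Chunk[O]]  -- rendezvous, downstream-paced
  Channel.bounded[F2, Chunk[O]](maxOpen)     // buffer up to maxOpen chunks
```

With the bounded channel, each upstream can park one chunk and immediately begin computing its next one, which is what restores parallel execution when the downstream is the bottleneck.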

@armanbilge (Member)

armanbilge commented Oct 27, 2022

Some related commits I found digging through the blame.

Edit: Looks like the synchronous queue was introduced in:

@djspiewak (Member, Author)

Paul Chiusano in 2016. Wow. This is genuinely among the oldest lines of code remaining in the project.

What I find fascinating is that, if you look in scalaz-stream, there's a predecessor to parJoin with the same signature and functionality. I wrote that version back sometime in 2014, and it used a queue with a bound of maxOpen to accept results. :-P So I think Paul's rewrite was the point at which it got intentionally adjusted. I wish he had written more about his rationale at the time.

@djspiewak (Member, Author)

In the interest of reporting negative results: I modified the benchmark to burn CPU (via JMH's Blackhole) tunably in either the upstreams or the downstream. The hypothesis was that this change is most impactful when the upstreams are faster than the downstream, which corresponds to a flow bias of downstream in the following:
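A hypothetical sketch of the flowBias idea (parameter and method names here are illustrative, not the actual ConcurrentBenchmark code): burn a fixed amount of CPU on one side of the pipeline so that side is strictly slower.

```scala
import org.openjdk.jmh.annotations.{Param, Scope, State}
import org.openjdk.jmh.infra.Blackhole

// Illustrative only: JMH's Blackhole.consumeCPU burns a deterministic
// number of CPU "tokens" without being eliminated by the JIT.
@State(Scope.Thread)
class FlowBiasSketch {
  @Param(Array("upstream", "downstream"))
  var flowBias: String = _

  // Called inside each upstream's evaluation step.
  def upstreamWork(): Unit =
    if (flowBias == "upstream") Blackhole.consumeCPU(1000L)

  // Called inside the downstream's consumption step; with
  // flowBias = "downstream", the upstreams outpace the downstream.
  def downstreamWork(): Unit =
    if (flowBias == "downstream") Blackhole.consumeCPU(1000L)
}
```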

Before

[info] Benchmark                    (concurrent)  (flowBias)   Mode  Cnt   Score   Error  Units
[info] ConcurrentBenchmark.parJoin             1    upstream  thrpt   10  28.166 ± 0.208  ops/s
[info] ConcurrentBenchmark.parJoin             1  downstream  thrpt   10  27.395 ± 0.410  ops/s
[info] ConcurrentBenchmark.parJoin            16    upstream  thrpt   10  45.193 ± 0.574  ops/s
[info] ConcurrentBenchmark.parJoin            16  downstream  thrpt   10  44.430 ± 0.984  ops/s
[info] ConcurrentBenchmark.parJoin           256    upstream  thrpt   10  44.172 ± 0.717  ops/s
[info] ConcurrentBenchmark.parJoin           256  downstream  thrpt   10  43.388 ± 0.575  ops/s

After

[info] Benchmark                    (concurrent)  (flowBias)   Mode  Cnt   Score   Error  Units
[info] ConcurrentBenchmark.parJoin             1    upstream  thrpt   10  27.524 ± 0.362  ops/s
[info] ConcurrentBenchmark.parJoin             1  downstream  thrpt   10  29.582 ± 3.111  ops/s
[info] ConcurrentBenchmark.parJoin            16    upstream  thrpt   10  43.907 ± 0.796  ops/s
[info] ConcurrentBenchmark.parJoin            16  downstream  thrpt   10  43.086 ± 0.627  ops/s
[info] ConcurrentBenchmark.parJoin           256    upstream  thrpt   10  44.689 ± 0.792  ops/s
[info] ConcurrentBenchmark.parJoin           256  downstream  thrpt   10  44.322 ± 0.777  ops/s

After + #2856

[info] Benchmark                    (concurrent)  (flowBias)   Mode  Cnt   Score   Error  Units
[info] ConcurrentBenchmark.parJoin             1    upstream  thrpt   10  26.651 ± 0.304  ops/s
[info] ConcurrentBenchmark.parJoin             1  downstream  thrpt   10  25.519 ± 0.201  ops/s
[info] ConcurrentBenchmark.parJoin            16    upstream  thrpt   10  44.321 ± 0.717  ops/s
[info] ConcurrentBenchmark.parJoin            16  downstream  thrpt   10  45.148 ± 0.575  ops/s
[info] ConcurrentBenchmark.parJoin           256    upstream  thrpt   10  45.709 ± 0.826  ops/s
[info] ConcurrentBenchmark.parJoin           256  downstream  thrpt   10  43.606 ± 0.634  ops/s

Summary

It uh… doesn't matter? None of it? I think we're bounded by something else here and it's not intuitive to me what that might be.

@mpilquist (Member)

So keep as is for now until we can find better motivation?

@djspiewak (Member, Author)

Yeah, I still feel like this is right on first principles, but I can't measure an improvement, so let's close this for now.

@djspiewak closed this Nov 19, 2022