Parallel sequence runs consequentially (non-parallel) #74
Thx for reporting, will take a look.
If you structure the workflow like this, the handlers execute in parallel:

```fsharp
let handle i = async {
    do! Async.Sleep 3000
    printfn "%i" i
    return () }

seq { 1..100 }                                // original data
|> AsyncSeq.ofSeq
|> AsyncSeq.indexed
|> AsyncSeq.groupBy (fun (i, _) -> i % 32L)   // slicing by 32 threads
|> AsyncSeq.mapAsyncParallel
    (snd >> AsyncSeq.map snd >> AsyncSeq.iterAsync handle) // parallel
|> AsyncSeq.iter ignore                       // iteration
|> Async.RunSynchronously
```

Otherwise, the execution across groups is in parallel, but the groups are themselves generated sequentially.
I see. So it's not a bug, it's intended :) Should I close it, or will you? I don't know.
Yes, it is intended. `AsyncSeq` is by default "sequential", and any parallelism imposed via an operation applies to the previous sequence only. So, for example, in your workflow, the `mapAsyncParallel` parallelizes across the groups created by `groupBy`, but the groups themselves are iterated sequentially, as is the upstream sequence. In other words, the parallelism does not propagate all the way upstream. To get the parallelism you want, you just need to rearrange the workflow a bit. I see this type of data-driven parallelism used quite often. Close if you think your issue was addressed. Perhaps this can be made more clear? Or additional parallelism operations implemented?
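To make the "parallelism does not propagate upstream" point concrete, here is a minimal sketch (assumed usage of the FSharp.Control.AsyncSeq API; `handle` is the 3-second handler from the repro): without an explicit parallel operator anywhere in the pipeline, elements are processed strictly one at a time.

```fsharp
open FSharp.Control // FSharp.Control.AsyncSeq package

let handle i = async {
    do! Async.Sleep 3000
    printfn "%i" i }

// Sequential by default: each handle call completes before the next starts,
// so 100 elements take ~300 seconds rather than ~3 seconds per batch of 32.
seq { 1..100 }
|> AsyncSeq.ofSeq
|> AsyncSeq.iterAsync handle
|> Async.RunSynchronously
```

This is the behavior the issue reports; the restructured workflow above works because `mapAsyncParallel` is placed over the stage that should actually fan out.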
There is one operation which I lack: a throttled variant. Right now there is only the unthrottled one; don't know why.
I ended up resolving a very similar set of issues as documented here, with a testbed, by implementing:

```fsharp
module AsyncSeq =

    type Async with
        static member Throttle degreeOfParallelism f =
            let s = new System.Threading.SemaphoreSlim(degreeOfParallelism)
            fun x -> async {
                let! ct = Async.CancellationToken
                do! s.WaitAsync ct |> Async.AwaitTask
                try return! f x
                finally s.Release() |> ignore }

    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.Throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)
```

@dsyme do you perhaps have an answer, or some thoughts on @Szer's question, to close out the tangent this thread became?
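A hypothetical usage sketch of the `mapAsyncParallelThrottled` helper defined in the comment above (assuming it compiles as written and its enclosing `AsyncSeq` module is in scope): at most 8 `work` calls run concurrently, and results stream downstream as they become available.

```fsharp
open FSharp.Control // FSharp.Control.AsyncSeq package

let work (i: int) = async {
    do! Async.Sleep 100   // simulated I/O
    return i * 2 }

// Throttled parallel map: the semaphore inside Throttle caps concurrency at 8
// even though mapAsyncParallel would otherwise run unboundedly in parallel.
seq { 1..100 }
|> AsyncSeq.ofSeq
|> AsyncSeq.mapAsyncParallelThrottled 8 work
|> AsyncSeq.iter (printfn "%i")
|> Async.RunSynchronously
```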
I also have some other follow-on questions this is raising for me:
I'm not sure why it was left behind - it sounds like it would make perfect sense.

1. Yes.
2. That seems much like `Async.Parallel` with `maxDegreeOfParallelism`: https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel
3. I don't think so. A separate throttling is needed at each construct that introduces parallelism; it's not compositional throughout Async computations.
4. No, we don't introduce new methods just to support pipelining/currying.
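For reference, the overload mentioned in point 2 can be used like this (a sketch assuming an FSharp.Core version that includes the optional `maxDegreeOfParallelism` parameter, added around 4.7):

```fsharp
// Run 100 jobs with at most 32 in flight at any moment.
let jobs = [ for i in 1 .. 100 -> async { do! Async.Sleep 3000; return i * 2 } ]

let results : int[] =
    Async.Parallel(jobs, maxDegreeOfParallelism = 32)
    |> Async.RunSynchronously
```

Note that this materializes all results in one array at the end rather than streaming them as they complete, which is the gap the AsyncSeq-based approaches in this thread are trying to fill.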
Thanks for the follow-up. Some rebuttals / clarifications; apologies for the walls of text:
Hm, mea culpa: it seems as of
It would provide a similar throttling effect by having the computation first acquire a semaphore, and release it on exit. It would be true to say that it would not provide any useful function when compared to simply using the built-in `maxDegreeOfParallelism` argument.

However, the idea would be that this operator can be applied generically over any async computation.

So the question, for me, remains: would an orthogonal operator to govern the degree of parallelism, without introducing that parallelism, be acceptable?
I'm not sure what you mean by this - I appreciate that you can't and/or wouldn't want to limit parallelism in a compositional way. I can appreciate that each of the parallelism-introducing constructs stands alone; for each, there are related ways of constraining that parallelism, respectively.
I'm happy to accept a no on the decoupling of the throttling aspect from the parallelizing aspect (and the general principle that anything that introduces parallelism should also offer a mechanism to optionally limit it). However, I do believe that an operator whose sole function is to throttle, in a situation where the parallelism is already present for other reasons, is useful, so I wanted to explain myself properly so it's not ruled out without the goal being thoroughly understood.
I can appreciate that, in general, providing pipelining-friendly sister methods for every case would not be a good design. My reason for suggesting it in this instance is that, as illustrated in my expansion of point 3, it's the only Async/AsyncSeq operator introducing parallelism that is not a) pipelining-friendly and b) explicitly named with the term `Parallel`. It is my experience that:
While I can appreciate the general principle, in summary it's my belief that an exception should be made to it for an explicit throttling operator. So, the same questions again, with my assumed answers:
Description
I've tried to implement some of my work with AsyncSeq.
I have a big job list which I want to process in parallel with some throttling (32 threads maximum, for example) and then feed the results as a stream to other async tasks (saving to disk, etc.).
Async.Parallel is not good for me because it doesn't have a max-threads setting.
I've also tried slicing the job list myself, but it was very memory-inefficient, so that's when I switched to AsyncSeq.
Repro steps
Just run the code below
Expected behavior
Expected to see the numbers 2, 4, 6, ..., 64 after 3 seconds, almost simultaneously (in any order); after 3 more seconds, the numbers 66, 68, 70, ..., 128; and so on.
Actual behavior
Numbers appear one by one at 3-second intervals (no parallelism at all).
Known workarounds
Doing the parallelism myself is the only known workaround.
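For completeness, one hypothetical way to "do the parallelism myself" is to chunk the job list and run each batch with `Async.Parallel`. This caps concurrency at the batch size, but each batch is fully materialized and must finish before the next starts, which is the memory inefficiency described above:

```fsharp
let handle i = async {
    do! Async.Sleep 3000
    printfn "%i" i }

// Batched workaround: at most 32 jobs in flight, but whole batches are
// buffered and the slowest job in a batch delays the next batch.
seq { 1..100 }
|> Seq.chunkBySize 32
|> Seq.iter (fun batch ->
    batch
    |> Seq.map handle
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore)
```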
Related information
Win10
FSharp.Control.AsyncSeq (2.0.16)