
RFC: a queue thingy for batching work with multiple threads + tunable throughput #34185

Closed

Conversation

jrevels
Member

@jrevels jrevels commented Dec 23, 2019

This is a draft (EDIT: D'oh... I forgot to actually switch to draft mode, and it seems like GitHub won't let you switch once the PR is opened) RFC PR because I'm not sure whether it should be in Base or not (or even in this file); I'll bother with actually adding tests etc. if the consensus is that it's appropriate to live here. Otherwise, I'll just plop it into its own package.

I'm not sure what to actually name this; we've been calling it batch_channel but I kind of hate that name. Jameson called it a fair-scheduled queue, so maybe there's a better name that can be derived from that.

Anyway, I've found this function to be a pretty nice little self-contained abstraction for stringing together throughput-limited, multithreaded work pipelines. It really makes building batch-process pipelines convenient; for example, see the following pseudocode:

# asynchronously fetch EEGs to work on, but keep no more than
# 40 in memory at once and use no more than 5 concurrent tasks
eegs = what_should_i_name_this(fetch_eeg, locations, channel_size=40, 
                               max_concurrent_tasks=5)

# asynchronously compute spectrograms, but keep no more than
# 20 in memory at once and use no more than 10 concurrent tasks;
# in a real application the function here could also use a cache
spectrograms = what_should_i_name_this(eeg -> DSP.spectrogram(mean(eachrow(eeg))), eegs; 
                                       channel_size=20, max_concurrent_tasks=10)

# do some other work as spectrograms become available to work on
results = what_should_i_name_this(do_more_work, spectrograms; channel_size=10, 
                                  max_concurrent_tasks=3)

collect(results)
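
Roughly, the shape of the thing is like the following sketch (illustrative only, not the exact code in this diff; the internals here are simplified placeholders):

# illustrative sketch, NOT the actual implementation in this PR:
# results are fed into a bounded Channel by at most `max_concurrent_tasks` worker tasks
function what_should_i_name_this(f, xs; channel_size, max_concurrent_tasks)
    return Channel(channel_size; spawn=true) do results
        items = Channel() do ch           # feeder task hands out work items one at a time
            foreach(x -> put!(ch, x), xs)
        end
        workers = map(1:max_concurrent_tasks) do _
            Threads.@spawn for x in items
                put!(results, f(x))       # bounded `results` provides the throughput limit
            end
        end
        foreach(wait, workers)            # `results` closes once all workers are done
    end
end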

Is there any desire to make this available as a built-in threading abstraction? Regardless, would love help bikeshedding the name 😁

Thanks to @vtjnash + @SimonDanisch for tweaks to the original implementation to render this amenable to thread migration (once that's a thing) and @ararslan for replacing my little destructuring hack with proper Some usage.

@jrevels jrevels changed the title a queue thingy for batching work with multiple threads + tunable throughput RFC: a queue thingy for batching work with multiple threads + tunable throughput Dec 23, 2019
@tkf
Member

tkf commented Dec 23, 2019

Is this a threaded lazy unordered map? If there is going to be Threads.map (order-preserving), I think it makes sense to call it Threads.map_unordered or Threads.imap_unordered. I borrowed the naming from multiprocessing.pool.Pool.imap_unordered in Python.

It really makes building batch-process pipelines convenient

Out of curiosity, why not stitch those functions together and turn them into a pipeline using IterTools.imap or Base.Generator and then feed it to a threaded map in the end?
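
For example, something like this (sketch only; tmap here is just a stand-in for whatever threaded map ends up existing, not a real Base function):

# hypothetical sketch of the fused-pipeline alternative: compose the stages into one
# function, then parallelize only at the final threaded map (`tmap` is a placeholder)
fused(loc) = do_more_work(DSP.spectrogram(mean(eachrow(fetch_eeg(loc)))))
results = tmap(fused, locations)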

put!(channel, f(something(x)))
return false
end
fetch(task) && break
Member


Why do you need to @spawn if you fetch(task) right away?

Member Author

@jrevels jrevels Dec 23, 2019


It's been a while since I reasoned through this so hopefully I don't screw it up 😅

This is basically our trick to allow for migration of work between threads for each work item (though I don't know what the status is w.r.t. the scheduler actually being able to do this right now). The inner spawn for each work item keeps chunks of work from being locked to the specific thread of the outer spawn, while the outer spawn provides the actual parallelism and is throughput-limited in the desired manner.
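
In rough pseudocode (illustrative of the pattern, not the exact code in this diff):

# outer spawn: one of a bounded number of concurrency "slots" (actual parallelism);
# inner spawn: one task per work item, so each item is free to migrate between threads
Threads.@spawn while (x = next()) !== nothing
    item = Threads.@spawn f(something(x))
    put!(channel, fetch(item))
end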

Member


Thanks for the explanation. I didn't know this trick.

put!(channel, f(initial[1]))
state = initial[2]
isdone = false
next = () -> begin
Member


Isn't it better to do this with another Channel ("share memory by communicating")? Is this for performance?

Member Author


Isn't it better to do this with another Channel

Ah, that does sound like a better idea! I'll try it out
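
Something like this, presumably (sketch only; itr is a placeholder for the input iterable):

# feeder task owns the iteration, so workers just pull from `items`
# instead of coordinating through shared `state`/`isdone` flags
items = Channel() do ch
    for x in itr
        put!(ch, Some(x))
    end
end
# each worker then simply does `for x in items; put!(channel, f(something(x))); end`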

@ararslan ararslan added the multithreading Base.Threads and related functionality label Dec 23, 2019
@jrevels
Member Author

jrevels commented Dec 23, 2019

threaded lazy unordered map

Kind of. I guess the main detail is that it's throughput-limited by picking a given channel size; the channel is otherwise filled up as eagerly as possible.

Out of curiosity, why not stitch those functions together and turn them into a pipeline using IterTools.imap or Base.Generator and then feed it to a threaded map in the end?

You definitely could just fuse together a single large pipeline and then pass it to this thing. The benefit of stringing them together is just being able to manually tune a desired resource profile (esp. w.r.t. memory consumption) for each stage.

@tkf
Member

tkf commented Dec 24, 2019

manually tune a desired resource profile (esp. w.r.t. memory consumption) for each stage

Hmm... I guess I still don't understand this. If you fuse the pipeline then there is no need to sacrifice memory consumption, since the intermediate objects can be GC'ed right away. Why would you want to tune something when you can have ideal memory consumption for free? You can also limit the CPU usage in the threaded map that receives a fused pipeline (so no extra memory consumption).

@vchuravy
Member

I think it makes sense to call it Threads.map_unordered or Threads.imap_unordered. I borrowed the naming from multiprocessing.pool.Pool.imap_unordered in Python.

I would rather borrow from the C++ parallel STL, which gives you execution policies as arguments to functions. Semantically this is still a map, and it seems to me to go against Julia's grain to differentiate different implementations by name and not by dispatch. (This is the reason why I can't stand pmap.)

@tkf
Member

tkf commented Dec 26, 2019

differentiate different implementations by name and not by dispatch

I think I half agree. But I'd argue that, for higher order functions like map, the algebraic properties of the given function interact a lot with the execution policy, so I wouldn't discard them as "implementation details." I guess my point is clearer if you consider that map is (at least conceptually) derived from reduce; i.e., map(f, xs) = mapreduce(x -> [f(x)], vcat, xs; init=Union{}[]). There are mapfoldl(f, op, xs), mapreduce(f, op, xs), and hypothetically mapreduce_unordered(f, op, xs), which require op to be an arbitrary binary function, an associative binary function, and a commutative binary function, respectively. They then give you three variants of map automatically.

Since a caller needs to assert the algebraic properties at the call site, assigning different names to the different reduce-based functions expecting different algebraic properties does not sound so bad to me. Of course, it would be great if we had a trait system for querying and asserting algebraic properties (and other properties like purity/thread-safety) of op and then dispatched on that. But I think short verbs like foldl and reduce for common sets of algebraic properties are useful as high-level interfaces on top of such a hypothetical lower-level trait-based interface. From this point of view, adding "unordered reduce" and hence "unordered map" is pretty much a natural extension to foldl and reduce.
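
To spell out the correspondence (mapreduce_unordered is hypothetical):

mapfoldl(f, op, xs)              # op: any binary function; strict left-to-right evaluation
mapreduce(f, op, xs)             # op: associative; grouping may change
mapreduce_unordered(f, op, xs)   # hypothetical; op: commutative, so element order may change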

@tkf
Member

tkf commented Dec 26, 2019

I started experimenting with the "unordered reduce" route in JuliaFolds/Transducers.jl#112

@jrevels
Member Author

jrevels commented Jan 3, 2020

If you fuse the pipeline then there is no need to sacrifice the memory consumption as the intermediate objects can be GC'ed right away.

I think maybe this makes more sense if you don't necessarily know what your total resource pool/workload is going to be up front (e.g. if your computation backs a service or whatever), and you have the possibility of producers queueing up results far faster than consumers can consume them, especially if the producers are computationally cheap + produce large intermediate artifacts compared to consumers.

If you have a sense of the "granularity" of the workload/resource pool (e.g. you know upfront you can allocate a new node with X threads and Y RAM if you need to, or if an EC2 spot instance becomes available, etc.), then these throughput-limited work queue thingies just become a mechanism to allocate consumers/producers in a manner that matches that granularity, e.g. you can scale up the number of consumers feeding on a producer post hoc as other resources become available, or allocate more producers on top of upstream work queues, etc. The size/task limits make it easy to tune that granularity and provide manual bounds/guarantees for per-consumer/producer resource utilization.

I recently saw dask/distributed#2602 (linked from that backpressure blog post that's been making the rounds), and was like "hey, that's the thing I'm very, very naively trying to allow callers to manually configure via this channel_size business" 😛

@tkf
Member

tkf commented Jan 3, 2020

I get that backpressure is important, but I still don't get why you want to introduce it at intermediate processing levels. Isn't it like you are introducing the problem (possible excessive memory use due to buffering) and then solving it right away (with a bounded Channel)? Isn't it better to avoid introducing the problem in the first place? In a way, fused functions are like "instant backpressure," as you can't push a new item until the last item is processed through the whole pipeline.

I find that thinking in terms of consumer/producer is not really an optimal mindset when it comes to data-parallel computation. This may just be my skewed view, but it is probably a byproduct of stretching the iterator framework to parallel computation. I think this is a very limiting approach as iterators are inherently "stateful." Rather, I think switching to the transducer approach is much better, as parallelism is pretty straightforward when you do fusion/composition on the function side as opposed to the data side. For example, you can automatically get a program that respects data locality if you fuse the pipeline. On the other hand, you may be moving data around constantly across CPUs or machines if you do it with the consumer/producer approach. I'd imagine working around that would require you to build a pretty non-trivial scheduling system.

Having said that, transducer-based approaches like JuliaFolds/Transducers.jl#112 do not disallow finer throttling. You can always shove Channels in at intermediate stages. I still don't think this is necessary if everything is CPU-bound. But I can see that you'd want it if some of the pipeline stages are I/O-bound and you need to fine-tune the performance. OTOH, I don't think this should be the default approach. I think using a number of workers larger than nthreads with fused pipelines should work most of the time (unless you already know the memory requirement is dangerously close to the physical RAM).

@tkf
Member

tkf commented Jan 6, 2020

The latest Transducers.jl has channel_unordered which is inspired by this PR. You can do something like

using Transducers

makeinput() = Channel() do ch
    for i in 1:3
        put!(ch, i)
    end
end

output = channel_unordered(Map(x -> x + 1), makeinput())
output = channel_unordered(x + 1 for x in makeinput()) # equivalent

You can also combine filter and flatten like channel_unordered(Filter(p) |> MapCat(f), xs) or equivalently channel_unordered(y for x in xs if p(x) for y in f(x)). Mapping, filtering, flattening, and any stateless transducers are executed in parallel.

@jrevels
Member Author

jrevels commented Jan 15, 2020

Awesome stuff @tkf!

I'm perfectly happy with this living in Transducers, seems a natural fit :)
