Large reduced result size #136

Open
orenbenkiki opened this issue Jan 8, 2020 · 12 comments

@orenbenkiki commented Jan 8, 2020

Do I understand correctly that the reduce function must take two instances of some type and return a new instance of the same type?

I have cases where the operations work on large data (e.g., vectors with tens or hundreds of thousands of elements). In such cases, it is useful to have an accumulator abstraction: something initialized to a zero state that can sum into itself the results from multiple steps.

For example, this allows allocating one such accumulator per thread, which would collect the results of all the steps executed on that thread. Once these are complete, the threads can perform a parallel reduction tree, ending with the final result in the first thread's accumulator, which is then returned.

This also scales when combining multi-processing and multi-threading (a hypothetical dtreduce). The final result from each worker would be sent back to the invoking process and summed into its accumulator (this can also be done in parallel, using multiple threads).

This would minimize both garbage collection churn and data movement between processes for optimal performance.
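To make this concrete, here is a minimal sketch of the pattern using plain Julia tasks (chunked_sum is a made-up name, not any existing API): each task sums its chunk into its own preallocated accumulator, and the partial accumulators are merged at the end.

using Base.Threads: @spawn, nthreads

function chunked_sum(xs::Vector{Vector{Float64}})
    chunks = Iterators.partition(xs, cld(length(xs), nthreads()))
    tasks = map(chunks) do chunk
        @spawn begin
            acc = zeros(length(first(chunk)))  # one accumulator per task
            for v in chunk
                acc .+= v                      # sum into the accumulator, no extra allocation
            end
            acc
        end
    end
    partials = fetch.(tasks)
    return reduce((a, b) -> (a .+= b; a), partials)  # merge the partial accumulators
end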

@tkf (Member) commented Jan 8, 2020

Good question. I should have clarified it in the documentation.

Do I understand correctly that the reduce function must take two instances of some type and return a new instance of the same type?

Actually, this is not the case. As long as you have a "semantically associative" reducing function, you can return a different type. Also, since the basecase of reduce/dreduce is implemented as foldl, you can mutate the first argument. That is to say, you can use an in-place reducing function op! as long as its non-mutating version

op(x, y) = op!(deepcopy(x), y)

is associative. For example, collect(xf, input) is implemented as something similar to

julia> reduce(append!, Map(identity), 1:10; init=OnInit(() -> Float64[]))
10-element Array{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

since (x, y) -> append!(deepcopy(x), y) is (more or less) vcat, which is associative.

Note that I use init = OnInit(() -> Float64[]). This is because I need to initialize the accumulator Float64[] for each thread (actually each basecase) to avoid mutating it from all threads at once.

See also: https://tkf.github.io/Transducers.jl/dev/examples/tutorial_parallel/#Example:-ad-hoc-histogram-1

(The actual implementation of collect etc. uses BangBang.append!! so that the init part is much more elegant and generic. You may find BangBang.jl useful if you need to write reducing functions on mutable objects.)
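For instance, here is a sketch of that (assuming BangBang's exported append!! and Empty; check the BangBang.jl documentation for the exact API):

using Transducers, BangBang

reduce(append!!, Map(x -> 2x), 1:10; init = Empty(Vector{Float64}))

append!! allocates (or widens) as needed, so the explicit OnInit initializer above is no longer necessary.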

There is also an interface for using a different reducing function for the basecases and for combining them, but that's not documented yet. (See _push! as used for append_unordered!, and the explanation of why it's OK to use it.)

@orenbenkiki (Author) commented Jan 11, 2020

I looked in the documentation and I didn't quite "get" it.

Looking at my lower-level code and trying to abstract my various parallel loops, I see the following pattern:

  • Three types, which I'll call Local, Collected, and Collector. Assume all of them are non-trivial (say, they contain large vectors, data frames, or other data requiring memory allocation).

  • For each thread, we need to allocate a single instance of each type. This instance needs to be properly initialized, so either it must have a constructor w/o arguments, or one must provide some argument-less function that returns a new instance.

  • At the start of each iteration/step, the Local and Collected variables need to be reset. For this, one must provide another function that mutates the variable. For example, such a function may set all array elements to 0.

  • Each iteration/step then uses whatever variables it wants from the global environment, the iteration index, and/or the step value being mapped. It also uses the Local variables for intermediate state to fill the Collected variable. This can be thought of as another function. Ideally this function performs no memory allocations whatsoever, as all such allocations have already been done once per thread when creating the Local and Collected data.

  • After the iteration/step is complete, the Collected is merged into the thread's Collector, which need not be of the same type. This requires another function.

  • Threads start as working. When a thread is done iterating and has no more steps/iterations to do, it holds a Collector state. If it is the first such thread, it becomes pending. If there is another pending thread, our thread becomes merging: it marks the pending thread as done and merges that thread's Collector into its own using another function. This repeats as long as there is at least one other pending thread. Otherwise, if any other thread is still working, our thread becomes pending. Otherwise, all other threads are done and so are we, and our Collector contains the final answer.

This method maximizes parallelism and minimizes memory allocations. The interface to such a generic map/reduce loop would require:

  • data: Indices and/or Values to process
  • type: Collector
  • type: Collected (possibly the same as the Collector)
  • type: Local (possibly empty)
  • function: new_collector() :: Collector
  • function: new_collected() :: Collected
  • function: new_local() :: Local
  • function: reset_local(local :: Local)
  • function: reset_collected(collected :: Collected)
  • function: compute_step(from :: Index and/or Value, using_local :: Local, into :: Collected)
  • function: merge_into(from :: Collected, into :: Collector)
  • function: merge_into(from :: Collector, into :: Collector)

Naturally the result of the merge_into functions must not depend on the invocation order.

This describes the behavior when using multiple threads. When also using multiple processes, once all threads in a process are done, the final Collector is sent to the process that invoked the distributed reduction. Ideally this process will use multiple threads to merge the collectors in parallel, in a similar way to the above, until we get the Collector holding the final result of the whole loop.

In this case (also using multiple processes), we also need to add:

  • type: Global (shared between all threads in one process)
  • function: make_global() :: Global
  • And change the interface to compute_step(from :: Index and/or Value, using_global :: Global, using_local :: Local, into :: Collected). This allows a single point to connect to database(s), memory-mapped file(s), etc. In general, this global data is either immutable or provides a thread-safe API.

I also find myself with a second type of parallel loop, where the overall result is an array, or a set of arrays, possibly memory-mapped to a file. Each iteration/step writes into different entries of the array. There's no Collector or Collected data, but there might still be Local or Global data.

If the result array is just a normal in-memory array, then one simply needs to run all the iterations/steps on threads in the current process (still allocating the Local data once per thread and resetting it once per iteration/step). We end up with:

  • data: Indices and/or Values to process
  • type: Result
  • data: Result (in-memory, shared between threads)
  • type: Local (possibly empty)
  • function: new_local() :: Local
  • function: reset_local(local :: Local)
  • function: compute_step(from :: Index and/or Value, using :: Local, into :: Result)

Of course the compute_step function must be written such that each element of the target in-memory array(s) will only be written by one such function call.
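A minimal sketch of this variant with a plain @threads loop (run_steps! and the toy arithmetic are made up; the :static schedule, available on newer Julia versions, is assumed so that threadid() is stable within an iteration):

using Base.Threads: @threads, threadid, nthreads

function run_steps!(result::Vector{Float64}, values::Vector{Float64})
    locals = [zeros(4) for _ in 1:nthreads()]  # new_local, once per thread
    @threads :static for i in eachindex(values)
        loc = locals[threadid()]
        fill!(loc, 0.0)                        # reset_local
        loc .+= values[i]                      # compute_step uses Local as scratch...
        result[i] = sum(loc)                   # ...and writes only its own element
    end
    return result
end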

If the result array is memory mapped from a file, we can distribute the work across multiple processes. This requires an additional Global type. Both it and the Result need to be constructed once per process (not per thread!). So we end up with:

  • data: Indices and/or Values to process
  • type: Global (possibly empty)
  • type: Result (shared between threads, using memory mapping also between processes)
  • type: Local (possibly empty)
  • function: new_global() :: Global (shared between threads on same process)
  • function: new_result() :: Result (memory map the array file(s))
  • function: new_local() :: Local
  • function: reset_local(local :: Local)
  • function: compute_step(from :: Index and/or Value, using_local :: Local, using_global :: Global, into :: Result)

Again the compute_step function must be written such that each element of the target in-memory array(s) will only be written by one such function call.
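A sketch of what I have in mind for new_result, using the standard-library Mmap (the path, element type, and create flag are made up for illustration):

using Mmap

function new_result(path::AbstractString, n::Integer; create::Bool = false)
    # the creating process truncates and grows the file; workers reopen it
    open(path, create ? "w+" : "r+") do io
        Mmap.mmap(io, Vector{Float64}, n)  # shared between processes via the file
    end
end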

Side note: I'm worried about resource management in the distributed scenarios. File descriptors are a finite resource the GC isn't really aware of. I'd love to have two additional functions:

  • function: release_global(global :: Global)
  • function: release_result(result :: Result)

These would be invoked when each process is done with its work. But Julia doesn't seem to allow me to put anything useful in such functions. There's not even a hint that I know of to tell the GC, "this object is probably not used by anyone anymore; please try to collect it soon to release non-memory resources associated with it".

At any rate, I found these patterns to be generic; they cropped up in many places in my code base (Python, horrible, don't ask...).

At the same time, these patterns are certainly at a lower abstraction level than the classic map/reduce, fold, etc. For example, they refer to a specific process/thread topology, so the distributed API is different from the threaded API: explicit global data, constructing the memory-mapped result arrays per process, etc.

I find it difficult to figure out from the parallel Transducers documentation how to map these patterns to the API, if that is possible at all.

Would the Transducers library reduce the pain of creating an API for such patterns?

Or would it make more sense to directly implement these on top of the language primitives (@threads, @distributed, and pmap)?

@tkf (Member) commented Jan 12, 2020

  • Threads start as working. When a thread is...

What you describe here is implemented in Transducers.jl's reduce. I guess merge_into corresponds to Transducers.combine and compute_step corresponds to Transducers.next.

By the way, in Julia, the granularity of thread programming is the task, not the thread. So you can (or have to) let the Julia runtime handle many of the things you described. You can then use a divide-and-conquer approach to implement (I think) what you described, without an explicit state machine.

I also find myself having a second type of parallel loops where the overall result is an array, or a set of arrays, possibly memory-mapped to a file.

You can just use side effects in the reducing function. I don't think you need a separate API for this, as long as you can ensure each index is set only once. See Transducers.map! and NDReducibles.jl.
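For example, a sketch of the side-effect idea (using the exported right combiner, which just returns its second argument; this is not Transducers.map! itself):

using Transducers

xs = rand(1_000)
ys = similar(xs)
reduce(right, Map(i -> (ys[i] = 2 * xs[i]; i)), eachindex(xs); init = nothing)

The "accumulator" is irrelevant here; all the work happens in the side effect, and each index is written exactly once.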

Again the compute_step function must be written such that each element of the target in-memory array(s) will only be written by one such function call.

I think Referenceables.jl, used in the NDReducibles.jl examples, is an interesting way to ensure this property.
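Something like this sketch (assuming Referenceables' exported referenceable; here with a plain @threads loop rather than NDReducibles):

using Referenceables: referenceable
using Base.Threads: @threads

ys = zeros(8)
@threads for r in referenceable(ys)
    r[] = 1.0  # each task only ever holds a reference to its own element
end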

Would the Transducers library reduce the pain of creating an API for such patterns?

My impression is that these are lower-level (or equivalent-level) APIs than transducers and reduce. Also, I think it'd be better to factor out most of your API as a separate "resource pool manager" and try not to mix it with the data-parallel part. Then maybe it can be combined with Transducers.jl by acquiring a per-basecase (per-task) resource in Transducers.OnInit and releasing it via Transducers.combine. (Return a different type than the basecase accumulator from Transducers.combine so that the release occurs only when combining basecases.)
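Here is a sketch of that idea (Workspace, acquire_workspace, and release! are made-up stand-ins for a resource pool; the point is only the dispatch pattern):

using Transducers

mutable struct Workspace
    buf::Vector{Float64}  # stand-in for an expensive per-task resource
end
acquire_workspace() = Workspace(Float64[])  # e.g. take from a pool
release!(w::Workspace) = empty!(w.buf)      # e.g. return to the pool

step!(w::Workspace, x) = (push!(w.buf, 2x); w)  # basecase step, in-place

# Combining two basecases releases both workspaces and returns a plain
# Vector, so later combines dispatch to the Vector methods and never
# release twice:
step!(a::Workspace, b::Workspace) = (r = vcat(a.buf, b.buf); release!(a); release!(b); r)
step!(a::Vector, b::Workspace) = (r = vcat(a, b.buf); release!(b); r)
step!(a::Workspace, b::Vector) = (r = vcat(a.buf, b); release!(a); r)
step!(a::Vector, b::Vector) = vcat(a, b)

result = reduce(step!, Map(identity), 1:100; init = OnInit(acquire_workspace))

(For an input small enough to be a single basecase, the result would still be a Workspace; a real implementation would unwrap and release it at completion.)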

@orenbenkiki (Author) commented

What you describe here is implemented in Transducers.jl's reduce. I guess merge_into corresponds to Transducers.combine and compute_step corresponds to Transducers.next.

Does that mean that Transducers perform a dynamically scheduled reduction tree between the threads (not necessarily a balanced tree) and that when using multiple processes, the invoking process uses its own multiple threads in a final dynamically scheduled reduction tree to obtain the final result?

By the way, in Julia, the granularity of thread programming is the task, not the thread. So you can (or have to) let the Julia runtime handle many of the things you described. You can then use a divide-and-conquer approach to implement (I think) what you described, without an explicit state machine.

This may be the core of why it is unclear to me how to map my scenarios to existing APIs. To fully optimize the code I’d like to explicitly manage per-process, per-thread and of course per-task state. Per-process state is shared by tasks running on multiple threads in parallel, per-thread state is shared by tasks running serially on the same thread, and per-task state should be allocated only once and reset for each task. It isn’t clear to me how to create such a setup.

Would the Transducers library reduce the pain of creating an API for such patterns?

My impression is that these are lower-level (or equivalent-level) APIs than transducers and reduce.

Agreed.

Also, I think it'd be better to factor out most of your API as a separate "resource pool manager" and try not to mix it with the data-parallel part.

You mean, the part that deals with the per-process/thread/task state? Sure, that sounds reasonable. That should bridge the gap between what I have in mind and all sorts of existing APIs. I need to figure out the exact API that makes sense here...

@tkf (Member) commented Jan 12, 2020

Does that mean that Transducers perform a dynamically scheduled reduction tree between the threads (not necessarily a balanced tree) and that when using multiple processes, the invoking process uses its own multiple threads in a final dynamically scheduled reduction tree to obtain the final result?

The end result is correct, but it's a bit different in that I don't do any scheduling in Transducers.jl; I just let the Julia runtime do it for me. You can see that the core code for reduce is super simple. Roughly speaking, it's:

function _reduce(rf, init, xs)
    if issmall(xs)
        return foldl(rf, init, xs)  # basecase
    else
        left, right = halve(xs)
        task = @spawn _reduce(rf, init, right)
        a = _reduce(rf, init, left)
        b = fetch(task)
        return combine(rf, a, b)
    end
end

Also, I think it'd be better to factor out most of your API as a separate "resource pool manager" and try not to mix it with the data-parallel part.

You mean, the part that deals with the per-process/thread/task state?

Yes, exactly.

BTW, just FYI, I think it's conceivable that a Task may be moved across OS threads in some future version of Julia (or maybe that's already the case?). This may happen when you hit some I/O in a Task. I'm just mentioning it since it may affect how you design your code. For example, I guess the approach of using a "thread-local" workspace would not work well.
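One alternative that survives task migration is a workspace pool guarded by a Channel, so a workspace is borrowed per task rather than owned per thread (a sketch; with_workspace and the pool contents are made up):

using Base.Threads: nthreads

const POOL = Channel{Vector{Float64}}(nthreads())
foreach(_ -> put!(POOL, zeros(16)), 1:nthreads())

function with_workspace(f)
    w = take!(POOL)     # borrow a workspace for this task
    try
        f(w)
    finally
        put!(POOL, w)   # always return it, even if f throws
    end
end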

@orenbenkiki (Author) commented

The simple code described above is fine. It is an example of "static scheduling" (a balanced binary tree). If one is super-picky about optimization, then it is possible to squeeze out some additional performance if one chops the loop into "small" regions, processes them in parallel (in arbitrary order), and merges their results as they arrive (in whatever order). The resulting reduction tree wouldn't be balanced, and would include merges between non-adjacent sub-ranges. I agree this is probably overkill for many usages, as static scheduling is pretty fast.

The same issues occur when some of the reductions are done on other worker processes: merging the results returned from each such sub-process need not be done in order (adjacent ranges) but could be done opportunistically. Again, this would greatly complicate the code; the actual saving might not be worth it in many cases, but might be justified in others.

@tkf (Member) commented Jan 13, 2020

It is an example of "static scheduling"

No, it is not static scheduling, since Julia's scheduler is depth-first.

Thanks to depth-first scheduling, the tree does not have to be balanced. That's why I can implement an efficient, deterministically terminatable parallel reduce with such a small amount of code.

But if you want control over much lower-level scheduling details, I understand that the current API is not enough. Maybe you are interested in: RFC: Make an abstraction for custom task parallel schedulers (GSoC proposal) - Internals & Design - JuliaLang

merges their results as they arrive (in whatever order)

Note that this requires the reducing function to be commutative (and also associative). If you can impose this constraint, maybe JuliaLang/julia#34185 is a nice resource.

@orenbenkiki (Author) commented Jan 13, 2020

I was imprecise. When I said “static” I meant the order and structure of the reduction tree, not the timing of the operations. That is, by “dynamic” I meant performing the reductions in an arbitrary order.

This is especially important when merging the results from worker processes. With a balanced in-order reduction tree, the last, slowest process requires an additional log(number of workers) reductions after it is done. It only requires a single final reduction when using out-of-order reductions.

@tkf (Member) commented Jan 13, 2020

Thanks for the explanation. It clarifies a lot. Indeed, I was regarding log(number of workers) as "not a big deal". But I understand that it can be a big deal when you have unpredictably varying workloads per element.

It only requires a single final reduction when using out-of-order reductions.

Can't you still do this with two final reductions, without assuming that the reducing function is commutative? I think you just have to track what is on the left and what is on the right.

This kind of dynamic/non-deterministic scheduler is not implemented yet. But that's an interesting addition.

@orenbenkiki (Author) commented

It has to be the full log(n), because you can't perform in advance any of the reductions on the path from the slow process's result to the root (the process with its neighbor leaf, then that result with the neighboring merge of two process results, etc.). In my case, >16 machines, that's 5 reductions; not too bad, but it still stings a bit. Amdahl is an unforgiving taskmaster :-)

@tkf (Member) commented Jan 13, 2020

I don't think so. Consider a map-reduce

f(x[1]) * ... * f(x[i-1]) * f(x[i]) * f(x[i+1]) * ... *  f(x[end])

where f(x[i]) is very slow. If you have a non-deterministic scheduler, you can schedule (as shown by the parentheses) like this:

(f(x[1]) * ... * f(x[i-1])) * f(x[i]) * (f(x[i+1]) * ... *  f(x[end]))
                            ^         ^

After f(x[i]) finishes, there are only the two *s marked by ^.

@orenbenkiki (Author) commented

You are right, I was wrong. I was thinking about an inflexible reduction tree (the trivial code you posted above). But if you only enforce the constraint that the reductions are in order, you can "rotate" the tree so that a late-arriving result would indeed only need two reductions.
