Parallel Run Adaptive Sampler #3028
Conversation
Tagging interested parties: @wds15 @yizhang-yiz @mitzimorris @bbbales2. Once we have the API sorted out this should be decently easy peasy.
The isolation thing is easy to implement. Let's just do an issue for that and fix it. That's good to have anyway, to make things safe whenever Stan chains run in parallel environments (within a single arena).
One point for consideration is the thread allocation / requesting. So far we have (by design of one chain per process) a per-chain thread allocation controlled by the environment variable …
Yeah @wds15 I think what you are saying makes sense. What we could do is finally add the …
Making sure I read your comment right, I think we are both on the same page that the user should just tell Stan the total number of available threads, and on the backend we can sort out how to use those. I think Stan's preference would always be to use a thread for each chain, since the overall chain is the more expensive thing to run. Then any other available threads can be used in the model.
Other than segfaults, I guess the only thing I worry about here is that I expect work done outside of reduce_sum to scale more efficiently than work in reduce_sum (because reduce_sum has the shared-argument copy overhead). If there's lots of outside-reduce_sum work and lots of inside-reduce_sum work, I'd hope that the outside work gets priority, because the inside work scales worse. I assume that would show up quickly in experiments though, so it's probably easier to see if it happens than to worry about it too much ahead of time.
So there's a separate output file for each chain? The question there is what the interface would look like. It seems awkward to have the user list a file for every chain explicitly.

The other option would be to do some sort of basename or automatic name-rewriting. So if the user specified output file=myfile.csv, they would get myfile.csv.0, myfile.csv.1, etc. or something. My guess is we should just have a meeting with @mitzimorris and @rok-cesnovar and talk that one through. I like the name-rewriting, which looks like what is being done in the prototyping (see the sketch below). There's probably also some extra output that should be written in the not-quite-csv output headers. It looks like this is something that can remain compatible with the single-chain interface, which is neat.
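A minimal sketch of that name-rewriting idea, as a hypothetical helper (this is not code from the PR):

// Given the user-specified base name, derive one output file name per chain,
// e.g. myfile.csv -> myfile.csv.0, myfile.csv.1, ...
#include <string>
#include <vector>

std::vector<std::string> chain_output_names(const std::string& base,
                                            int n_chains) {
  std::vector<std::string> names;
  names.reserve(n_chains);
  for (int i = 0; i < n_chains; ++i) {
    names.push_back(base + "." + std::to_string(i));
  }
  return names;
}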
Seems simpler to think about having multiple RNGs, unless they aren't threadsafe or something. Like, if you wanted to reproduce a calculation and you only wanted to run one thread, then there's no need to reproduce in parallel. If the seed isn't specified, then I assume picking a seed however we currently do is the way to go. We'd probably need to be able to specify the seed as a list or something now. There are probably other arguments that may make sense as optional lists? Inits? I'm not sure. That's a question to coordinate with cmdstanr and cmdstanpy, I guess.
Sounds like it shouldn't negatively affect behavior, so yeah, def do it if it makes this easier. What is the file_stream_writer here doing? I searched for it but it didn't show up in the run_adaptive thing for me. Could have missed something. Looks like the basic coding is under control. I am happy to review, help write the tests, or help with the cmdstanr/cmdstanpy stuff for this.
The work needs to get done anyway. ... the only risk there is that the TBB scheduler slices the nested reduce_sum work into more slices than is healthy if given more cores, but I don't think that is the case. All that said... let's just do a basic benchmark, sure.
The rng topic needs to be handled carefully. It should not make a difference how many threads are being used as to what the numerical result is. Ideally the numerical result should always be the same.
Yeah I think we would notice if we are on a bad beat here
I like this option
Though I think I'd just prefer …
Agree!
Yep!
So the PRNG thing, I think we will be fine if we set the seed to be just …

So that …
Much appreciated! I think at this point I still need to figure out some patterns etc. that we can probably hash out over a call. But besides that, I think the testing etc. I can handle for now, and I can holler otherwise.
We can def have reproducibility, no sweat. The main issue is just making sure we have good random numbers (imagine spinning up 8 chains but then each chain has correlated random numbers :-x). I got a test up and running, but running the address sanitizer over it I'm seeing leaks happening after the test finishes, inside of https://gist.github.com/SteveBronder/7c2b2c371d4ce805fbd301fca32c088f
I agree that the user should just set the max number of cores to use (threads to spawn) and let us deal with how that is used.
I am a bit confused here. The name of the PR is "Parallel Adaptive Sampler". Does that mean the first step of multi-chain in services will already include the campfire-communication-between-chains-in-warmup things? I think step 1 should be just multiple chains in services. I am probably overthinking the title :)
No sorry, to be clear this is just running …
Hmm, running 3 chains with two threads also gives some nasty stuff: https://gist.github.com/SteveBronder/b414e37cb081ba2be7cb5da512557c7c
Actually, the above may be something about stan-dev/math#2428
Okay, more confusion: if I compile that test with … then run the test with … I no longer get the vptr undefined error, and just get the error about the end of the application not cleaning up memory.
Can you use an external, up-to-date version of oneTBB? That should be possible with our makefiles and hopefully resolves the cleanup issue.
I'll try that later today or tomorrow. But even if that solves it, folks will still not be able to use this because of how RcppParallel is on the old version, right? Tbh I wonder if we could just submit …
Ugh, so I'd like to just bump up if we can; then if that's possible we can talk to the RcppParallel folks about whether they should update, whether we should make a new library, etc. But it looks like the new oneTBB must use cmake, so we will have that as a dependency as well. It feels onerous to ask users to supply their own version of tbb, though I'm not sure what's normal here. I have a branch of math below with the new tbb version, but I need to sort out how to hook it up to our makefile stuff. Maybe it's just time to use cmake? https://github.com/stan-dev/math/compare/update/onetbb
Pinging @hsbadr as I feel like he might have a better idea of what route to take here.
@SteveBronder So, we may remove the TBB source code from … We could use an external … If/when needed, we can maintain code compatibility via … I haven't reviewed the code in this PR yet, but I'll take a look ASAP.
So far we have always kept together all the libraries required to get Stan running. I don't think we would want to move away from that.

@SteveBronder Maybe things are not so desperate. Either we can work around the issue, or maybe this problem is not really an issue as things get cleaned up anyway. Let's see.

To correct what I said earlier: if we start 4 chains with 4 threads, then any nested reduce_sum should not be given priority. We then still want the 4 chains to run in parallel instead of running 1 chain at a time with reduce_sum using 4 cores simultaneously. In order to test this I suggest that we code up a version of this thing where the different chains are seeded with independent RNGs, but run in sync. This way we fire up 4 copies of the same work and we should get their results in basically the same time. For the final release it should not be possible to fire up 4 exactly equal chains, of course.
We can use a single seed if we are smart about it: initialize one PRNG with the user-provided seed and use that to generate seeds for the PRNGs in the threads. Obviously this kind of initialization must use a different PRNG than the threads do.
For usage on a single CPU a period of 2^61 seems adequate. For example, if we run 200 threads and generate 10^7 random numbers per thread, that gives a probability of about 10^-6 that any streams overlap. With more threads (thinking about GPUs or clusters) we should use a PRNG with a longer period. I already did some testing of PRNGs in parallel (with a focus on GPUs, but it should be applicable here): https://link.springer.com/article/10.1007/s11227-019-02756-2
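A minimal sketch of that seeding scheme, using standard-library PRNGs as stand-ins (Stan's actual generator type differs, and the names here are illustrative, not from this PR):

#include <cstdint>
#include <random>
#include <vector>

std::vector<std::mt19937_64> make_chain_rngs(std::uint32_t user_seed,
                                             int n_chains) {
  // Master PRNG of a different type than the per-chain PRNGs; it is used
  // only to draw one seed per chain from the single user-provided seed.
  std::minstd_rand seeder(user_seed);
  std::vector<std::mt19937_64> rngs;
  rngs.reserve(n_chains);
  for (int i = 0; i < n_chains; ++i) {
    rngs.emplace_back(seeder());  // independently seeded per-chain PRNG
  }
  return rngs;
}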
Q for @hsbadr: what is the last version of TBB/oneTBB that we can just plug into Math (so, just replace the sources and expect things to work with our current makefiles)? Maybe we should move to that and help RcppParallel move to that as well. That buys us a bit of extra time to figure out what to do next.
And for replication purposes I have in my make/local … and am doing …
Alright, so after looking around I found some docs and this thread which describe the behavior here, and I think there's enough evidence to say that, for the version that uses the new tbb, this is a leak but not a dangerous leak. Cleanup for the AD tape in each thread normally happens from the global …
So what's happening is that before … But the other memory error with our current version of the tbb, about the vptr being incorrect, I think is an actual bug inside of that version of the tbb. So I think if we bump the tbb version up to 2020.3 (which still uses the makefiles, so no cmake) and use …
So the good news is that for the generated quantities the signature actually supplies an RNG, so for that the current impl I have with multiple PRNGs should be alright. Multiple PRNGs for the transformed data might have to come some other time. One nice benefit here is that the single model is shared across all the processes, which cuts out memory overhead, but if we want multiple PRNGs for the transformed data then we'll also have to make multiple models to have different transformed data. Maybe we could have an option at the cmdstan level to support SBC, which would then give each thread its own model? Though I think we should do that another time, as the stuff here so far seems like a nice layup. (I need to read the stuff you and @t4c1 posted, but so far I think my concerns were overblown and we should be okay.)
I think so. Just need to test it on Windows.
That sounds like a great transition to the new TBB interface, with …
Awesome! Just put up a PR with the new tbb version in math.
Great! It'd be optimal to update TBB in …
Alright, I think this is ready for review! I'm going to open up another PR on top of this one soon that has the signature for NUTS with a diagonal metric.
Is this a cmdstan PR, or is there something else in stan that needs to change? Edit: also, does this need to wait on the TBB math thing (stan-dev/math#2447), or can it go ahead? (Sorry, all the TBB discussion in this thread was a whirlwind and I did not keep up lol.)
This is just a Stan PR; once #3033 goes in we can do the cmdstan stuff.

It was a whirlwind lol. For review, no, but it would be a good idea to wait to merge until the tbb PR is in.
Okay, so this is a services function, and then we'll do a samplers pull, and then we'll move on to cmdstan?

Yep! #3033 is the sampler pull.
Left some comments. I'm gonna go read the sampler PR. Gotta piece more of this together.
: output_(std::move(output)), comment_prefix_(comment_prefix) {}

file_stream_writer();
file_stream_writer(file_stream_writer& other) = delete;
What do these two lines mean?

file_stream_writer();
file_stream_writer(file_stream_writer& other) = delete;

Because output_ is a unique pointer, we don't allow copy constructors, only move constructors or something?
Oh okay, I looked back at what you previously wrote:

So that file_stream_writer holds unique pointers to each of the file streams. I had some issues using stream_writer where somewhere a reference was being dereferenced or something was falling out of scope (stream_writer just holds its ostream by reference), so I thought the easiest solution was just to wrap that ostream into an std::unique_ptr. Then we also get promises that no one ever copies that thing.
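A compact sketch of the pattern being described, with the member names from the diff above (everything else is illustrative, not the PR's exact class):

#include <memory>
#include <ostream>
#include <string>
#include <utility>

class file_stream_writer {
 public:
  explicit file_stream_writer(std::unique_ptr<std::ostream>&& output,
                              const std::string& comment_prefix = "# ")
      : output_(std::move(output)), comment_prefix_(comment_prefix) {}
  file_stream_writer(file_stream_writer& other) = delete;    // no copies,
  file_stream_writer(file_stream_writer&& other) = default;  // moves only:
  // the unique_ptr member makes the type move-only by construction.

 private:
  std::unique_ptr<std::ostream> output_;  // sole owner of the stream
  std::string comment_prefix_;
};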
It seems cleaner to replace stream_writer with something that behaves better. Presumably the other use cases of stream_writer can still work.
I'd like to replace stream_writer, but I was having trouble with backwards compatibility. I think once cmdstan uses the file_stream_writer we can remove stream_writer.
Is it difficult because it's one of these cross-repo things where we'd need to change cmdstan and stan at the same time?
Yeah that's the main issue
[num_warmup, num_samples, num_thin, refresh, save_warmup, &samplers,
 &model, &rngs, &interrupt, &logger, &sample_writers, &cont_vectors,
 &diagnostic_writers](const tbb::blocked_range<size_t>& r) {
  for (size_t i = r.begin(); i != r.end(); ++i) {
For now, can we just call run_adaptive_sampler n_chains times in parallel, like:
run_adaptive_sampler(samplers[i], model, cont_vectors[i], num_warmup,
num_samples, num_thin, refresh, save_warmup, rngs[i],
interrupt, logger, sample_writers[i],
diagnostic_writers[i]);
Long term these could behave differently (if we did custom adaptation or something) but they seem the same for now? Or is there some difference I'm missing?
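For concreteness, here is how that suggestion could sit inside the tbb::parallel_for from the diff above (grain size 1 so each chain is its own task; the variable names come from this PR, but the glue is just a sketch, not the PR's code):

tbb::parallel_for(
    tbb::blocked_range<size_t>(0, n_chains, 1),
    [num_warmup, num_samples, num_thin, refresh, save_warmup, &samplers,
     &model, &rngs, &interrupt, &logger, &sample_writers, &cont_vectors,
     &diagnostic_writers](const tbb::blocked_range<size_t>& r) {
      for (size_t i = r.begin(); i != r.end(); ++i) {
        // one full adaptive-sampler run per chain, each with its own
        // sampler, RNG, initial values, and writers
        run_adaptive_sampler(samplers[i], model, cont_vectors[i], num_warmup,
                             num_samples, num_thin, refresh, save_warmup,
                             rngs[i], interrupt, logger, sample_writers[i],
                             diagnostic_writers[i]);
      }
    });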
So, I didn't realize this until I started working on #3033 lol. Yeah, I think that would work? Let me try to do that in #3033. Though I will say I kind of would like to keep this code, if only as an example of how to do parallel_for() at this level, which is what the auto warmup size thing will need to do. For the auto warmup stuff you mostly just need to take this code, break it up into two parallel for loops, then add another loop around the warmup parallel_for() to do the warmup in chunk sizes (see the sketch below).
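A rough sketch of that chunked-warmup shape; run_warmup_window and run_sampling are hypothetical helpers (not functions in this PR), and window_size plus the chain variables are assumed from the surrounding code:

// warmup proceeds window by window, one parallel_for per window, so
// cross-chain adaptation could be updated between windows
for (size_t win_start = 0; win_start < num_warmup; win_start += window_size) {
  const size_t win_end = std::min(win_start + window_size, num_warmup);
  tbb::parallel_for(
      tbb::blocked_range<size_t>(0, n_chains, 1),
      [&](const tbb::blocked_range<size_t>& r) {
        for (size_t i = r.begin(); i != r.end(); ++i)
          run_warmup_window(samplers[i], win_start, win_end);
      });
  // (hypothetical) share adaptation info across chains here
}
// then a single parallel_for for the sampling phase
tbb::parallel_for(
    tbb::blocked_range<size_t>(0, n_chains, 1),
    [&](const tbb::blocked_range<size_t>& r) {
      for (size_t i = r.begin(); i != r.end(); ++i)
        run_sampling(samplers[i], num_samples);
    });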
Alright, so I just ran this locally and it gives back good answers. If we are going to cut this code out, then what I can actually do is remove the changes to run_adaptive_sampler in #3033, just do the simpler parallel loop, and then close this PR.
for (int m = 0; m < num_iterations; ++m) {
  callback();

  if (refresh > 0
      && (start + m + 1 == finish || m == 0 || (m + 1) % refresh == 0)) {
    int it_print_width = std::ceil(std::log10(static_cast<double>(finish)));
    std::stringstream message;
    if (n_chain > 0) {
      message << "Chain [" << (n_chain + 1) << "]";
    }
This makes sense but we can run it by interfaces.
Yeah, anyone that is depending on the current behavior shouldn't be affected by this, so I think it's fine.
callbacks::logger& logger,
std::vector<SampT>& sample_writers,
std::vector<DiagnoseT>& diagnostic_writers,
size_t n_chain) {
So the model, interrupt, and logger bits need to be threadsafe.

Is every function in the model (other than constructors/destructors) thread safe? I think this threading stuff will mess up the profiles, so we gotta figure out what to do there.

For the interrupts and loggers, I see that the implementations in the test had to change, but shouldn't there be changes in the main source too? Or were these already threadsafe or something?

I see that interrupt is for the interfaces to capture keystrokes. Are the interrupt signals generated at the interfaces and transmitted in here, or are they signals generated in the samplers and transmitted back to the interfaces?
So the logger goes to std::cout, which is threadsafe (see here):

Unless sync_with_stdio(false) has been issued, it is safe to concurrently access these objects from multiple threads for both formatted and unformatted output.

The interrupts, at least in cmdstan, are actually just the base impl stan::callbacks::interrupt, which doesn't actually do anything. I'm looking at the R interrupt and it seems like that would be threadsafe.
Is every function in the model (other than constructors/destructors) thread safe?

Yep! I've looked the model class over. The model is essentially immutable after construction, so it's safe to pass around.
I think this threading stuff will mess up the profiles, so we gotta figure out what to do there.

Oooh hm, idk how this will affect profiling. @rok-cesnovar, looking at the profiling gen code:
{
  profile<local_scalar_t__> profile__("testerrr",
                                      const_cast<profile_map&>(profiles__));
  current_statement__ = 3;
  assign(transf_testp, stan::math::exp(testp),
         "assigning variable transf_testp");
}
I think we might need a mutex in the profile constructor here:

profile(std::string name, profile_map& profiles)
    : key_({name, std::this_thread::get_id()}) {
  {
    std::lock_guard<std::mutex> map_lock(profile_mutex);
    // idt search is thread safe
    profile_map::iterator p = profiles.find(key_);
    if (p == profiles.end()) {
      // idt insertion is thread safe either
      profiles[key_] = profile_info();
    }
  }
  // rest business as usual
}
I see that interrupt is for the interfaces to capture keystrokes. Are the interrupt signals generated at the interfaces and transmitted in here, or are they signals generated in the samplers and transmitted back to the interfaces?

They are called in generate_transitions, which is for the keystroke thing for R / Python / higher-level systems that need to tell the C++ to stop. It's not used in cmdstan since you can just hit ctrl+c on a binary to tell the system to stop whatever program is running atm.
Is it an option to run the profiler in only one thread, or to avoid the need for a mutex by using multiple model copies? We do not want a mutex unless we have to.
I think we want separate profiles for each thread if we want profiling to act like it currently does (a separate profile for each chain). Is Stan data actually copied into the model? Or is it references? I guess if it's references then we could actually have lots of models? But maybe that is undesirable.
Do we need a mutex? The thread id is a key in the storage map, so each thread is guaranteed to always access only its "own" profiles. std::map is supposed to be fine in that case: https://stackoverflow.com/questions/15067160/stdmap-thread-safety and other resources if you google "std::map thread safety". We could switch to using tbb::concurrent_hash_map, though I think that is unnecessary.
The profiler itself runs in one thread; the problem is that in the above constructor the profile_map is a global std::map, and I don't think there's any reasonable way to get around that.

Imo a mutex isn't that bad here. I think we can actually put it inside the if (p == profiles.end()), since find should be threadsafe given it's searching for a key that also has the thread_id in it. But for insertion with profiles[key_] we do not want to be adding keys simultaneously, as we could end up in a situation where the map reallocs memory.
One solution here is to prefill the profile_map keys. Since each key is determined by a unique name-and-thread combination, no two threads can cause a race condition; they write to different parts of the map. So we only need to worry about the map reallocing for new keys. For an example model like:
model {
  profile("ayyy") {
    theta ~ beta(1, 1);  // uniform prior on interval 0,1
  }
  profile("yooo") {
    y ~ bernoulli(theta);
  }
}
We can have a function in the model like
void init_profile_map() {
  const auto max_threads = get_max_threads();
  for (auto&& name : std::array<const char*, 2>{"ayyy", "yooo"}) {
    for (int i = 0; i < max_threads; ++i) {
      // (sketch: here we'd need the ids of the pool's worker threads,
      // not just the calling thread's id)
      profile_map[{name, std::this_thread::get_id()}] = empty_profile_info();
    }
  }
}
Or something like that to fill up the keys before we do any other insertions. Then we don't need to worry about data reallocation since all the keys are there.

But still, I think a mutex here is not that bad. It's short-lived and only causes locks whenever we are doing profiling (which imo is a usage pattern that is not about peak performance anyway). I think what I would like is to just put a mutex in the if for now, and then later we can do the lock-free pattern.
Oh shoot, I typed that out before Rok posted his reply. Yeah, from there it looks like it is safe to insert since each key is distinct for each thread. I'll look into this more to make sure it's fine. If I can't sort out anything conclusive, then I think moving to a tbb::concurrent_hash_map is a reasonable solution.
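For reference, a rough sketch of what that tbb::concurrent_hash_map fallback could look like; profile_key and profile_info here are simplified stand-ins for Stan's actual types:

#include <tbb/concurrent_hash_map.h>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>

struct profile_key {
  std::string name;
  std::thread::id tid;
};

// tbb::concurrent_hash_map requires a HashCompare policy:
struct profile_key_hash_compare {
  static std::size_t hash(const profile_key& k) {
    return std::hash<std::string>()(k.name)
           ^ std::hash<std::thread::id>()(k.tid);
  }
  static bool equal(const profile_key& a, const profile_key& b) {
    return a.name == b.name && a.tid == b.tid;
  }
};

struct profile_info { /* timings etc. */ };

using profile_map_t
    = tbb::concurrent_hash_map<profile_key, profile_info,
                               profile_key_hash_compare>;

// insert is safe from any thread; no external mutex is needed
void register_profile(profile_map_t& profiles, const profile_key& key) {
  profile_map_t::accessor a;
  profiles.insert(a, key);  // default-constructs profile_info if absent
}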
Global variables are problematic for obvious reasons. The TBB data structures should be a good alternative... How often does this code run? I mean, is it performance critical?

EDIT: If the std::map operations needed are supposed to be safe in this usage, then no need to switch to the TBB thing, of course.
Insert to the map happens the first time a profile is registered, so once in the lifetime of a model. After that it's just read access to get the reference to the profile info, and those are separated for separate threads; no race conditions there.

If locking the insert in a mutex is something that would make us sleep more calmly, that's not a big price to pay. It does seem it's not needed.
Yeah, I don't think it's needed. Let's not worry about this for now; we should notice it with a pretty simple test once the cmdstan stuff is up.
Computer crashed mid-review here. I came back and comments were gone, so I added them again. It seems like in the end some of the comments got doubled. Sorry for the noise. I cleaned up the duplicates.
Submission Checklist
- ./runTests.py src/test/unit
- make cpplint
Summary
This is a WIP to figure out some of the odds and ends of the parallel version of adaptive sampling. The main issues:

1. The sample_writers and diagnostic_writers come in as a vector of streams, where the current impl comes in as a writer for each stream. Is that fine? At the cmdstan level I suppose we can just check that if n_chain > 1 we spin up writers to output_{chain_#}?
2. wrt RNGs, do we need to have a special PRNG or is it safe to just have multiple RNGs? Should we use a different RNG with a longer cycle and set each seed to the user-given initial + 2^chain_id or something?
3. For what @wds15 brought up here, should the isolate() thing happen before we put this PR in or after? I think as long as it's not exposed to users yet, it's fine to work on this, as long as we do that before we make this available via cmdstan.

Till we sort out 1-3 I just brought in the adaptive_sampler from cmdstan#987. Once we have those 3 sorted, we will have a version that just blandly runs each chain in parallel, but it will be very easy to make new methods like auto warmup across chains, since we'd just have some run_auto_warmup_adaptive_sampler() with parallel_fors for the warmup and sampling, where the warmup is just something like …

Intended Effect
Run Chains in Parallel
How to Verify
Side Effects
Documentation
Copyright and Licensing
Please list the copyright holder for the work you are submitting (this will be you or your assignee, such as a university or company):
By submitting this pull request, the copyright holder is agreeing to license the submitted work under the following licenses: