pooled futures #4

Closed
blaenk opened this issue Apr 8, 2015 · 13 comments

@blaenk

blaenk commented Apr 8, 2015

I'm not sure what to name this. I asked you about it on IRC a while back. Basically, I'd like to be able to spawn futures that execute within a bounded thread pool, so that I can comfortably fire off, say, 100 futures without worrying about spawning 100 threads at once in the worst case. Instead, it would reuse a fixed pool of threads to avoid the setup/teardown cost of threads.

You told me that this was possible in some way and gave me links, but I honestly got a bit lost. If this is already possible in some way, I think it'd be great if there were an example for it :D

Looking at the example code in the documentation, maybe one option would be some function that had a parameter for the pool size, e.g.:

// execute in a thread pool of 4
let res = pooled(4, (
        future1.map(|v| v * 2),
        future2.map(|v| v + 5)))
    .and_then(|(v1, v2)| v1 - v2)
    .await().unwrap();

Although I guess it would also be pretty cool if we could create one such pool and pump futures into it separately.

If this isn't already possible, then I figured I'd create an issue for it so that it could be closed when it's implemented (if you do plan on doing so) to allow me and other people interested in such a feature to be notified.
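To make the request concrete, here is a minimal sketch of the behaviour being asked for, written against the standard library only rather than Eventual or Syncbox. The names `Pool`, `spawn`, and `sum_of_squares` are hypothetical, and an `mpsc` receiver stands in for a future; it is an illustration of the idea, not how either library does it.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// A boxed unit of work that can be handed to a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool: `size` worker threads share one job queue.
pub struct Pool {
    queue: Option<Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl Pool {
    pub fn new(size: usize) -> Pool {
        let (queue, jobs) = channel::<Job>();
        let jobs = Arc::new(Mutex::new(jobs));
        let workers = (0..size)
            .map(|_| {
                let jobs = Arc::clone(&jobs);
                thread::spawn(move || loop {
                    // The lock is held only while taking the next job off the queue.
                    let next = jobs.lock().unwrap().recv();
                    match next {
                        Ok(job) => job(),
                        Err(_) => break, // queue closed: the pool was dropped
                    }
                })
            })
            .collect();
        Pool { queue: Some(queue), workers }
    }

    /// Queue `task` and return a receiver that yields its result once.
    pub fn spawn<T, F>(&self, task: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = channel();
        let job: Job = Box::new(move || {
            // Ignore the error if the caller already dropped the receiver.
            let _ = tx.send(task());
        });
        self.queue
            .as_ref()
            .expect("pool is running")
            .send(job)
            .expect("workers are alive");
        rx
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Closing the queue makes each worker's `recv` fail, so the loops exit.
        self.queue.take();
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

/// Fire off 100 tasks, but never use more than 4 threads to run them.
pub fn sum_of_squares() -> u64 {
    let pool = Pool::new(4);
    let pending: Vec<Receiver<u64>> =
        (0..100u64).map(|n| pool.spawn(move || n * n)).collect();
    pending.into_iter().map(|rx| rx.recv().unwrap()).sum()
}
```

The pool is created once and tasks are pumped into it separately, which is the second variant mentioned above. A real implementation would also need to decide what happens when a task panics; this sketch simply lets that worker thread die.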

@reem
Contributor

reem commented Apr 8, 2015

This should be possible with join, ScopedThreadPool, and the changes to Future discussed in #3

@carllerche
Owner

So, basically Syncbox provides ThreadPool and it also has a Run trait which is implemented by ThreadPool. Eventual doesn't have anything related to scheduling.

Combining ThreadPool and eventual's Future would provide the functionality that you want, but that combination doesn't currently exist. It used to exist back when Eventual & Syncbox were the same library, but now that they are split, I haven't figured out how to provide the pooled future without having a circular dependency.

Here is how you would implement it:

https://github.com/carllerche/syncbox/blob/8fddf7f9bd51c1d37fd8e74c6786818e999d6a68/src/util/run.rs#L14-L22

Basically, create a Future / Complete pair, schedule the block for execution on the thread pool, then return the future component.

I would propose a defer function, like this (I didn't execute it, so I don't know if it compiles):

// Fires the computation on the thread pool.
pub fn defer<A: Async, R: Run>(a: A, r: R) -> Future<A::Value, A::Error> {
    let (tx, rx) = Future::pair();

    r.run(move || {
        // Receive "fires" the computation on the current thread
        a.receive(move |res| {
            match res {
                Ok(val) => tx.complete(val),
                Err(AsyncError::Failed(err)) => tx.fail(err),
                // Otherwise, do nothing. A thread panicked before completing the value, doing
                // nothing will proxy the panic (represented as an AsyncError::Aborted)
                Err(_) => { }
            }
        });
    });

    rx
}

And then you would use it:

let f1 = defer(future1.map(|v| v * 2), my_threadpool);
let f2 = defer(future2.map(|v| v + 5), my_threadpool);

let res = join((f1, f2))
    .and_then(|(v1, v2)| v1 - v2)
    .await().unwrap();

And of course, fancier scheduling abstractions could be built up using these building blocks :)
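For anyone who wants to try the pattern without either library, the following is a rough standard-library approximation of the steps described above: create a producer/consumer pair, schedule the work on something that can run jobs, and return the consumer half. Everything here is an assumption made for illustration. The `Run` trait is a hypothetical stand-in rather than Syncbox's actual trait, `SingleThread` is a toy executor, an `mpsc` channel plays the role of the Future / Complete pair, and `defer` takes a closure instead of an `Async` value.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for a `Run`-like trait: anything that can execute a job.
pub trait Run {
    fn run(&self, job: Job);
}

/// One worker thread fed by a queue; a real pool would have several workers.
pub struct SingleThread {
    queue: Sender<Job>,
}

impl SingleThread {
    pub fn new() -> SingleThread {
        let (queue, jobs) = channel::<Job>();
        // The worker exits once `queue` is dropped and the backlog is drained.
        thread::spawn(move || {
            for job in jobs {
                job();
            }
        });
        SingleThread { queue }
    }
}

impl Run for SingleThread {
    fn run(&self, job: Job) {
        self.queue.send(job).expect("worker thread is alive");
    }
}

/// Create the pair, schedule the work on `r`, and return the consumer half
/// immediately, before the work has necessarily started.
pub fn defer<R, T, E, F>(r: &R, work: F) -> Receiver<Result<T, E>>
where
    R: Run,
    T: Send + 'static,
    E: Send + 'static,
    F: FnOnce() -> Result<T, E> + Send + 'static,
{
    let (tx, rx) = channel();
    r.run(Box::new(move || {
        // If `work` panics, `tx` is dropped without sending and the caller
        // sees a `RecvError`, loosely analogous to an aborted future.
        let _ = tx.send(work());
    }));
    rx
}

/// Two deferred computations, then combine the results.
pub fn example() -> Result<i32, String> {
    let pool = SingleThread::new();
    let f1 = defer(&pool, || Ok::<i32, String>(10 * 2));
    let f2 = defer(&pool, || Ok::<i32, String>(3 + 5));
    // The first `?` handles a dropped producer, the second the task's own error.
    let v1 = f1.recv().map_err(|e| e.to_string())??;
    let v2 = f2.recv().map_err(|e| e.to_string())??;
    Ok(v1 - v2)
}
```

Blocking on each receiver in turn is only a crude substitute for `join`; it is enough to show that the caller gets its handles back right away and the work happens on the executor's thread.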

@blaenk
Author

blaenk commented Apr 10, 2015

Thanks for taking the time to write this out Carl! I'll play around with this when I get a chance, if you don't happen to push something like this somewhere before then.

I'll keep this open for you to close when this is available for ready-use somewhere (will help to notify me and anyone else that may be interested), but if you'd rather close it or do something else feel free! :)

@posix4e
Contributor

posix4e commented Apr 14, 2015

@blaenk curious if you are working on this or if I could take a shot
@carllerche I am pretty new to Rust development. What type of tests would you like to see?

@blaenk
Author

blaenk commented Apr 14, 2015

Go for it! :)


@carllerche
Owner

Great! I'm going to try to do a brain dump to hopefully get you started.

I think there should be two functions that end up getting exported at the top level of the lib. You can create a run.rs file, though, and then re-export the functions (following the same pattern as join()).

The functions could look like this:

/// Execute any chained computation builders on the given `Run`.
pub fn defer<R: Run, A: Async>(r: R, async: A) -> Future<A::Value, A::Error> {
    // ...
}

/// Executes the given action on the given `Run`.
pub fn background<R: Run, F: FnOnce() -> T, T: Send>(r: R, action: F) -> Future<T, ()> {
    // ...
}

The core tests could look something like:

#[test]
fn test_defer_runs_on_thread_pool() {
    // Set thread local
    let pool = ThreadPool::single_thread();
    let (complete, future) = Future::pair();

    let res = eventual::defer(pool, future).and_then(|v| {
        // Assert thread local is not present here
        Ok(v + 5)
    });

    complete.complete(7);
    // `future` was moved into `defer`; await the chained result instead.
    assert_eq!(Ok(7 + 5), res.await());
}

#[test]
fn test_threadpool_background() {
    // Set thread local
    let pool = ThreadPool::single_thread();
    let flag = Arc::new(AtomicBool::new(false));

    let f = flag.clone();
    let res = eventual::background(pool, || {
        // Assert thread local is not present here
        assert!(f.load(Ordering::Relaxed));
        5
    });

    // Wait for a bit to make sure that the background task hasn't run
    thread::sleep_ms(100);

    // Set the flag
    flag.store(true, Ordering::Relaxed);

    assert_eq!(Ok(5), res.await());
}

I have not tried to run any of this code, so it may be wrong... but it is a start :). Basically, the important bit is to try to make sure that the callbacks don't execute on the thread where they are defined. Thread locals seem like the best way to do this.
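As a rough illustration of the thread-local idea, here is a standard-library sketch in which a marker is set on the defining thread and then read from another thread. The names `ON_DEFINING_THREAD` and `marker_visibility` are made up for this example, and a plain `thread::spawn` stands in for a pool worker.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Each thread gets its own copy, initially `false`.
    static ON_DEFINING_THREAD: Cell<bool> = Cell::new(false);
}

/// Returns (marker as seen on the calling thread, marker as seen on another thread).
pub fn marker_visibility() -> (bool, bool) {
    // Mark only the current thread.
    ON_DEFINING_THREAD.with(|m| m.set(true));
    let here = ON_DEFINING_THREAD.with(|m| m.get());

    // A plain spawned thread stands in for a pool worker here.
    let elsewhere = thread::spawn(|| ON_DEFINING_THREAD.with(|m| m.get()))
        .join()
        .expect("worker thread panicked");

    (here, elsewhere)
}
```

In a test for `defer` or `background`, the callback would assert that the marker is not set, which only holds if the callback really ran on a different thread from the one that defined it.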

Besides this, you can look at how I write some of the thread pool tests: https://github.com/carllerche/syncbox/blob/master/test/test_thread_pool.rs

Feel free to ping me on IRC or whatever too.

@posix4e
Contributor

posix4e commented Apr 15, 2015

Curious how close I got
posix4e/syncbox@b8ce4b8

@posix4e
Contributor

posix4e commented Apr 15, 2015

Note: I tried changing the + Send bound to + Send + 'static in syncbox.

@posix4e
Contributor

posix4e commented Apr 15, 2015

One thing: when I step through this in the debugger, the test passes, which makes me think we are not waiting for the result.

@posix4e
Contributor

posix4e commented Apr 17, 2015

For background I am thinking about,

*edited

pub fn background<R: Run<Box<TaskBox>> + Send + 'static, F: Send + 'static + FnOnce(T) -> T, T: Send>

Do I have that right?

@blaenk
Author

blaenk commented Apr 17, 2015

Can you format that as a codeblock in markdown? It got all messed up cause of the angle brackets I think.

@posix4e
Contributor

posix4e commented Apr 17, 2015

Sure. I updated the method sig above

@blaenk
Author

blaenk commented Aug 17, 2015

I'm pretty sure this is done now, so I'll close it.

@blaenk blaenk closed this as completed Aug 17, 2015