Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFH/WIP : exec_on_worker_thread and exec_on_main via a threadpool #22996

Closed
wants to merge 1 commit into from

Conversation

amitmurthy
Copy link
Contributor

This work is an initial attempt to implement the design in #22631 (comment)

Broadly there are two distinct functionalities -

  1. Execute f(args...) on a worker thread from a threadpool. Currently libuv's threadpool is used for this, but it can be any threadpool.
  2. Execute f(args...) on the main thread running the event loop - required for all IO, remotecalls, etc.

The Julia interface has not been added yet, you need to execute the following code block to try it out.

@everywhere begin
    #
    # Enable running of f(args...) on a uv threadpool worker.
    #

    global tp_reg = Dict{Int, Any}()
    global next_regid = 0

    # Used to spawn f(args...) on a worker thread from the uv threadpool
    # Waits for completion but yields to other tasks on the main thread
    function exec_on_worker_thread(f::Function, args...)
        global next_regid
        id = next_regid
        next_regid = next_regid + 1

        res = Ref{Any}(0)
        tp_reg[id] = (Condition(), false)

        zero_f = ()->(res[]=f(args...); return)

        # println("Pointers ", pointer_from_objref(zero_f))
        ccall(:jl_tp_queue, Void, (Ptr{Void}, Ptr{Void}, Ptr{Void}, Cint),
                            cf_exec_anon, pointer_from_objref(zero_f), cf_notify_c, id)

        c,isdone = tp_reg[id]
        !isdone && wait(c)
        delete!(tp_reg, id)
        res[]
    end

    # Called on the main thread by libuv to notify completion of work.
    # exec_on_worker_thread is waiting on a Condition variable for work completion,
    # trigger it.
    function notify_c(id::Int)
        # println("Done $id")
        c,_ = tp_reg[id]
        tp_reg[id] = (c, true)
        notify(c)
        return
    end
    const cf_notify_c = cfunction(notify_c, Void, (Int,))

    # Helper function to wrap anonynous functions to be C-callable via cfunction
    function exec_anon(p::Ptr)
        x = unsafe_pointer_to_objref(p)
        # exec_on_main(println, "In thread function, Anonymous wrapper called.")
        x()
        return
    end
    const cf_exec_anon = cfunction(exec_anon, Void, (Ptr{Void}, ))

    #
    # Julia code running in worker threads use exec_on_main
    # to forward calls to be executed under the main thread
    #

    function exec_on_main(f::Function, args...)
        ccall(:jl_exec_on_main, Any, (Ptr{Void}, Ptr{Void}), pointer_from_objref(f), pointer_from_objref(args))
    end

    # The below callback can be expected to be executed under the Main thread
    function exec_on_main_cb(f_ptr::Ptr, p_args::Ptr)
        f = getfield(Main, Symbol(unsafe_pointer_to_objref(f_ptr)))
        args = unsafe_pointer_to_objref(p_args)
        println("Executing ", f, " on Main thread with args ", args)
        return f(args...)::Any
    end

    ccall(:init_on_main_cb, Void, (Ptr{Void},), cfunction(exec_on_main_cb, Any, (Ptr{Void}, Ptr{Void})))

end

Julia interface:

  1. exec_on_worker_thread(f::Function, args...) executes f(args...) on a thread from libuv's threadpool
  2. exec_on_main(f::Function, args...) is called from julia code running in a worker thread to execute f(args...) on the main thread.

Examples: foo_on_worker_thread and busyloop_on_worker_thread below.

@everywhere begin
    function foo_on_worker_thread()
        exec_on_main(println, "foo_on_worker_thrd on worker thread ", ccall(:jl_threadid, Int16, ()))
        exec_on_main(remotecall_fetch, myid, 1)
    end

    const rval = Threads.Atomic{Float64}(0.0)
    function busyloop_on_worker_thread()
        global rval
        exec_on_main(println, "busyloop_on_worker_thread on worker thread ", ccall(:jl_threadid, Int16, ()))
        exec_on_main(println, rval[])
        exec_on_main(sleep, 1.0)
        while true
            rval[] = rand()
        end
    end

    getrval() = (global rval; rval[])
end

Start with julia -p1 with

export JULIA_NUM_THREADS=4
export UV_THREADPOOL_SIZE=4

defined in your .profile or equivalent.

The first example simulates a long running computation as a busyloop on a remote worker thread.
It will still be possible to execute further remotecalls on this worker (something not possible today)

# On remote worker 2, start a busy loop in a worker_thread
remotecall(exec_on_worker_thread, 2, busyloop_on_worker_thread)

# Still possible to make other remotecalls
remotecall_fetch(getrval, 2)

The next example is one where

  • a remotecall is executed on 2
  • this executes the request on a worker thread
  • the request executes another remotecall_fetch, But since this involves IO and cannot be executed from the worker thread it is executed via the main thread.
remotecall_fetch(exec_on_worker_thread, 2, foo_on_worker_thread)

Note:

# The busyloop prevents a normal exit of the worker, force kill it when done.
kill(get(Base.Distributed.worker_from_id(2).config.process), 9)

or kill -9 from the shell.

Status:

  • The examples above work. Have not tried with more complex functions.
  • The worker thread initialization routines have been copied over from the existing threading infrastructure. My knowledge of gc / codegen internals is virtually non-existent. Will need help in addressing them in the context of multi-threading.
  • Request forwarding to main thread is a good model and can be used for other stuff (codegen?) from worker threads too.
  • Overall, the concept works and can/should be refined.

Need help from Julia internals gurus - review code, suggest improvements, etc.

*/


#include "../deps/srccache/libuv/src/queue.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't right, if this isn't a public header we can't be using it from the source tree

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can copy it over - https://github.com/JuliaLang/libuv/blob/julia-uv1.9.0/src/queue.h - a simple circular queue implementation, not related to the rest of the libuv stuff.

@ararslan ararslan added the parallelism Parallel or distributed computation label Jul 28, 2017
@JeffBezanson
Copy link
Member

This seems to me to conflict with #22631. We do need the ability to forward I/O calls to thread 1, but we don't need two thread pools.

@amitmurthy
Copy link
Contributor Author

Yes, I agree. This example uses libuv's threadpool since the current Julia threading implementation does not yet have an interface for executing functions off a threadpool at different points in time. Specifically, the use case envisaged here is about incoming remotecall requests being spawned off onto worker threads leaving the main thread free for I/O.

We should finally have only one threadpool and only the I/O call forwarding part should be added into #22631 .

@vtjnash
Copy link
Member

vtjnash commented Feb 10, 2024

Completely replaced now by a usable threading system

@vtjnash vtjnash deleted the amitm/threading branch February 10, 2024 22:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants