Commit a2edd64

user defined transports

amitmurthy committed Jan 21, 2015
1 parent 8b4e9e9 commit a2edd64
Showing 18 changed files with 617 additions and 17 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -37,6 +37,7 @@ $(build_docdir):
@mkdir -p $@/examples
@cp -R doc/devdocs doc/manual doc/stdlib $@
@cp -R examples/*.jl $@/examples/
@cp -R examples/clustermanager $@/examples/

git-submodules:
ifneq ($(NO_GIT), 1)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -156,6 +156,8 @@ Library improvements

* Split `Triangular` type into `UpperTriangular`, `LowerTriangular`, `UnitUpperTriangular` and `UnitLowerTriangular` ([#9779])

* `ClusterManager` - performance improvements ([#9309]) and support for changing transports ([#9434])

Deprecated or removed
---------------------

11 changes: 9 additions & 2 deletions base/client.jl
@@ -222,8 +222,15 @@ function process_options(args::Vector{UTF8String})
if args[i]=="-q" || args[i]=="--quiet"
quiet = true
elseif args[i]=="--worker"
worker_arg = (i == length(args)) ? "" : args[i+1]

if worker_arg == "custom"
i += 1
else
start_worker()
# doesn't return
end

elseif args[i]=="--bind-to"
i+=1 # has already been processed
elseif args[i]=="-e" || args[i]=="--eval"
2 changes: 2 additions & 0 deletions base/exports.jl
@@ -30,6 +30,7 @@ export
BitArray,
BitMatrix,
BitVector,
BufferStream,
CartesianIndex,
CartesianRange,
CFILE,
@@ -1218,6 +1219,7 @@ export
nprocs,
nworkers,
pmap,
process_messages,

@JeffBezanson (Member) commented on Mar 15, 2015:
This doesn't seem like a great function to have exported. The manual tells you to call it for every connection --- is there a way to automate that so the API requires less boilerplate?

Also, the functions init_worker, launch, manage, and process_messages need help entries.

@amitmurthy (Author, Contributor) replied on Mar 16, 2015:
To clarify, process_messages needs to be called only on incoming connections, and only when using custom transports. Since it will only be used in exceptional circumstances, we could un-export it and document its usage as Base.process_messages.

OTOH, to avoid exporting a new name, we could reuse schedule. schedule(cman::ClusterManager, instrm::AsyncStream, outstrm::AsyncStream) would be the method that schedules a message processing loop for a new inbound connection.

Will add help entries for the other functions.

@JeffBezanson (Member) replied on Mar 17, 2015:
It just doesn't seem like quite the right factoring to me. Ideally code inside Base would call process_messages, and the code for a custom transport would handle only what's different about that transport. In other words, it should be impossible to have a worker that doesn't call process_messages.

@amitmurthy (Author, Contributor) replied on Mar 18, 2015:
Yeah, I know, but I couldn't figure out a way around this. The current mechanism is as follows:

  • Each logical connection is handled by a Task and 2 AsyncStream objects (one for incoming messages and one for outgoing).
  • Workers with higher pids connect to workers with lower pids.
  • Consider two workers, 2 and 3.
  • On 3, Base calls connect to initiate a connection to 2. connect on 3, implemented by the custom transport, returns a pair of AsyncStream objects; Base then calls process_messages, which launches a new task to handle messages from 2, and the setup is complete.
  • On 2, the custom transport implementation receives the logical connect request from 3. Base does not have control. 2 calls process_messages with the AsyncStream objects representing the logical connection.
  • With the inbuilt TCP transport, process_messages is called upon an accept from the SSH/Local Managers which happen to be in Base. The equivalent of accept is implemented by the MPI or ZMQ managers in their own way.
  • In short, for every connection set up between a pair of workers: on one side, connect is called into the custom transport implementation from Base, and on the other end, process_messages is called from the cluster manager into Base.

Short of re-architecting the current model of one task per logical connection, I don't see a way around this. Open to ideas.
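
To illustrate the accepting side of that asymmetry (a hypothetical sketch, not code from this change; my_accept stands in for a transport-specific accept, e.g. MPI or ZMQ):

    # the manager's code drives the accept, then calls back into Base,
    # which spawns the message-processing task for this connection
    function handle_inbound(manager)
        while true
            (rd, wr) = my_accept(manager)   # transport-specific accept
            process_messages(rd, wr)
        end
    end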

@eschnett (Contributor) replied on Mar 18, 2015:
I assume that process_messages is the main loop that each worker needs to call. To make it "impossible" to not call process_messages, consider the following:

  • There is a command line flag, known to Base, that specifies that the current process is starting a new worker
  • This command line flag accepts an option (probably a file name) that specifies a particular function that is called
  • This function needs to initialize the custom transport mechanism, and then returns the AsyncStream object

Probably there needs to be an equivalent function that Base needs to call down to shut down the custom transport mechanism. E.g. for MPI, this would wrap MPI_Init and MPI_Finalize. Or maybe this finalization should be registered, in the initialization routine, as a finalizer.

This is probably what @JeffBezanson is looking for. I don't see much benefit of this approach -- writing a custom transport mechanism is a fairly special task, so that calling back into a routine in Base to handle the event loop seems fine.

@JeffBezanson (Member) replied on Mar 18, 2015:
Ok, perhaps not export it then?

@amitmurthy (Author, Contributor) replied on Mar 18, 2015:
Yeah, I am fine with that. Will do it.

procs,
put!,
remotecall,
9 changes: 8 additions & 1 deletion base/multi.jl
@@ -303,7 +303,8 @@ function rmprocs(args...; waitfor = 0.0)
else
if haskey(map_pid_wrkr, i)
push!(rmprocset, i)
w = map_pid_wrkr[i]
kill(w.manager, i, w.config)
end
end
end
@@ -1286,6 +1287,12 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
(s, s)
end

function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
nothing
end

function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = get(config.connect_at)
config.host = rhost
44 changes: 44 additions & 0 deletions base/stream.jl
@@ -960,3 +960,47 @@ mark(x::AsyncStream) = mark(x.buffer)
unmark(x::AsyncStream) = unmark(x.buffer)
reset(x::AsyncStream) = reset(x.buffer)
ismarked(x::AsyncStream) = ismarked(x.buffer)

# BufferStreams are non-OS streams, backed by a regular IOBuffer
type BufferStream <: AsyncStream
buffer::IOBuffer
r_c::Condition
close_c::Condition
is_open::Bool

BufferStream() = new(PipeBuffer(), Condition(), Condition(), true)
end

isopen(s::BufferStream) = s.is_open
# closing wakes up all tasks blocked in wait_readnb/wait_readbyte/wait_close
close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing)

function wait_readnb(s::BufferStream, nb::Int)
while isopen(s) && nb_available(s.buffer) < nb
wait(s.r_c)
end

(nb_available(s.buffer) < nb) && error("closed BufferStream")
end

function eof(s::BufferStream)
wait_readnb(s,1)
!isopen(s) && nb_available(s.buffer)<=0
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)

nb_available(s::BufferStream) = nb_available(s.buffer)

function wait_readbyte(s::BufferStream, c::UInt8)
while isopen(s) && search(s.buffer,c) <= 0
wait(s.r_c)
end
end

wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end
start_reading(s::BufferStream) = nothing

# writes wake up any tasks blocked waiting for data
write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv)
write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv)
write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv)

105 changes: 92 additions & 13 deletions doc/manual/parallel-computing.rst
@@ -674,21 +674,46 @@ retained.
ClusterManagers
---------------

The launching, management and networking of Julia processes into a logical
cluster is done via cluster managers. A ``ClusterManager`` is responsible for:

- launching worker processes in a cluster environment
- managing events during the lifetime of each worker
- optionally, providing data transport

A Julia cluster has the following characteristics:

- The initial Julia process, also called the ``master``, is special and has a Julia id of 1.
- Only the ``master`` process can add or remove worker processes.
- All processes can directly communicate with each other.

Connections between workers (using the in-built TCP/IP transport) are established in the following manner:

- ``addprocs`` is called on the master process with a ``ClusterManager`` object
- ``addprocs`` calls the appropriate ``launch`` method, which spawns the required
  number of worker processes on the appropriate machines
- Each worker starts listening on a free port and writes out its host and port information to STDOUT
- The cluster manager captures the STDOUT of each worker and makes it available to the master process
- The master process parses this information and sets up TCP/IP connections to each worker
- Every worker is also notified of the other workers in the cluster
- Each worker connects to all workers whose Julia id is less than its own
- In this way a mesh network is established, wherein every worker is directly connected to every other worker


While the default transport layer uses plain TCP sockets, it is possible for a Julia cluster to provide
its own transport.

Julia provides two in-built cluster managers:

- ``LocalManager``, used when :func:`addprocs` or :func:`addprocs(np::Integer) <addprocs>` are called
- ``SSHManager``, used when :func:`addprocs(hostnames::Array) <addprocs>` is called with a list of hostnames
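
For example, a minimal sketch of using the in-built managers (the SSHManager case
assumes password-less ssh access to the named hosts)::

    addprocs(4)                      # LocalManager: 4 additional workers on the local host
    addprocs(["host1", "host2"])     # SSHManager: one worker on each listed host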

:class:`LocalManager` is used to launch additional workers on the same host, thereby leveraging multi-core
and multi-processor hardware.

Thus, a minimal cluster manager would need to:

- be a subtype of the abstract :class:`ClusterManager`
- implement :func:`launch`, a method responsible for launching new workers
- implement :func:`manage`, which is called at various events during a worker's lifetime

:func:`addprocs(manager::FooManager) <addprocs>` requires ``FooManager`` to implement::

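A minimal sketch, assuming a hypothetical ``FooManager`` modeled on the in-built
``LocalManager`` (the ``np`` field and the use of ``params[:exename]`` are
illustrative)::

    immutable FooManager <: ClusterManager
        np::Integer
    end

    function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
        exename = params[:exename]    # path to the julia executable
        for i in 1:manager.np
            io, pobj = open(detach(`$exename --worker`), "r")
            wconfig = WorkerConfig()
            wconfig.io = io           # master reads the worker's address from here
            push!(launched, wconfig)
            notify(c)                 # allow addprocs to process workers as they start
        end
    end

    # no-op event handler; see the description of ``manage`` below
    manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) = nothing
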
@@ -730,8 +755,8 @@ argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to enable other workers
to connect to it at the specified ``bind_addr`` and ``port``. Useful for multi-homed hosts.


For every worker launched, the :func:`launch` method must add a :class:`WorkerConfig`
object (with appropriate fields initialized) to ``launched`` ::

type WorkerConfig
# Common fields relevant to all cluster managers
@@ -753,6 +778,8 @@
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}

connect_at::Nullable{Any}

.....
end

@@ -778,7 +805,6 @@ required to connect to the workers from the master process.
``userdata`` is provided for custom cluster managers to store their own worker-specific information.



:func:`manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) <manage>` is called at different
times during the worker's lifetime with different ``op`` values:

@@ -789,6 +815,59 @@
- with ``:interrupt`` when ``interrupt(workers)`` is called; the :class:`ClusterManager` should signal the appropriate worker with an
  interrupt signal.
- with ``:finalize`` for cleanup purposes.
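
A sketch of how a manager might react to these events (hypothetical; the
``process`` field is assumed to hold a local ``Process`` object, as with
``LocalManager``)::

    function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
        if op == :interrupt
            kill(get(config.process), 2)    # deliver SIGINT to the worker's process
        elseif op == :finalize
            # release any manager-specific resources held for this worker
        end
    end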


Cluster Managers with custom transports
---------------------------------------

Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved.
Each Julia process has as many communication tasks as the workers it is connected to. For example, consider a Julia cluster of
32 processes in an all-to-all mesh network:

- Each Julia process thus has 31 communication tasks
- Each task handles all incoming messages from a single remote worker in a message processing loop
- The message processing loop waits on an ``AsyncStream`` object - for example, a TCP socket in the default implementation - reads an entire
  message, processes it and waits for the next one
- Sending messages to a process is done directly from any Julia task - not just communication tasks - again, via the appropriate
  ``AsyncStream`` object
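
Conceptually, each such task runs a loop of the following shape (a simplified
sketch of what Base does internally, not the actual implementation;
``handle_msg`` is a stand-in for the real message dispatch)::

    function message_loop(rd::AsyncStream)
        while !eof(rd)              # connection closed => loop ends, worker is cleaned up
            msg = deserialize(rd)   # block until one complete message is read
            handle_msg(msg)         # dispatch the message
        end
    end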

Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate
``AsyncStream`` objects that the message processing loops can wait on. The manager-specific callbacks to be implemented are::

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

The default TCP/IP transport is implemented as ``connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)``.

``connect`` should return a pair of ``AsyncStream`` objects, one for reading data sent from worker ``pid``,
and the other to write data that needs to be sent to worker ``pid``. Custom cluster managers can use an in-memory ``BufferStream``
as the plumbing to proxy data between the custom, possibly non-``AsyncStream`` transport and Julia's in-built parallel infrastructure.

A ``BufferStream`` is an in-memory ``IOBuffer`` which behaves like an ``AsyncStream``.
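
A sketch of such a proxying ``connect`` (hypothetical; ``transport_send`` and
``transport_recv`` stand in for a real transport such as 0MQ)::

    function connect(manager::FooManager, pid::Int, config::WorkerConfig)
        read_stream = BufferStream()     # data arriving from worker pid
        write_stream = BufferStream()    # data to be sent to worker pid

        @schedule while isopen(read_stream)
            # pump bytes received on the real transport into the read side
            write(read_stream, transport_recv(manager, pid))
        end

        @schedule while isopen(write_stream)
            # drain bytes written by Julia and push them onto the real transport
            transport_send(manager, pid, readavailable(write_stream))
        end

        (read_stream, write_stream)
    end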

Folder ``examples/clustermanager/0mq`` contains an example that uses ZeroMQ to connect Julia workers in a star network with a 0MQ broker in the middle.
Note: The Julia processes are still all *logically* connected to each other - any worker can message any other worker directly, without any
awareness of 0MQ being used as the transport layer.

When using custom transports (see the worker-side sketch after this list):

- Julia workers must be started with the arguments ``--worker custom``. Just ``--worker`` will result in the newly launched
  workers defaulting to the TCP/IP socket transport implementation
- For every logical connection with a worker, :func:`process_messages(rd::AsyncStream, wr::AsyncStream)` must be called.
  This launches a new task that handles the reading and writing of messages from/to the worker represented by the ``AsyncStream`` objects
- :func:`init_worker(manager::FooManager)` must be called as part of worker process initialization
- Field ``connect_at::Any`` in :class:`WorkerConfig` can be set by the cluster manager when ``launch`` is called. The value of
  this field is passed in all ``connect`` callbacks. Typically, it carries information on *how to connect* to a worker. For example,
  the TCP/IP socket transport uses this field to specify the ``(host, port)`` tuple at which to connect to a worker
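
A sketch of the worker-side setup (hypothetical; ``accept_logical_connection``
stands in for however the transport learns of a new inbound peer)::

    function start_custom_worker(manager::FooManager)
        init_worker(manager)            # once, at worker startup
        while true
            (rd, wr) = accept_logical_connection(manager)   # transport-specific
            process_messages(rd, wr)    # one message-loop task per connection
        end
    end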


``kill(manager, pid, config)`` is called to remove a worker from the cluster.
On the master process, the corresponding ``AsyncStream`` objects must be closed by the implementation to ensure proper cleanup. The default
implementation simply executes an ``exit()`` call on the specified remote worker.
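
A sketch of a custom ``kill`` (hypothetical; how the stream pair returned by
``connect`` is tracked - here a ``streams`` Dict inside the manager - is
manager-specific)::

    function kill(manager::FooManager, pid::Int, config::WorkerConfig)
        remote_do(pid, exit)             # ask the worker to exit over the existing channel
        (rd, wr) = manager.streams[pid]  # hypothetical bookkeeping of stream pairs
        close(rd); close(wr)             # closing triggers cleanup on the master
        nothing
    end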

``examples/clustermanager/simple`` is an example that shows a simple implementation using UNIX domain sockets for cluster setup.




.. rubric:: Footnotes

.. [#mpi2rma] In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see http://www.mpi-forum.org/docs.
2 changes: 1 addition & 1 deletion doc/stdlib/parallel.rst
@@ -101,7 +101,7 @@ General Parallel Computing Support
.. function:: addprocs(n::Integer; exeflags=``) -> List of process identifiers

Launches workers using the in-built ``LocalManager`` which only launches workers on the local host.
This can be used to take advantage of multiple cores. ``addprocs(4)`` will add 4 processes on the local machine.

.. function:: addprocs() -> List of process identifiers

Expand Down
27 changes: 27 additions & 0 deletions examples/clustermanager/0mq/README
@@ -0,0 +1,27 @@
This is a proof-of-concept that uses ZeroMQ as transport.
It uses a star topology as opposed to the native mesh network.

Package ZMQ must be installed. All workers run on localhost only.

All Julia nodes only connect to a "broker" process that listens on known ports
8100 and 8101 via ZMQ sockets.


All commands must be run from the `examples/clustermanager/0mq` directory.

First, start the broker. In a new console type:
julia broker.jl

This does not return.

Next, start a Julia REPL and type:
include("ZMQCM.jl")
ZMQCM.start_master(4) # start with four workers


Alternatively, head.jl, a test script, can be run. It launches the requested number of workers,
executes a simple command on all of them, and exits.
julia head.jl 4

NOTE: As stated, this is a proof-of-concept. A real Julia cluster using ZMQ would probably use
different ZMQ socket types and optimize the transport.