The imperative programming paradigm #7464

Closed · wangkuiyi opened this issue Jan 11, 2018 · 18 comments

@wangkuiyi (Collaborator) commented Jan 11, 2018

r = fluid.data.mnist.train()

W = fluid.Tensor()
b = fluid.Tensor()

with fluid.While(iters=100):
    mb = r.next()

    x = fluid.layer.data(mb.image)
    l = fluid.layer.data(mb.label)
    y = fluid.layer.fc(x, W, b)
    cost = fluid.layer.mse(y, l)

    fluid.optimize(cost)
    
fluid.print(W, b)

@wangkuiyi (Collaborator, Author) commented Jan 11, 2018

r = fluid.data.mnist.train()

W = fluid.Tensor()
b = fluid.Tensor()

with fluid.While(iters=100):
    mb = r.next()

    x = fluid.layer.data(mb.image)
    l = fluid.layer.data(mb.label)
    y = fluid.layer.fc(x, W, b)
    cost = fluid.layer.mse(y, l)

    fluid.optimize(cost)
    
fluid.save(W, b, "model.json")
W, b = fluid.load("model.json", "W", "b")

mb = r.next()[1:2]
x = fluid.layer.data(mb)
y = fluid.layer.fc(x, W, b)
fluid.print(y)

@wangkuiyi (Collaborator, Author)

We can use atexit to register a launcher.

For example, we could have a launcher which saves the ProgramDesc into a file, or another launcher that calls the C++ Executor class to run the ProgramDesc, or yet another launcher that calls a transpiler (or a pipeline of transpilers) to convert the ProgramDesc into C++ programs and build them into a binary executable.
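
A minimal sketch of the atexit idea follows. The launcher body is just a stub (a real launcher would serialize the ProgramDesc, call the C++ Executor, or run a transpiler pipeline); the point here is only the registration mechanism, which is Python's standard atexit module.

import atexit

def launcher():
    # Hypothetical stub: a real launcher would pick up the ProgramDesc that the
    # user's script has accumulated and save it, execute it with the C++
    # Executor, or transpile it into a C++ binary.
    print("script finished; launch the accumulated ProgramDesc here")

# Registered once by the framework; atexit calls it automatically when the
# user's Python script exits, i.e. after the whole Program has been described.
atexit.register(launcher)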

@wangkuiyi (Collaborator, Author)

Do we still need the data layer, or should we just call fluid.layer.fc(input=mb.image, ...)?

@reyoung (Collaborator) commented Jan 12, 2018

[figure: concurrent Programs/Processes connected by Channels]

To be fully parallel, we might use goroutines and channels.

BTW, I suggest we use Process instead of Goroutine, since

  1. Process is a more general concept than goroutine. Sharing data between processes directly is forbidden; the only way to share data between processes is through channels. (We could also rename channel to pipe or mailbox to adopt operating-system concepts.)

  2. We are not using golang, and our users may not be familiar with golang. However, they should be familiar with operating-system concepts.

Some basic thoughts about parallel programs

Here are some basic thoughts on the imperative programming paradigm in a parallel setting.

  1. Every Process runs a Program, and the Program runs once all of its inputs are ready. In other words, a Process is a Program at runtime. (This is a very common concept in operating systems.)

  2. Communication between Programs can only go through a channel or named pipe. As the figure shows, a Channel is not part of any Program, so a Channel should not be a Variable; it should be an independent type.

  3. We should create a new compile-time representation to manage multiple/concurrent Programs and Channels. I am not sure what to name this representation. Maybe ConcurrentProgram or ProgramGroup?

There are some fundamental questions.

Q1: Should we use Channel and Process to implement CSP? A similar approach to CSP is the actor model, which Erlang implements.

Here is a brief introduction to the Actor Model (a minimal sketch follows the list):

  • In the actor model, there is only one first-order concept, Process, rather than the Process and Channel of CSP.
  • Starting a Process returns a PID. Every process can send a message to another process via its PID.
  • Every Process has one mailbox (a concept similar to a Channel). A process reads messages from other processes through its mailbox. Each message contains the sender's PID and the message payload.
  • The only way to share data is to send messages. This can be implemented with a queue locally and with TCP remotely.
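
A minimal sketch of the mailbox idea, using only Python's standard library; the names spawn, send, and echo are illustrative, and threads with queues stand in for real Processes and mailboxes.

import queue
import threading

mailboxes = {}                                   # PID -> mailbox

def spawn(behavior):
    # Start a "process" running the given behavior and return its PID.
    pid = len(mailboxes)
    mailboxes[pid] = queue.Queue()
    threading.Thread(target=behavior, args=(pid,), daemon=True).start()
    return pid

def send(to_pid, sender_pid, payload):
    # Every message carries the sender's PID and a payload.
    mailboxes[to_pid].put((sender_pid, payload))

def echo(my_pid):
    sender, payload = mailboxes[my_pid].get()    # read from my own mailbox
    send(sender, my_pid, payload)                # reply via the sender's PID

if __name__ == '__main__':
    main_pid = 0
    mailboxes[main_pid] = queue.Queue()          # give the main thread a mailbox too
    worker = spawn(echo)
    send(worker, main_pid, "hello")
    print(mailboxes[main_pid].get())             # -> (worker PID, 'hello')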

I think the actor model may be better because

  • it is simpler than CSP. The only first-order concept in the actor model is Process.
  • the actor model is more natural for a cluster. A channel instance cannot be split across two nodes directly; we must add Send/Recv operators to the channel and split one channel into two channels, one per node. And P2P communication between every pair of nodes would mean an explosion of channels. The mailboxes of the actor model, by contrast, distribute naturally across multiple nodes: sending and receiving a message can go over the network.

Q2: Should we implement coroutines or fibers for Process? Should we support yielding (i.e., releasing the CPU to another Process explicitly)? See the sketch after this list.

  • If we use threads to implement Process, we get an explosion of threads no matter whether we use CSP or the actor model, and the operating system will be busy switching between them.
  • Since the ONLY way to communicate between Processes is messaging (whether through a channel or a mailbox), a process that tries to read a message that is not yet ready can simply release the CPU. This can be implemented with coroutines; we can dig into how golang and erlang implement them.

This can be implemented with threads temporarily, but at least we should agree on a single long-term goal for everyone.
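
A minimal sketch of that point, using Python's asyncio purely as an illustration: a coroutine that reads from an empty "channel" suspends itself and releases the CPU to other coroutines instead of blocking an OS thread.

import asyncio

async def producer(chan):
    for i in range(3):
        await chan.put("message-%d" % i)   # send into the channel
        await asyncio.sleep(0.1)           # pretend to do other work

async def consumer(chan):
    for _ in range(3):
        msg = await chan.get()             # suspends until a message is ready
        print("got", msg)

async def main():
    chan = asyncio.Queue()                 # plays the role of a Channel / mailbox
    await asyncio.gather(producer(chan), consumer(chan))

asyncio.run(main())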

Q3: It is hard for users to write CSP or Actor Model programs directly. Should we provide a transpiler for this?

I think it is necessary to provide a transpiler that targets CSP or the Actor Model. The whole idea of CSP or the Actor Model for deep learning is very exciting. However, it is very hard to implement CSP and such a transpiler directly, so we should divide the work into several sub-tasks. Perhaps we will have some temporary implementations, but we should have a long-term goal and a detailed plan to get this splendid job done.

@wangkuiyi (Collaborator, Author)

We might be able to use Python multiprocessing to communicate between a Python process and a Fluid process. -- from @emailweixu
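
A minimal sketch of that idea with the standard multiprocessing module; the worker function here is a stand-in that just scales an array, where a real setup would run a Fluid Executor inside the child process.

import multiprocessing as mp
import numpy

def fluid_worker(in_q, out_q):
    # Stand-in for a process running a Fluid Executor: it receives parameters,
    # "computes" on them, and sends the result back through a queue.
    while True:
        params = in_q.get()
        if params is None:              # sentinel telling the worker to stop
            break
        out_q.put(params * 0.9)

if __name__ == '__main__':
    in_q, out_q = mp.Queue(), mp.Queue()
    p = mp.Process(target=fluid_worker, args=(in_q, out_q))
    p.start()
    in_q.put(numpy.ones(4))
    print(out_q.get())                  # [0.9 0.9 0.9 0.9]
    in_q.put(None)
    p.join()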

@typhoonzero (Contributor) commented Jan 15, 2018

I took a glance at http://ray.readthedocs.io/en/latest/tutorial.html. For the distributed training Fluid API, we may have two main designs:

  1. Using channels
grad_param_mapping = dict()

def def_trainer():
    r = fluid.data.mnist.train()
    W = fluid.Tensor()
    b = fluid.Tensor()
    with fluid.While(iters=100):
        mb = r.next()

        x = fluid.layer.data(mb.image)
        l = fluid.layer.data(mb.label)
        y = fluid.layer.fc(x, W, b)
        cost = fluid.layer.mse(y, l)
        grads, params = fluid.bp(cost)
        for idx, grad in enumerate(grads):
            grad_param_mapping[grad.name] = params[idx]
        fluid.send(grads, overwrite=params)

def def_pserver():
    with fluid.listen_and_serv() as server:
        with fluid.While():
            grad = server.recv_chan.get()
            param = grad_param_mapping.get(grad.name)
            fluid.optimize(param, grad)
            server.return_chan.push(param)
  2. Using the actor model (https://github.com/ray-project/ray/blob/master/examples/parameter_server/async_parameter_server.py)
@fluid.remote
class Pserver:
    def __init__(self, name_map):
        self.grad_param_mapping = name_map
    def send(self, grad):
        param = self.grad_param_mapping.get(grad.name)
        fluid.optimize(param, grad)
    def get(self, varname):
        return self.grad_param_mapping.get(varname)

@fluid.remote
def trainer():
    ...

@QiJune (Member) commented Jan 15, 2018

A simple demo of integrating fluid and ray.

import ray
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import numpy

ray.init()

NUM_WORKERS = 2
BATCH_SIZE = 128


def mnist_network():
    img = fluid.layers.data(name='img', shape=[784])
    hidden = fluid.layers.fc(img, size=100, act='tanh',
                             param_attr='fc.w',
                             bias_attr='fc.b')
    prediction = fluid.layers.fc(hidden, size=10, act='softmax',
                                 param_attr='sftmax.w',
                                 bias_attr='sftmax.b')
    label = fluid.layers.data(name='label', shape=[1],
                              dtype='int64')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)


    avg_loss = fluid.layers.mean(x=loss)
    fluid.backward.append_backward(avg_loss)
    return img, label, avg_loss


@ray.remote
class Worker(object):
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.scope = fluid.core.Scope()
        self.program = fluid.Program()
        self.startup = fluid.Program()
        with fluid.program_guard(self.program, self.startup):
            img, label, self.loss = mnist_network()

        self.place = fluid.CPUPlace()
        self.executor = fluid.Executor(self.place)
        self.reader_creator = paddle.batch(
            paddle.reader.shuffle(
                paddle.dataset.mnist.train(), buf_size=8192),
            batch_size=BATCH_SIZE)

        self.reader = self.reader_creator()
        self.feeder = fluid.DataFeeder(feed_list=[img, label], place=self.place)


    def compute_gradient(self, weights):
        for var_name in weights:
            tensor = self.scope.var(var_name).get_tensor()
            tensor.set(weights[var_name][0], self.place)
            tensor.set_lod(weights[var_name][1])

        try:
            data = next(self.reader)
        except StopIteration:
            # The pass over the data is finished; restart the reader.
            self.reader = self.reader_creator()
            data = next(self.reader)

        outs = self.executor.run(
            self.program,
            feed=self.feeder.feed(data),
            scope=self.scope,
            fetch_list=[var_name + "@GRAD" for var_name in weights] + [self.loss])
        if self.worker_id == 0:
            print outs[-1]
        return outs[:-1]


@ray.remote
class PServer(object):
    def __init__(self, learning_rate):
        self.scope = fluid.core.Scope()
        self.learning_rate = learning_rate
        self.program = fluid.Program()
        self.startup = fluid.Program()
        with fluid.program_guard(self.program, self.startup):
            mnist_network()

        self.place = fluid.CPUPlace()
        self.executor = fluid.Executor(self.place)
        self.executor.run(self.startup, scope=self.scope)
        self.optimize_program = fluid.Program()

    def apply_gradients(self, *gradients):
        # TODO(qijun) an optimization program is needed
        mean_gradients = numpy.mean(gradients, axis=0)
        weights = self.get_weight()
        for idx, name in enumerate(weights):
            w = weights[name][0]
            w -= self.learning_rate * mean_gradients[idx]
            self.scope.find_var(name).get_tensor().set(w, self.place)

    def get_weight(self):
        weights = dict()
        for p in self.program.global_block().iter_parameters():
            lod_tensor = self.scope.find_var(p.name).get_tensor()
            weights[p.name] = (numpy.copy(numpy.array(lod_tensor)), lod_tensor.lod())
        return weights


if __name__ == '__main__':
    ps = PServer.remote(1e-3 * NUM_WORKERS)
    weights = ps.get_weight.remote()

    works = [Worker.remote(i) for i in range(NUM_WORKERS)]

    while True:
        gradients = [work.compute_gradient.remote(weights) for work in works]
        ps.apply_gradients.remote(*gradients)
        weights = ps.get_weight.remote()

@typhoonzero (Contributor)

@QiJune in the above code, ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

And yes, using Ray may be a simple way to implement distributed training, but I suspect it uses more memory, because Ray stores objects in a Redis instance: all the weights on the pserver have to be stored both in Fluid and in Redis so that workers can fetch them. I will look into Ray's implementation to confirm whether that is true.

@reyoung (Collaborator) commented Jan 15, 2018

ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

No.

gradients = [work.compute_gradient.remote(weights) for work in works]
ps.apply_gradients.remote(*gradients)

This example contains N workers and one pserver. In the main function, these lines wait for all workers to complete.

@typhoonzero (Contributor)

ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

Sorry, my fault. A class decorated with @ray.remote is a server instance; it should not be a member of the worker. Still, workers should update their local weights after ps.apply_gradients.remote(*gradients).

@chengduoZH (Contributor) commented Jan 15, 2018

        try:
            data = next(self.reader)
        except StopIteration:
            self.reader = self.reader_creator()
            data = next(self.reader)

Does this mean that every worker uses its own data reader, @QiJune?
In my view, Worker.compute_gradient's parameters should be data and weights.

@chengduoZH reopened this Jan 15, 2018

@QiJune (Member) commented Jan 15, 2018

@chengduoZH This is just a simple demo following Ray's sync_parameter_server example. We could add several data-reader actors and a data-provider actor to serve many workers; see the sketch below.
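
A minimal sketch of one such data-reader actor, reusing the paddle.v2 reader calls from the demo above; the names DataReader and next_batch are illustrative, not an existing API.

import ray
import paddle.v2 as paddle

@ray.remote
class DataReader(object):
    """A shared reader actor: workers pull batches instead of owning readers."""

    def __init__(self, batch_size):
        self.creator = paddle.batch(
            paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=8192),
            batch_size=batch_size)
        self.reader = self.creator()

    def next_batch(self):
        try:
            return next(self.reader)
        except StopIteration:
            self.reader = self.creator()          # start a new pass over the data
            return next(self.reader)

if __name__ == '__main__':
    ray.init()
    reader = DataReader.remote(128)
    batch = ray.get(reader.next_batch.remote())   # each worker would do this per step
    print(len(batch))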

@QiJune (Member) commented Jan 15, 2018

@typhoonzero I have refined the code and now call the get_weight method after apply_gradients. The loss curve looks normal.

@typhoonzero (Contributor)

@QiJune Cool 👍!

@reyoung (Collaborator) commented Jan 15, 2018

Just to list some facts about Ray.

Ray might not meet our needs, for several reasons:

  1. Ray is strongly tied to Python and uses pickle to serialize objects.
  2. The scheduler is implemented inside Ray.
  • Ray can take over all CPUs and GPUs as a service, via ray start.
  • In users' scripts, ray.init connects to the cluster service started beforehand.
  • Actors are managed by Ray's scheduler, i.e., which node executes an actor is decided by the Ray framework.
  • We cannot use k8s to explicitly customize the scheduling without digging deeply into Ray.
  • Ray's scheduler is not mature yet (according to the issues and documentation, it may schedule two actors onto the same GPU even when two GPUs are available).
  3. Ray implements the Actor Model, but it provides an object-oriented API that hides the real message-passing APIs of the Actor Model. Compared with the standard actor model, Ray has some limitations.

Good things about Ray:

  1. It is a separate project focused on communication, so it can be integrated into any framework easily.
  2. Messages are only read when they are needed. If an actor just receives a message and routes it to another actor, the message will not be read twice.
  3. Ray implements the actor model, which is scalable and fault-tolerant by nature.
  4. Ray has a WebUI for debugging.

We may not use Ray directly, but we can learn a lot from this project. We could implement a separate project for communication that provides a C++ API to Fluid, and Fluid could provide operators or higher-level concepts to represent the Actor Model or CSP. That seems to be a practical way forward.

@wangkuiyi (Collaborator, Author)

I quickly went over Ray. It seems to be an implementation of the Actor model in Python? In my understanding, Ray is heavily integrated with Python, whereas Fluid is trying to move away from Python. Are their goals in conflict?

@Yancey1989 (Contributor)

From @typhoonzero:

for the distributed training Fluid API, we may have two main designs:
Using channels

def def_pserver():
    with fluid.listen_and_serv() as server:
        with fluid.While():
            grad = server.recv_chan.get()
            param = grad_param_mapping.get(grad.name)
            fluid.optimize(param, grad)
            server.return_chan.push(param)

In my opinion:

  1. The Recv Op is executed in the background, so maybe we need to create a Goroutine to run it and pass the channel in as an argument.
  2. A Channel needs a type.

So, maybe the program would look like:

def def_pserver():
    chan = fluid.Channel(type=sendrecv.message)
    with fluid.Go(chan):
        with fluid.listen_and_serv() as server:
            msg = server.get_msg()
            chan.push(msg)

    with fluid.While():
        grad = chan.get()
        param = grad_param_mapping.get(grad.name)
        fluid.optimize(param, grad)
        return_chan.push(param)

@shanyi15 (Collaborator)

Hello, this issue has not been updated in the past month, so we will close it today for the sake of other users' experience. If you still need to follow up after it is closed, feel free to reopen it and we will reply within 24 hours. We apologize for any inconvenience caused by the closure, and thank you for your support of PaddlePaddle!
