Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#61 from Yancey1989/refine_cluster_tra…
Browse files Browse the repository at this point in the history
…in_quickstart

refine cluster train quickstart
  • Loading branch information
shanyi15 authored Sep 18, 2018
2 parents d16c5ca + 16f4917 commit bb5cda6
Showing 1 changed file with 147 additions and 96 deletions.
243 changes: 147 additions & 96 deletions doc/fluid/user_guides/howto/training/cluster_quick_start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,110 +9,170 @@
在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle
的分布式训练任务,在开始之前,请按如下步骤做些准备工作:

1. 准备一个至少4个节点的集群,并且保证网络可以联通,在本文中我们使用
``*.paddlepaddle.com`` 来表示每个节点的主机名称,您可以根据集群的实际情况来修改它
1. 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 ``*.paddlepaddle.com``
来表示节点的主机名称,您可以根据实际情况修改它

2. 在开始之前确保已经阅读过 :ref:`how_to_install`
2. 在开始之前确保已经阅读过 :ref:`install_steps`
并且可以在集群的所有节点上可以正常运行 PaddlePaddle。

启动集群训练任务
----------------

在启动集群训练脚本时,需要在不同的节点上指定不同的环境变量,具体如下:

+-----------------+-----------------+-----------------+---------------------+
| 环境变量 | 数据类型 | 样例 | 描述 |
+=================+=================+=================+=====================+
| PADDLE_TRAINING | str | PSERVER,TRAINER | 训练节点的角色 |
| _ROLE | | | |
+-----------------+-----------------+-----------------+---------------------+
| PADDLE_PSERVER_ | str | ps0.paddlepaddl | 所有 pserver |
| IPS | | e.com,ps1.paddl | 节点的 IP |
| | | epaddle.com… | 地址或 |
| | | | hostname, |
| | | | 用“,”分隔 |
+-----------------+-----------------+-----------------+---------------------+
| PADDLE_PSERVER_ | int | 6174 | pserver |
| PORT | | | 节点监听的端口 |
+-----------------+-----------------+-----------------+---------------------+
| PADDLE_TRAINERS | int | 2 | 训练任务中 |
| | | | trainer |
| | | | 节点的数量 |
+-----------------+-----------------+-----------------+---------------------+
| PADDLE_CURRENT_ | str | ps0.paddlepaddl | 当前 pserver |
| IP | | e.com | 节点的 IP |
| | | | 地址或 hostanme |
+-----------------+-----------------+-----------------+---------------------+
| PADDLE_TRAINER_ | int | 0 | 当前 trainer |
| ID | | | 节点的唯一 ID, |
| | | | 取值范围为从0开始到 |
| | | | PADDLE_TRAINERS-1 |
+-----------------+-----------------+-----------------+---------------------+

样例代码
~~~~~~~~
-------

将下面程序代码保存为 ``fluid_dist.py``
下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 pserver server 节点以及
2个 trainer 节点的分布式训练任务,您可以将本段代码保存为 ``dist_train.py``

.. code:: python
import paddle
import paddle.fluid as fluid
import contextlib
import numpy
import unittest
# train reader
BATCH_SIZE = 20
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.test(), buf_size=500),
batch_size=BATCH_SIZE)
import os
import paddle
import paddle.fluid as fluid
# train reader
BATCH_SIZE = 20
EPOCH_NUM = 30
BATCH_SIZE = 8
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=BATCH_SIZE)
def train():
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
loss = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_loss = fluid.layers.mean(loss)
opt = fluid.optimizer.SGD(learning_rate=0.001)
opt.minimize(avg_loss)
place = fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe = fluid.Executor(place)
# fetch distributed training environment setting
training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
trainers = int(os.getenv("PADDLE_TRAINERS"))
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id = trainer_id,
pservers = pserver_endpoints,
trainers = trainers)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
exe.run(startup_prog)
exe.run(pserver_prog)
elif training_role == "TRAINER":
trainer_prog = t.get_trainer_program()
exe.run(fluid.default_startup_program())
for epoch in range(EPOCH_NUM):
for batch_id, batch_data in enumerate(train_reader()):
avg_loss_value, = exe.run(trainer_prog,
feed=feeder.feed(batch_data),
fetch_list=[avg_loss])
if (batch_id + 1) % 10 == 0:
print("Epoch: {0}, Batch: {1}, loss: {2}".format(
epoch, batch_id, avg_loss_value[0]))
# destory the resource of current trainer node in pserver server node
exe.close()
else:
raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
train()
环境变量说明
-----------

在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:

.. list-table::
:header-rows: 1

* - 环境变量
- 数据类型
- 样例
- 描述
* - :code:`PADDLE_TRAINING_ROLE`
- str
- :code:`PSERVER,TRAINER`
- 当前训练节点角色
* - :code:`PADDLE_PSERVER_IPS`
- str
- :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
- 分布式训练任务中所有 pserver 节点的 IP 地址或 hostname, 使用","分隔
* - :code:`PADDLE_PSERVER_PORT`
- int
- 6174
- pserver 进程监听的端口
* - :code:`PADDLE_TRAINERS`
- int
- 2
- 分布式训练任务中 trainer 节点的数量
* - :code:`PADDLE_CURRENT_IP`
- str
- :code:`ps0.paddlepaddle.com`
- 当前 pserver 节点的 IP 地址或 hostname
* - :code:`PADDLE_TRAINER_ID`
- str
- 0
- 当前 trainer 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)

注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。

分布式训练相关 API
------------------

DistributeTranspiler
~~~~~~~~~~~~~~~~~~~~~~

基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(pserver) 以及 trainer,
在 Fluid 中,用户只需配置单机训练所需要的网络配置, ``DistributeTranspiler`` 模块会自动地根据
当前训练节点的角色将用户配置的单机网路配置改写成 pserver 和 trainer 需要运行的网络配置:

def train_program():
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
loss = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_loss = fluid.layers.mean(loss)
return avg_loss
.. code:: python
def optimizer_func():
return fluid.optimizer.SGD(learning_rate=0.001)
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id = trainer_id,
pservers = pserver_endpoints,
trainers = trainers)
if PADDLE_TRAINING_ROLE == "TRAINER":
# fetch the pserver program and execute it
trainer_prog = t.get_trainer_program()
...
def train(use_cuda, train_program):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
elif PADDLE_TRAINER_ROLE == "PSERVER":
# fetch the trainer program and execute it
pserver_prog = t.get_pserver_program(current_endpoint)
...
trainer = fluid.Trainer(
train_func=train_program, place=place, optimizer_func=optimizer_func)
exe.close()
~~~~~~~~~~~~~~

def event_handler(event):
if isinstance(event, fluid.EndStepEvent):
if event.step == 10:
test_metrics = trainer.test(
reader=test_reader, feed_order=['x', 'y'])
print("step {0}, loss: {1}".format(event.step, test_metrics))
trainer.stop()
pserver 节点中会保存所有 trainer 节点的状态信息,在 trainer结束训练时需要调用 ``exe.close()``
通知所有 PServer 节点释放当前 Trainer 节点的资源:

trainer.train(
reader=train_reader,
num_epochs=100,
event_handler=event_handler,
feed_order=['x', 'y'])
.. code:: python
train(False, train_program)
exe = fluid.Executor(fluid.CPUPlace())
# training process ...
exe.close() # notify PServer to destory the resource
启动trainer节点和pserver节点
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
启动分布式训练任务
--------------------

.. list-table::
:header-rows: 1
Expand All @@ -132,12 +192,3 @@
* - trainer1.paddlepaddle.com
- :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
- 启动第1号 trainer 节点

**注意**

- 需要先启动pserver节点再启动trainer节点
- 看到trainer节点输出如下日志表示训练任务执行正确

.. code:: bash
step 10, loss: [258.2326202392578]

0 comments on commit bb5cda6

Please sign in to comment.