From 1ac5eec1947dae81726115243dc06e26e8ece0e5 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Mon, 17 Sep 2018 17:30:12 +0800
Subject: [PATCH 1/3] refine cluster train quickstart

---
 .../howto/training/cluster_quick_start.rst | 242 +++++++++++-------
 1 file changed, 146 insertions(+), 96 deletions(-)

diff --git a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
index 6131c92d6f5386..4ca78f3b61ba29 100644
--- a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
+++ b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
@@ -9,110 +9,169 @@
 In this article we introduce how to quickly launch a PaddlePaddle distributed training job on a cluster. Before you start, please complete the following preparation steps:

-1. Prepare a cluster of at least 4 nodes and make sure the nodes can reach each other over the network. In this article we use
-   ``*.paddlepaddle.com`` to represent the hostname of each node; you can change it according to your actual cluster.
+1. Prepare a training cluster whose nodes can reach each other over the network. In this article we use 4 training nodes and use ``*.paddlepaddle.com``
+   to represent their hostnames; you can change them according to your actual environment.

-2. Before starting, make sure you have read :ref:`how_to_install`
+2. Before starting, make sure you have read :ref:`install_steps`
   and that PaddlePaddle runs normally on all nodes of the cluster.

-Launch the cluster training job
--------------------------------
-
-When launching the cluster training script, different environment variables need to be set on different nodes, as follows:
-
-+----------------------+-----------+-----------------------+------------------------------------------+
-| Environment variable | Data type | Example               | Description                              |
-+======================+===========+=======================+==========================================+
-| PADDLE_TRAINING_ROLE | str       | PSERVER,TRAINER       | role of the training node                |
-+----------------------+-----------+-----------------------+------------------------------------------+
-| PADDLE_PSERVER_IPS   | str       | ps0.paddlepaddle.com, | IP addresses or hostnames of all pserver |
-|                      |           | ps1.paddlepaddle.com… | nodes, separated by ","                  |
-+----------------------+-----------+-----------------------+------------------------------------------+
-| PADDLE_PSERVER_PORT  | int       | 6174                  | port the pserver nodes listen on         |
-+----------------------+-----------+-----------------------+------------------------------------------+
-| PADDLE_TRAINERS      | int       | 2                     | number of trainer nodes in the job       |
-+----------------------+-----------+-----------------------+------------------------------------------+
-| PADDLE_CURRENT_IP    | str       | ps0.paddlepaddle.com  | IP address or hostname of the current    |
-|                      |           |                       | pserver node                             |
-+----------------------+-----------+-----------------------+------------------------------------------+
-| PADDLE_TRAINER_ID    | int       | 0                     | unique ID of the current trainer node,   |
-|                      |           |                       | ranging from 0 to PADDLE_TRAINERS-1      |
-+----------------------+-----------+-----------------------+------------------------------------------+
-
 Sample code
-~~~~~~~~~~~
+-----------

-Save the following program code as ``fluid_dist.py``
+The following uses a very simple linear regression model as an example to explain how to launch a distributed training job with 2 pserver nodes
+and 2 trainer nodes; you can save this code as ``dist_train.py``

 .. code:: python

-    import paddle
-    import paddle.fluid as fluid
-    import contextlib
-    import numpy
-    import unittest
-
-    # train reader
-    BATCH_SIZE = 20
-
-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.uci_housing.train(), buf_size=500),
-        batch_size=BATCH_SIZE)
-
-    test_reader = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.uci_housing.test(), buf_size=500),
-        batch_size=BATCH_SIZE)
+    import os
+    import paddle
+    import paddle.fluid as fluid
+
+    # train reader
+    EPOCH_NUM = 30
+    BATCH_SIZE = 8
+
+    train_reader = paddle.batch(
+        paddle.reader.shuffle(
+            paddle.dataset.uci_housing.train(), buf_size=500),
+        batch_size=BATCH_SIZE)
+
+    def train():
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
+        y_predict = fluid.layers.fc(input=x, size=1, act=None)
+
+        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_loss = fluid.layers.mean(loss)
+        opt = fluid.optimizer.SGD(learning_rate=0.001)
+        opt.minimize(avg_loss)
+
+        place = fluid.CPUPlace()
+        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
+        exe = fluid.Executor(place)
+
+        # fetch distributed training environment setting
+        training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
+        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
+        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
+        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+        eplist = []
+        for ip in pserver_ips.split(","):
+            eplist.append(':'.join([ip, port]))
+        pserver_endpoints = ",".join(eplist)
+        trainers = int(os.getenv("PADDLE_TRAINERS"))
+        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
+
+        t = fluid.DistributeTranspiler()
+        t.transpile(
+            trainer_id = trainer_id,
+            pservers = pserver_endpoints,
+            trainers = trainers)
+
+        if training_role == "PSERVER":
+            pserver_prog = t.get_pserver_program(current_endpoint)
+            startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+            exe.run(startup_prog)
+            exe.run(pserver_prog)
+        elif training_role == "TRAINER":
+            trainer_prog = t.get_trainer_program()
+            exe.run(fluid.default_startup_program())
+
+            for epoch in range(EPOCH_NUM):
+                for batch_id, batch_data in enumerate(train_reader()):
+                    avg_loss_value, = exe.run(trainer_prog,
+                                              feed=feeder.feed(batch_data),
+                                              fetch_list=[avg_loss])
+                    if (batch_id + 1) % 10 == 0:
+                        print("Epoch: {0}, Batch: {1}, loss: {2}".format(
+                            epoch, batch_id, avg_loss_value[0]))
+            # destroy the resources held for this trainer on the pserver nodes
+            exe.close()
+        else:
+            raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
+
+    train()
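+
+For a quick single-machine smoke test, the same script can be started as two local processes
+with the environment variables (explained in the next section) set per process. The helper
+below is only an illustrative sketch: the launcher file name, the host ``127.0.0.1``, the
+port 6174, and the assumption that ``dist_train.py`` sits in the current directory are
+choices you may need to adapt to your own setup.
+
+.. code:: python
+
+    # launch_local_test.py: a hypothetical helper, not part of PaddlePaddle itself
+    import os
+    import subprocess
+
+    def launch(role, trainer_id="0"):
+        env = dict(os.environ)
+        env.update({
+            "PADDLE_TRAINING_ROLE": role,
+            "PADDLE_PSERVER_IPS": "127.0.0.1",   # a single pserver on localhost
+            "PADDLE_PSERVER_PORT": "6174",
+            "PADDLE_TRAINERS": "1",
+            "PADDLE_CURRENT_IP": "127.0.0.1",
+            "PADDLE_TRAINER_ID": trainer_id,
+        })
+        # run dist_train.py with the per-role environment
+        return subprocess.Popen(["python", "dist_train.py"], env=env)
+
+    ps = launch("PSERVER")   # start the pserver first
+    tr = launch("TRAINER")   # then start the trainer
+    tr.wait()                # wait for the trainer to finish training
+    ps.terminate()           # the pserver loop usually has to be stopped explicitly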
+
+Environment variables
+---------------------
+
+When launching a distributed training job, different environment variables are used to describe the role of each node, as follows:
+
+.. list-table::
+   :header-rows: 1
+
+   * - Environment variable
+     - Data type
+     - Example
+     - Description
+   * - :code:`PADDLE_TRAINING_ROLE`
+     - str
+     - :code:`PSERVER,TRAINER`
+     - role of the current training node
+   * - :code:`PADDLE_PSERVER_IPS`
+     - str
+     - :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
+     - IP addresses or hostnames of all pserver nodes in the distributed training job, separated by ","
+   * - :code:`PADDLE_PSERVER_PORT`
+     - int
+     - 6174
+     - port the pserver processes listen on
+   * - :code:`PADDLE_TRAINERS`
+     - int
+     - 2
+     - number of trainer nodes in the distributed training job
+   * - :code:`PADDLE_CURRENT_IP`
+     - str
+     - ps0.paddlepaddle.com
+     - IP address or hostname of the current pserver node
+   * - :code:`PADDLE_TRAINER_ID`
+     - int
+     - 0
+     - unique ID of the current trainer node, in the range [0, PADDLE_TRAINERS)
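+
+To make the variables above concrete, the short sketch below (using the hypothetical hostnames from the
+example column) shows how ``PADDLE_PSERVER_IPS`` and ``PADDLE_PSERVER_PORT`` are combined into the
+endpoint strings that the sample code passes to ``transpile()``:
+
+.. code:: python
+
+    import os
+
+    # hypothetical values; in a real job they are exported by whatever launches dist_train.py
+    os.environ.setdefault("PADDLE_PSERVER_IPS", "ps0.paddlepaddle.com,ps1.paddlepaddle.com")
+    os.environ.setdefault("PADDLE_PSERVER_PORT", "6174")
+    os.environ.setdefault("PADDLE_CURRENT_IP", "ps0.paddlepaddle.com")
+
+    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
+    ips = os.getenv("PADDLE_PSERVER_IPS", "")
+
+    # endpoint list shared by every node:
+    # "ps0.paddlepaddle.com:6174,ps1.paddlepaddle.com:6174"
+    pserver_endpoints = ",".join([ip + ":" + port for ip in ips.split(",")])
+
+    # endpoint of the pserver process that is currently starting up
+    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
+
+    print(pserver_endpoints)
+    print(current_endpoint)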
+
+
+Distributed training related APIs
+---------------------------------
+
+DistributeTranspiler
+~~~~~~~~~~~~~~~~~~~~~~
+
+A distributed training job based on the pserver-trainer architecture has two kinds of roles: Parameter Server (pserver) and trainer.
+In Fluid, users only write the network configuration needed for single-node training; the ``DistributeTranspiler`` module then automatically
+rewrites that single-node configuration, according to the role of the current training node, into the programs that the pserver and the trainer need to run:

-    def train_program():
-        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
-        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
-        y_predict = fluid.layers.fc(input=x, size=1, act=None)
-
-        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
-        avg_loss = fluid.layers.mean(loss)
-
-        return avg_loss
+.. code:: python

-    def optimizer_func():
-        return fluid.optimizer.SGD(learning_rate=0.001)
+    t = fluid.DistributeTranspiler()
+    t.transpile(
+        trainer_id = trainer_id,
+        pservers = pserver_endpoints,
+        trainers = trainers)
+    if PADDLE_TRAINING_ROLE == "TRAINER":
+        # fetch the trainer program and execute it
+        trainer_prog = t.get_trainer_program()
+        ...

-    def train(use_cuda, train_program):
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    elif PADDLE_TRAINING_ROLE == "PSERVER":
+        # fetch the pserver program and execute it
+        pserver_prog = t.get_pserver_program(current_endpoint)
+        ...

-        trainer = fluid.Trainer(
-            train_func=train_program, place=place, optimizer_func=optimizer_func)
+exe.close()
+~~~~~~~~~~~~~~

-        def event_handler(event):
-            if isinstance(event, fluid.EndStepEvent):
-                if event.step == 10:
-                    test_metrics = trainer.test(
-                        reader=test_reader, feed_order=['x', 'y'])
-                    print("step {0}, loss: {1}".format(event.step, test_metrics))
-                    trainer.stop()
+The pserver nodes keep state for every trainer node, so when a trainer finishes training it needs to call ``exe.close()``
+to notify all pserver nodes to release the resources of the current trainer node:

-        trainer.train(
-            reader=train_reader,
-            num_epochs=100,
-            event_handler=event_handler,
-            feed_order=['x', 'y'])
+.. code:: python

-    train(False, train_program)
+    exe = fluid.Executor(fluid.CPUPlace())
+    # training process ...
+    exe.close() # notify the pserver nodes to release the resources of this trainer
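+
+Because the pservers keep per-trainer state until ``exe.close()`` is called, it can be safer to
+guarantee that the call happens even if the training loop raises an exception. The snippet below
+is only a suggested sketch; it assumes the same names (``trainer_prog``, ``feeder``, ``train_reader``,
+``avg_loss``, ``EPOCH_NUM``) that the sample code above defines:
+
+.. code:: python
+
+    exe = fluid.Executor(fluid.CPUPlace())
+    exe.run(fluid.default_startup_program())
+    try:
+        for epoch in range(EPOCH_NUM):
+            for batch_data in train_reader():
+                exe.run(trainer_prog,
+                        feed=feeder.feed(batch_data),
+                        fetch_list=[avg_loss])
+    finally:
+        exe.close()  # always notify the pservers, even if training failed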

-Launch the trainer and pserver nodes
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Launch the distributed training job
+-----------------------------------

 .. list-table::
    :header-rows: 1
@@ -132,12 +191,3 @@
    * - trainer1.paddlepaddle.com
      - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
      - Start trainer node No.1
-
-**Note**
-
-- Start the pserver nodes before starting the trainer nodes
-- Log output like the following on a trainer node indicates that the training job is running correctly
-
-  .. code:: bash
-
-     step 10, loss: [258.2326202392578]

From aa736d3791bcf7c11418ae8cf2c660b12217e7a7 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Mon, 17 Sep 2018 20:36:46 +0800
Subject: [PATCH 2/3] update by comment

---
 doc/fluid/user_guides/howto/training/cluster_quick_start.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
index 4ca78f3b61ba29..464b6d59505ed9 100644
--- a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
+++ b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
@@ -18,7 +18,7 @@
 Sample code
 -----------

-The following uses a very simple linear regression model as an example to explain how to launch a distributed training job with 2 pserver nodes
+The following uses a very simple linear regression model as an example to explain how to launch a distributed training job with 2 parameter server (pserver) nodes
 and 2 trainer nodes; you can save this code as ``dist_train.py``

 .. code:: python
@@ -130,6 +130,7 @@
      - 0
      - unique ID of the current trainer node, in the range [0, PADDLE_TRAINERS)

+Note: environment variables are just one way to obtain runtime information; in a real job, runtime information can also be passed in through command-line arguments or other mechanisms.

 Distributed training related APIs
 ---------------------------------

From 16f4917ee3b0ef737957ab1769af6ce5f371254d Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Tue, 18 Sep 2018 14:46:21 +0800
Subject: [PATCH 3/3] update content

---
 doc/fluid/user_guides/howto/training/cluster_quick_start.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
index 464b6d59505ed9..1a8d13f2d34af6 100644
--- a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
+++ b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
@@ -123,7 +123,7 @@
      - number of trainer nodes in the distributed training job
    * - :code:`PADDLE_CURRENT_IP`
      - str
-     - ps0.paddlepaddle.com
+     - :code:`ps0.paddlepaddle.com`
      - IP address or hostname of the current pserver node
   * - :code:`PADDLE_TRAINER_ID`
     - int