Skip to content

Commit

Permalink
修改COPY-FROM No.13 distributed
Browse files Browse the repository at this point in the history
Signed-off-by: jjyaoao <jjyaoao@126.com>
  • Loading branch information
jjyaoao committed Jul 19, 2023
1 parent 3354d86 commit 07365d6
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 297 deletions.
93 changes: 3 additions & 90 deletions docs/api/paddle/distributed/QueueDataset_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ QueueyDataset 是流式处理数据使用 Dataset 类。与 InmemoryDataset 继
代码示例
::::::::::::

.. code-block:: python
import paddle
dataset = paddle.distributed.QueueDataset()
COPY-FROM: paddle.distributed.QueueDataset

方法
::::::::::::
Expand Down Expand Up @@ -49,56 +46,7 @@ None。

**代码示例**

.. code-block:: python
import paddle
import os
paddle.enable_static()
with open("test_queue_dataset_run_a.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
with open("test_queue_dataset_run_b.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
var = paddle.static.data(
name=slot, shape=[None, 1], dtype="int64", lod_level=1)
slots_vars.append(var)
dataset = paddle.distributed.QueueDataset()
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist(
["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
paddle.enable_static()
place = paddle.CPUPlace()
exe = paddle.static.Executor(place)
startup_program = paddle.static.Program()
main_program = paddle.static.Program()
exe.run(startup_program)
exe.train_from_dataset(main_program, dataset)
os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
COPY-FROM: paddle.distributed.QueueDataset.init


set_filelist(filelist)
Expand All @@ -108,42 +56,7 @@ set_filelist(filelist)

**代码示例**

.. code-block:: python
import paddle
import os
paddle.enable_static()
with open("test_queue_dataset_run_a.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
with open("test_queue_dataset_run_b.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
dataset = paddle.distributed.QueueDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
var = paddle.static.data(
name=slot, shape=[None, 1], dtype="int64", lod_level=1)
slots_vars.append(var)
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
COPY-FROM: paddle.distributed.QueueDataset.set_filelist


**参数**
Expand Down
24 changes: 8 additions & 16 deletions docs/api/paddle/distributed/fleet/PaddleCloudRoleMaker_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,15 @@ PaddleCloudRoleMaker 是基于从环境变量中获取分布式相关信息进
代码示例
::::::::::::

.. code-block:: python
.. code-block:: text
import os
import paddle.distributed.fleet as fleet
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
fleet.PaddleCloudRoleMaker(is_collective=False)
from paddle.distributed.fleet.base.role_maker import Role
fleet.UserDefinedRoleMaker(
current_id=0,
role=Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
方法
::::::::::::
Expand All @@ -45,7 +37,7 @@ string

**代码示例**

.. code-block:: python
.. code-block:: text
import paddle.distributed.fleet as fleet
role = fleet.PaddleCloudRoleMaker(is_collective=False)
Expand Down
7 changes: 2 additions & 5 deletions docs/api/paddle/distributed/fleet/UserDefinedRoleMaker_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ UserDefinedRoleMaker 是基于从用户自定义的参数中获取分布式相
代码示例
::::::::::::

.. code-block:: python
.. code-block:: text
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet.base.role_maker import Role
fleet.UserDefinedRoleMaker(
current_id=0,
role=Role.SERVER,
Expand All @@ -38,15 +37,13 @@ string

**代码示例**

.. code-block:: python
.. code-block:: text
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet.base.role_maker import Role
role = fleet.UserDefinedRoleMaker(
current_id=0,
role=Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
role.to_string()
132 changes: 5 additions & 127 deletions docs/api/paddle/distributed/fleet/UtilBase_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,7 @@ Numpy.array|None:一个和 `input` 形状一致的 numpy 数组或 None。

**代码示例**

.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
import numpy as np
import os
os.environ["PADDLE_WITH_GLOO"] = "2"
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
if fleet.is_server():
input = [1, 2]
output = fleet.util.all_reduce(input, "sum", "server")
print(output)
# [2, 4]
elif fleet.is_worker():
input = np.array([3, 4])
output = fleet.util.all_reduce(input, "sum", "worker")
print(output)
# [6, 8]
output = fleet.util.all_reduce(input, "sum", "all")
print(output)
# [8, 12]
if __name__ == "__main__":
train()
COPY-FROM: paddle.distributed.fleet.UtilBase.all_reduce

barrier(comm_world="worker")
'''''''''
Expand All @@ -68,35 +36,7 @@ barrier(comm_world="worker")

**代码示例**

.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
import os
os.environ["PADDLE_WITH_GLOO"] = "2"
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
if fleet.is_server():
fleet.util.barrier("server")
print("all server arrive here")
elif fleet.is_worker():
fleet.util.barrier("worker")
print("all server arrive here")
fleet.util.barrier("all")
print("all servers and workers arrive here")
if __name__ == "__main__":
train()
COPY-FROM: paddle.distributed.fleet.UtilBase.barrier

all_gather(input, comm_world="worker")
'''''''''
Expand All @@ -113,39 +53,7 @@ all_gather(input, comm_world="worker")

**代码示例**

.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
import os
os.environ["PADDLE_WITH_GLOO"] = "2"
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
if fleet.is_server():
input = fleet.server_index()
output = fleet.util.all_gather(input, "server")
print(output)
# output = [0, 1]
elif fleet.is_worker():
input = fleet.worker_index()
output = fleet.util.all_gather(input, "worker")
# output = [0, 1]
print(output)
output = fleet.util.all_gather(input, "all")
print(output)
# output = [0, 1, 0, 1]
if __name__ == "__main__":
train()
COPY-FROM: paddle.distributed.fleet.UtilBase.all_gather

get_file_shard(files)
'''''''''
Expand All @@ -166,23 +74,7 @@ get_file_shard(files)

**代码示例**

.. code-block:: python
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet.init(role)
files = fleet.util.get_file_shard(["file1", "file2", "file3"])
print(files)
# files = ["file1", "file2"]
COPY-FROM: paddle.distributed.fleet.UtilBase.get_file_shard

print_on_rank(message, rank_id)
'''''''''
Expand All @@ -196,18 +88,4 @@ print_on_rank(message, rank_id)

**代码示例**

.. code-block:: python
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet.init(role)
fleet.util.print_on_rank("I'm worker 0", 0)
COPY-FROM: paddle.distributed.fleet.UtilBase.print_on_rank
Loading

0 comments on commit 07365d6

Please sign in to comment.