enable continuous log; update doc (#40782)
kuizhiqing authored Mar 23, 2022
1 parent b518fa2 commit fdafbc7
Showing 12 changed files with 330 additions and 155 deletions.
2 changes: 1 addition & 1 deletion python/paddle/distributed/__init__.py
@@ -13,7 +13,7 @@
# limitations under the License.

from .spawn import spawn # noqa: F401
from .fleet.launch import launch # noqa: F401
from .launch.main import launch # noqa: F401

from .parallel import init_parallel_env # noqa: F401
from .parallel import get_rank # noqa: F401
66 changes: 0 additions & 66 deletions python/paddle/distributed/launch/__init__.py
@@ -13,69 +13,3 @@
# limitations under the License.

__all__ = []
'''
Paddle distributed training entry ``python -m paddle.distributed.launch``.
Help
# for arg usage and explanation, try the following command
# python -m paddle.distributed.launch -h
Collective Mode
Case 1: 1 node
use all visible devices
# python -m paddle.distributed.launch train.py
use specified devices
# python -m paddle.distributed.launch --devices=0,1,2,3 train.py
Case 2: multi-node, auto detect ip/port
# python -m paddle.distributed.launch --nnodes 2 train.py
# the following command is printed automatically
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --nnodes 2 demo.py
# then copy and paste the command above to the other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.launch --nnodes 2 --master 10.0.0.1:2379 train.py
# the master ip must belong to one of the nodes and the port must be available
Parameter Server Mode
Case 1.1: 1 node, 1 ps, 1 trainer
# python -m paddle.distributed.launch --mode ps train.py
# python -m paddle.distributed.launch --server_num=1 --trainer_num=1 train.py
Case 1.2: 1 node, 2 ps, 2 trainers
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 train.py
Case 2: 2 nodes, 2 ps, 2 trainers per node
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 --nnodes 2 train.py
# the following command is printed automatically
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# then copy and paste the command above to the other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# the master ip must belong to one of the nodes and the port must be available
Case 4: specified servers and trainers in each node
# python -m paddle.distributed.launch --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py
Elastic Mode
# run the following command on 3 nodes to start immediately, or on 2 nodes to start after elastic_timeout
# python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:3 train.py
# while the number of peers stays within the 2:3 range, the elastic strategy holds
'''
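The same entry point can also be driven programmatically; a minimal sketch (mirroring the new __main__.py in the next file; launch() parses sys.argv itself):

    # equivalent to `python -m paddle.distributed.launch train.py`
    from paddle.distributed.launch.main import launch

    if __name__ == "__main__":
        launch()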
29 changes: 2 additions & 27 deletions python/paddle/distributed/launch/__main__.py
@@ -12,31 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .context import Context
from . import controllers
from .main import launch


def launch():
    # initialize the context to run
    ctx = Context()

    if ctx.is_legacy_mode():

        # legacy mode
        from paddle.distributed.fleet import launch
        launch.launch()

    else:

        # initialize the selected controller
        c = controllers.init(ctx)

        # run the pods
        c.run()

        # manager or just wait pod
        c.finalize()


if __name__ == "__main__":
    launch()
    launch()
6 changes: 6 additions & 0 deletions python/paddle/distributed/launch/context/__init__.py
@@ -82,6 +82,12 @@ def get_logger(self, level=logging.INFO):
        logger.addHandler(ch)
        return logger

    def continous_log(self) -> bool:
        if self.args.log_level.upper() in ['DEBUG', 'ERROR']:
            return True
        else:
            return False

    def set_env_in_args(self):
        for k, v in env_args_mapping.items():
            if k in self.envs:
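For context, this new predicate is what the controller's watch loop (controllers/controller.py below) consults to decide whether to stream logs continuously; a minimal sketch with duck-typed ctx/pod stand-ins:

    def stream_logs_while_running(ctx, pod):
        # mirrors the new watch loop below: with log_level DEBUG or ERROR,
        # trainer logs are echoed to the console on each 2-second poll
        while not ctx.status.is_done():
            pod.watch(timeout=2)
            if ctx.continous_log():
                pod.logs()

In practice, launching with --log_level=DEBUG (or PADDLE_LOG_LEVEL=DEBUG) turns continuous log output on; the method body could equally be written as a single membership test, return self.args.log_level.upper() in ['DEBUG', 'ERROR'].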
10 changes: 5 additions & 5 deletions python/paddle/distributed/launch/context/args_envs.py
@@ -20,7 +20,7 @@
    'PADDLE_MASTER': 'master',
    'PADDLE_DEVICES': 'devices',
    'PADDLE_NNODES': 'nnodes',
    'PADDLE_MODE': 'mode',
    'PADDLE_RUN_MODE': 'run_mode',
    'PADDLE_LOG_LEVEL': 'log_level',
    'PADDLE_NPROC_PER_NODE': 'nproc_per_node',
    'PADDLE_JOB_ID': 'job_id',
@@ -60,7 +60,7 @@ def parse_args():
"--legacy", type=bool, default=False, help="use legacy launch")

base_group.add_argument(
"--rank", type=int, default=-1, help="the peer rank")
"--rank", type=int, default=-1, help="the node rank")

base_group.add_argument(
"--log_level", type=str, default="INFO", help="log level. Default INFO")
@@ -69,7 +69,7 @@
"--nnodes",
type=str,
default="1",
help="the number of peers, i.e. pod/node number")
help="the number of nodes, i.e. pod/node number")

base_group.add_argument(
"--nproc_per_node",
@@ -83,7 +83,7 @@ def parse_args():
default="log",
help="the path for each process's log. Default ./log")
base_group.add_argument(
"--mode",
"--run_mode",
type=str,
default="collective",
help="run mode of the job, collective/ps/ps-heter")
@@ -146,6 +146,6 @@ def parse_args():
"--elastic_timeout",
type=int,
default=30,
help="seconds to wait before elastic perform training")
help="seconds to wait before elastic job begin to train")

return parser.parse_known_args()
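As a standalone sketch of how such an env-to-flag mapping feeds the parsed args (the helper below is illustrative, modeled on set_env_in_args from context/__init__.py above, not the committed implementation):

    import os

    # subset of env_args_mapping above; note PADDLE_RUN_MODE -> run_mode
    env_args_mapping = {
        'PADDLE_RUN_MODE': 'run_mode',
        'PADDLE_LOG_LEVEL': 'log_level',
    }

    def set_env_in_args(args, envs=None):
        # environment variables override parsed defaults when present
        envs = os.environ if envs is None else envs
        for env_key, arg_name in env_args_mapping.items():
            if env_key in envs:
                setattr(args, arg_name, envs[env_key])

So PADDLE_RUN_MODE=ps has the same effect as passing --run_mode ps.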
42 changes: 2 additions & 40 deletions python/paddle/distributed/launch/controllers/collective.py
@@ -115,46 +115,6 @@ def register(self):

self.master.register_heartbeat(self.job.id, self.pod.name)

    def watch(self) -> bool:
        '''
        watch self and peer status, return true to exit
        '''

        self.ctx.logger.info("Watching {}".format(self.pod))
        while not self.ctx.status.is_done():
            # self status
            status = self.pod.watch(timeout=2)
            self.ctx.logger.debug("Pod status {}, Ctx status {}".format(
                status, self.ctx.status.current()))

            # completed
            if status == self.ctx.status.COMPLETED:
                self.master.set_status(status)
                self.ctx.status.complete()
                self.ctx.logger.info("Pod complete {}".format(status))
                return True

            # self failure
            elif status == self.ctx.status.FAILED:
                self.master.set_status(status)
                self.master.restart_peer()
                self.ctx.logger.info("Pod failed {}".format(status))
                self.pod.stop()

                if self.ctx.args.elastic_level <= 0:
                    return True
                else:
                    return False

            # peer failure
            if self.ctx.status.is_restarting() and self.master.get_status(
            ) != self.ctx.status.COMPLETED:
                self.pod.stop()
                return False

            #peers = self.master.fetch_peer_alive()
            #print("peers {}".format(peers))

    def run(self):

        timeout = self.ctx.args.elastic_timeout if self.job.elastic else self.ctx.args.elastic_timeout * 10
@@ -164,6 +124,8 @@ def run(self):

        self.build_job()

        self.ctx.logger.info("Waiting peer ready...")

        ok, replicas = self.master.wait_peer_ready(
            self.job.replicas_min, self.job.replicas_max, timeout)
        if ok:
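An aside on the timeout rule in run() above, restated as a tiny helper (an inferred reading, not committed code):

    def effective_timeout(elastic_timeout: int, elastic: bool) -> int:
        # elastic jobs wait the configured timeout for peers; fixed-size
        # jobs wait 10x longer, presumably because they cannot start at
        # all until every replica arrives
        return elastic_timeout if elastic else elastic_timeout * 10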
53 changes: 43 additions & 10 deletions python/paddle/distributed/launch/controllers/controller.py
@@ -40,7 +40,7 @@ def __init__(self, ctx):
        self.master = Master.factory(self.ctx)

        self.job = Job(nnodes=self.ctx.args.nnodes,
                       mode=self.ctx.args.mode,
                       mode=self.ctx.args.run_mode,
                       jid=self.ctx.args.job_id)
        self.pod = Pod()

@@ -65,18 +65,51 @@ def run(self):
        self.watch()

    def watch(self) -> bool:
        '''
        watch self and peer status, return true to exit
        '''
        #TODO(kuizhiqing) unify ctx.status and master status

        self.ctx.logger.info("Watching {}".format(self.pod))

        status = self.pod.watch()
        while not self.ctx.status.is_done():
            status = self.pod.watch(timeout=2)

            if self.ctx.continous_log():
                self.pod.logs()

            # completed
            if status == self.ctx.status.COMPLETED:
                self.ctx.status.complete()

                self.master.set_status(status)

                self.ctx.logger.info("Pod {}".format(status))
                return True

            # self failure
            elif status == self.ctx.status.FAILED:
                self.ctx.status.fail()

                self.master.set_status(status)
                self.master.restart_peer()

                fc = self.pod.failed_container()
                self.ctx.logger.info("Pod {}".format(status))
                self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
                fc[0].tail()
                self.pod.stop()

                if self.ctx.args.elastic_level <= 0:
                    return True
                else:
                    return False

        if status == self.ctx.status.COMPLETED:
            self.ctx.logger.info("Pod {}".format(status))
        elif status == self.ctx.status.FAILED:
            fc = self.pod.failed_container()
            self.ctx.logger.info("Pod {}".format(status))
            self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
            fc[0].tail()
            self.pod.stop()

            # peer failure
            if self.ctx.status.is_restarting() and self.master.get_status(
            ) != self.ctx.status.COMPLETED:
                self.pod.stop()
                return False

    def stop(self, sigint=None):
        self.ctx.logger.debug("Controller stop")
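Note the richer failure path: before stopping the pod, the controller now surfaces the failing container's log tail. In isolation (pod and logger here are stand-ins for the real objects):

    # failure-path sketch mirroring the hunk above
    fc = pod.failed_container()   # presumably the containers that exited non-zero
    if fc:
        logger.error("Container failed !!!\n{}".format(fc[0]))
        fc[0].tail()              # print the last lines of the container log
        pod.stop()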
13 changes: 11 additions & 2 deletions python/paddle/distributed/launch/controllers/master.py
@@ -43,6 +43,15 @@ def __init__(self, ctx):
    def stop(self):
        raise NotImplementedError

    def set_status(self, status):
        pass

    def get_status(self):
        return None

    def restart_peer(self):
        pass

    def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
        raise NotImplementedError

@@ -122,7 +131,7 @@ def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
        if size < 2:
            return [value], 0

        self.ctx.logger.info("Waiting peer ready...")
        self.ctx.logger.info("Waiting peer start...")

        self.lazy_init()

@@ -184,7 +193,7 @@ def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
        if size < 2:
            return [value], 0

        self.ctx.logger.info("Waiting peer ready...")
        self.ctx.logger.info("Waiting peer start...")

        path = "{}/{}/{}".format(prefix, key, rank)

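The no-op set_status/get_status/restart_peer defaults added above make peer-status tracking optional for Master implementations; a subclass that does track it might look like this (hypothetical sketch, not part of the commit):

    class InMemoryMaster(Master):
        # hypothetical: remember the last status reported for the job
        def __init__(self, ctx):
            super().__init__(ctx)
            self._status = None

        def set_status(self, status):
            self._status = status

        def get_status(self):
            return self._status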
4 changes: 2 additions & 2 deletions python/paddle/distributed/launch/controllers/ps.py
@@ -21,11 +21,11 @@
class PSController(Controller):
    @classmethod
    def enable(cls, ctx):
        if ctx.args.mode == ControleMode.PS or ctx.args.server_num or len(
        if ctx.args.run_mode == ControleMode.PS or ctx.args.server_num or len(
                ctx.args.servers) > 0 or ctx.args.trainer_num or len(
                ctx.args.trainers) > 0:
            ctx.logger.debug("{} enabled".format(cls.__name__))
            ctx.args.mode = ControleMode.PS
            ctx.args.run_mode = ControleMode.PS
            return True
        else:
            return False
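With the rename from --mode to --run_mode, any of the following invocations now satisfy the enable() check above (usage sketch in the style of the launch doc; train.py is the user script):

    # python -m paddle.distributed.launch --run_mode ps train.py
    # python -m paddle.distributed.launch --server_num=1 --trainer_num=1 train.py
    # PADDLE_RUN_MODE=ps python -m paddle.distributed.launch train.py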
