Pr 1668 #1680 (Open)

wants to merge 5 commits into base: pytorch

20 changes: 20 additions & 0 deletions README.md

@@ -117,6 +117,26 @@ frames using the following command:
python -m alf.bin.play --root_dir=LOG_DIR
```

To launch single-node multi-GPU training, set the `--distributed` argument to `multi-gpu`:
```bash
python -m alf.bin.train --conf=CONF_FILE --root_dir=LOG_DIR --distributed multi-gpu
```

To launch multi-node multi-GPU training, we use the torch distributed launch module (`torch.distributed.launch`). The `local_rank` of each process can be obtained from the `PerProcessContext` class and used to assign a GPU within your environment if you wish (see the sketch after the commands below). To start training, run the following command on the host machine:
```bash
python -m torch.distributed.launch \
--nproc_per_node=NGPU_ON_NODE \
--nnodes=NUMBER_OF_NODES \
--node_rank=NODE_RANK \
--master_addr=HOST_IP \
--master_port=12345 \
./alf/bin/train.py \
--conf=CONF_FILE \
--root_dir=LOG_DIR \
--distributed multi-node-multi-gpu
```
and simultaneously run the same command on each worker machine, assigning each worker a distinct NODE_RANK and updating NGPU_ON_NODE if its number of GPUs differs from the host's. Please make sure that all machines have the same copy of the codebase.
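
As an illustrative sketch of the `local_rank` usage mentioned above (assuming the `local_rank` property that this PR adds to `PerProcessContext`, and a CUDA-capable machine), a process could pin itself to its node-local GPU like this:

```python
import torch

from alf.utils.per_process_context import PerProcessContext

# local_rank is -1 in non-distributed runs; otherwise it is the index of
# this process within its own node, as set by the distributed launcher.
local_rank = PerProcessContext().local_rank
if torch.cuda.is_available() and local_rank >= 0:
    torch.cuda.set_device(local_rank)
```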

#### **Deprecated**

An older version of ALF used [gin](https://github.com/google/gin-config)

118 changes: 108 additions & 10 deletions alf/bin/train.py

@@ -80,9 +80,11 @@ def _define_flags():
flags.DEFINE_bool('store_snapshot', True,
'Whether store an ALF snapshot before training')
flags.DEFINE_enum(
- 'distributed', 'none', ['none', 'multi-gpu'],
+ 'distributed', 'none', ['none', 'multi-gpu', 'multi-node-multi-gpu'],
'Set whether and how to run training in distributed mode.')
flags.mark_flag_as_required('root_dir')
flags.DEFINE_integer('local-rank', None,
'Local rank passed from distributed launcher')


FLAGS = flags.FLAGS
@@ -98,7 +100,6 @@ def _setup_logging(rank: int, log_dir: str):
FLAGS.alsologtostderr = True
logging.set_verbosity(logging.INFO)
logging.get_absl_handler().use_absl_log_file(log_dir=log_dir)
- logging.use_absl_handler()


def _setup_device(rank: int = 0):
@@ -116,12 +117,13 @@ def _setup_device(rank: int = 0):
torch.cuda.set_device(rank)


- def _train(root_dir, rank=0, world_size=1):
+ def _train(root_dir, local_rank=-1, rank=0, world_size=1):
"""Launch the trainer after the conf file has been parsed. This function
could be called by grid search after the config has been modified.

Args:
root_dir (str): Path to the directory for writing logs/summaries/checkpoints.
local_rank (int): The ID of the process within the current node
rank (int): The ID of the process among all of the DDP processes. For
non-distributed training, this id should be 0.
world_size (int): The number of processes in total. If set to 1, it is
@@ -133,6 +135,8 @@ def _train(root_dir, rank=0, world_size=1):

if trainer_conf.ml_type == 'rl':
ddp_rank = rank if world_size > 1 else -1
if ddp_rank > -1 and local_rank > -1:
ddp_rank = local_rank
Comment on lines +138 to +139
Contributor: what is the reason for this?

trainer = policy_trainer.RLTrainer(trainer_conf, ddp_rank)
elif trainer_conf.ml_type == 'sl':
# NOTE: SLTrainer does not support distributed training yet
@@ -176,13 +180,12 @@ def training_worker(rank: int,
# Specialization for distributed mode
dist.init_process_group('nccl', rank=rank, world_size=world_size)
# Recover the flags when spawned as a sub process
- if rank > 0:
-     _define_flags()
-     FLAGS(sys.argv, known_only=True)
-     FLAGS.mark_as_parsed()
+ _define_flags()
+ FLAGS(sys.argv, known_only=True)
+ FLAGS.mark_as_parsed()

@JingyuQian (Aug 25, 2024): this if condition, if present during "multi-gpu" mode, will raise a "root_dir" has been defined twice error. Removing it seems to solve the problem. Need further confirmation?

Contributor: On the contrary, the if condition is needed so that flags will not be redefined for rank0, because the rank0 process is the main process, which already defined the flags.

# Set the rank and total number of processes for distributed training.
PerProcessContext().set_distributed(
- rank=rank, num_processes=world_size)
+ rank=rank, local_rank=-1, num_processes=world_size)
assert paras_queue is not None
PerProcessContext().set_paras_queue(paras_queue)

@@ -191,7 +194,68 @@

# Parse the configuration file, which will also implicitly bring up the environments.
common.parse_conf_file(conf_file)
- _train(root_dir, rank, world_size)
+ _train(root_dir=root_dir, rank=rank, world_size=world_size)
except KeyboardInterrupt:
pass
except Exception as e:
if world_size >= 1:
# If the training worker is running as a process in multiprocessing
# environment, this will make sure that the exception raised in this
# particular process is captured and shown.
logging.exception(f'{mp.current_process().name} - {e}')
raise e
finally:
# Note that each training worker will have its own child processes
# running the environments. In the case when training worker process
# finishes earlier (e.g. when it raises an exception), it will hang
# instead of quitting unless all child processes are killed.
alf.close_env()


def training_worker_multi_node(local_rank: int,
rank: int,
world_size: int,
conf_file: str,
root_dir: str,
paras_queue: mp.Queue = None):
"""An executable instance that trains and evaluate the algorithm

Args:
local_rank (int): The ID of the process within the current node.
rank (int): The ID of the process among all of the DDP processes.
world_size (int): The number of processes in total. If set to 1, it is
interpreted as "non distributed mode".
conf_file (str): Path to the training configuration.
root_dir (str): Path to the directory for writing logs/summaries/checkpoints.
paras_queue: a shared Queue for checking the consistency of model parameters
in different worker processes, if multi-gpu training is used.
"""
try:
_setup_logging(log_dir=root_dir, rank=rank)
_setup_device(local_rank)
if world_size > 1:
Contributor: it's guaranteed that world_size > 1, right? So there is no need for the if.

# Specialization for distributed mode
dist.init_process_group('nccl', rank=rank, world_size=world_size)
# Recover the flags when spawned as a sub process
# _define_flags()
FLAGS(sys.argv, known_only=True)
FLAGS.mark_as_parsed()
# Set the rank and total number of processes for distributed training.
PerProcessContext().set_distributed(
rank=rank, local_rank=local_rank, num_processes=world_size)
assert paras_queue is not None
PerProcessContext().set_paras_queue(paras_queue)

# Make PerProcessContext read-only.
PerProcessContext().finalize()

# Parse the configuration file, which will also implicitly bring up the environments.
common.parse_conf_file(conf_file)
_train(
root_dir=root_dir,
local_rank=local_rank,
rank=rank,
world_size=world_size)
except KeyboardInterrupt:
pass
except Exception as e:
@@ -215,7 +279,7 @@ def main(_):

conf_file = common.get_conf_file()

- if FLAGS.store_snapshot:
+ if FLAGS.store_snapshot and int(os.environ['RANK']) == 0:

Contributor: There may not be RANK environment variable for single process / single node job.

common.generate_alf_snapshot(common.alf_root(), conf_file, root_dir)
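
One possible way to address the concern above (a sketch only; the helper name is made up for illustration and is not part of the PR) is to fall back to rank 0 when the launcher did not set RANK:

```python
import os


def is_main_process() -> bool:
    """Return True when this process should do rank-0-only work.

    The distributed launcher sets RANK for every worker; for plain
    single-process runs the variable is absent, so default to rank 0.
    """
    return int(os.environ.get('RANK', 0)) == 0


# Intended call site (sketch):
#     if FLAGS.store_snapshot and is_main_process():
#         common.generate_alf_snapshot(common.alf_root(), conf_file, root_dir)
```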

# FLAGS.distributed is guaranteed to be one of the possible values.
@@ -264,6 +328,40 @@
# exit code for the program.
raise ChildProcessError(f'Training failed on subprocess exception')

elif FLAGS.distributed == 'multi-node-multi-gpu':
local_rank = int(os.environ['LOCAL_RANK'])
Contributor: why not use FLAGS.local_rank?

rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
Comment on lines +333 to +334
Contributor: And I couldn't find documentation about RANK or WORLD_SIZE environment variable for torch.distributed.launch. Can you give the link for the documentation?

print("local_rank: {} | rank: {} | world_size: {}".format(
local_rank, rank, world_size))
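
Relatedly, if these launcher environment variables could ever be absent (see the questions above), the values could be read defensively; a sketch under that assumption, not the PR's code:

```python
import os

# Fall back to single-process defaults when the variables are not exported
# by torch.distributed.launch / torchrun.
local_rank = int(os.environ.get('LOCAL_RANK', -1))
rank = int(os.environ.get('RANK', 0))
world_size = int(os.environ.get('WORLD_SIZE', 1))
```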

if world_size == 1:
logging.warn(
'Fallback to single GPU mode as there is only one GPU')
training_worker(
rank=0, world_size=1, conf_file=conf_file, root_dir=root_dir)
return

try:
# Create a shared queue for checking the consistency of the parameters
# in different work processes.
manager = mp.Manager()
paras_queue = manager.Queue()
training_worker_multi_node(
local_rank=local_rank,
rank=rank,
world_size=world_size,
conf_file=conf_file,
root_dir=root_dir,
paras_queue=paras_queue)

Contributor: I guess Queue does not work across multiple machines. If so, need to update Trainer._check_dpp_paras_consistency for multi-node mode. Perhaps change `if self._rank > 0:` to `if local_rank > 0:`. If so, need to update the doc-string for TrainerConfig.ddp_paras_check_interval to say that it only checks the consistency within one node. And please test whether TrainerConfig.check_dpp_paras_consistency=True works properly.

Contributor: Actually, I think paras_queue doesn't work at all. In multi-gpu mode, paras_queue can work because it is created by the main process. All the workers will use the same paras_queue. Here, different workers will use different queues, which makes no sense. So it's better just to report an unsupported error for TrainerConfig.check_dpp_paras_consistency=True in multi-node-multi-gpu mode.
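
Following the second comment, one way to surface the limitation early (a sketch only; the `check_dpp_paras_consistency` field name is taken from the discussion and may not match the actual ALF config attribute):

```python
def reject_unsupported_paras_check(trainer_conf):
    """Fail fast: the parameter-consistency queue is created per node and
    cannot be shared across machines in multi-node-multi-gpu mode."""
    if getattr(trainer_conf, 'check_dpp_paras_consistency', False):
        raise NotImplementedError(
            'check_dpp_paras_consistency=True is not supported in '
            'multi-node-multi-gpu mode.')
```
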
except KeyboardInterrupt:
pass
except Exception as e:
# ``e`` has been printed in the subprocess, so here we won't print it
# again. But we raise another error so that we will have a correct
# exit code for the program.
raise ChildProcessError(f'Training failed on subprocess exception')


if __name__ == '__main__':
__spec__ = None # see https://github.com/HorizonRobotics/alf/pull/1554 for explanation

5 changes: 4 additions & 1 deletion alf/environments/process_environment.py

@@ -107,6 +107,7 @@ def _worker(conn: multiprocessing.connection,
torch_num_threads_per_env: int = 1,
ddp_num_procs: int = 1,
ddp_rank: int = -1,
local_rank: int = -1,
name: str = ''):
"""The process waits for actions and sends back environment results.

@@ -142,6 +143,7 @@
SpawnedProcessContext(
ddp_num_procs=ddp_num_procs,
ddp_rank=ddp_rank,
local_rank=local_rank,
env_id=env_id,
env_ctor=env_constructor,
pre_configs=pre_configs))
@@ -299,13 +301,14 @@ def start(self, wait_to_start=True):

ddp_num_procs = PerProcessContext().num_processes
ddp_rank = PerProcessContext().ddp_rank
local_rank = PerProcessContext().local_rank

self._process = mp_ctx.Process(
target=_worker,
args=(conn, self._env_constructor, self._start_method,
alf.get_handled_pre_configs(), self._env_id, self._flatten,
self._fast, self._num_envs, self._torch_num_threads,
- ddp_num_procs, ddp_rank, self._name),
+ ddp_num_procs, ddp_rank, local_rank, self._name),
name=f"ProcessEnvironment-{self._env_id}")
atexit.register(self.close)
self._process.start()

9 changes: 8 additions & 1 deletion alf/utils/per_process_context.py

@@ -34,6 +34,7 @@ def __new__(cls):
cls._instance = super(PerProcessContext, cls).__new__(cls)
cls._instance._read_only = False
cls._instance._ddp_rank = -1
cls._instance._local_rank = -1
cls._instance._num_processes = 1
return cls._instance

@@ -42,7 +43,8 @@ def finalize(self) -> None:
"""
self._read_only = True

- def set_distributed(self, rank: int, num_processes: int) -> None:
+ def set_distributed(self, rank: int, local_rank: int,
+                     num_processes: int) -> None:
"""Set the distributed properties.

Args:
@@ -53,6 +55,7 @@ def set_distributed(self, rank: int, num_processes: int) -> None:
raise AttributeError(
'Cannot mutate PerProcessContext after it is finalized')
self._ddp_rank = rank
self._local_rank = local_rank
self._num_processes = num_processes

def set_paras_queue(self, paras_queue: mp.Queue):
Expand All @@ -78,6 +81,10 @@ def is_distributed(self):
def ddp_rank(self):
return self._ddp_rank

@property
def local_rank(self):
Contributor: since now there are two ranks, ddp_rank and local_rank, it is better to clearly document what their meanings are.

return self._local_rank

@property
def num_processes(self):
return self._num_processes
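
In response to the documentation request above, a sketch of how the two ranks could be described (an illustrative stand-in class, not the PR's code):

```python
class RankDocExample:
    """Illustrates the intended meaning of the two ranks."""

    _ddp_rank = -1
    _local_rank = -1

    @property
    def ddp_rank(self):
        """Global index of this process among all DDP processes across
        every node; -1 when not running distributed training."""
        return self._ddp_rank

    @property
    def local_rank(self):
        """Index of this process within its own node (typically the id of
        the GPU it should use); -1 when not running distributed training."""
        return self._local_rank
```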

1 change: 1 addition & 0 deletions alf/utils/spawned_process_utils.py

@@ -31,6 +31,7 @@ class SpawnedProcessContext(NamedTuple):
"""
ddp_num_procs: int
ddp_rank: int
local_rank: int
Contributor: comment ddp_rank and local_rank

env_id: int
env_ctor: Callable[..., AlfEnvironment]
pre_configs: List[Tuple[str, Any]]