Commit

v3
li126com committed Oct 28, 2024
1 parent 882814f commit 0607da8
Showing 22 changed files with 251 additions and 424 deletions.
12 changes: 5 additions & 7 deletions configs/7B_internlm2.py
@@ -43,11 +43,9 @@
TRAIN_FOLDER = None
VALID_FOLDER = None # "/path/to/dataset"
data = dict(
# tokenizer_path = "/mnt/petrelfs/share_data/caizheng/pretrained/internlm2-chat-7b",
# type="streaming",
seq_len=SEQ_LEN,
# micro_num means the number of micro_batch contained in one gradient update
micro_num=8,
micro_num=4,
# packed_length = micro_bsz * SEQ_LEN
micro_bsz=1,
# defaults to the value of micro_num
@@ -92,7 +90,7 @@

hybrid_zero_optimizer = dict(
# Enable low_level_optimzer overlap_communication
overlap_sync_grad=False,
overlap_sync_grad=True,
overlap_sync_param=False,
# bucket size for nccl communication params
reduce_bucket_size=512 * 1024 * 1024,
@@ -182,9 +180,9 @@
"""
parallel = dict(
zero1=dict(size=-1),
tensor=dict(size=1, mode="mtp"),
pipeline=dict(size=4, interleaved_overlap=True, mode='zbv'),
weight=dict(size=1, overlap=True),
tensor=dict(size=2, mode="isp"),
pipeline=dict(size=1, interleaved_overlap=True),
weight=dict(size=2, overlap=True),
)

cudnn_deterministic = False
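The micro_num change above is easiest to read through the config's own comments: packed_length = micro_bsz * SEQ_LEN, and micro_num micro-batches are accumulated into one gradient update. A rough arithmetic sketch, purely illustrative; SEQ_LEN = 4096 is an assumed value, not taken from this diff:

# Illustrative arithmetic only; SEQ_LEN is assumed, not read from the config file.
SEQ_LEN = 4096
micro_bsz = 1
micro_num = 4                                  # new value in this commit (previously 8)

packed_length = micro_bsz * SEQ_LEN            # tokens per packed micro-batch
tokens_per_step = micro_num * packed_length    # tokens per gradient update, per data-parallel rank
print(tokens_per_step)                         # 16384 under the assumed SEQ_LEN; 32768 before the change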
8 changes: 4 additions & 4 deletions doc/code-docs/locales/en/LC_MESSAGES/parallel.po
@@ -563,8 +563,8 @@ msgstr ""
msgid "返回类型"
msgstr "Return type"

#: internlm.core.scheduler.pipeline_scheduler.InterleavedPipelineScheduler.forward_backward_step:19
#: internlm.core.scheduler.pipeline_scheduler.PipelineScheduler.forward_backward_step:19
#: internlm.core.scheduler.pipeline_scheduler_1f1b.InterleavedPipelineScheduler.forward_backward_step:19
#: internlm.core.scheduler.pipeline_scheduler_1f1b.PipelineScheduler.forward_backward_step:19
#: of
msgid "Tuple[:class:`torch.Tensor`]"
msgstr ""
@@ -579,11 +579,11 @@ msgstr ""
"To use interleaved pipeline scheduler, users need to set "
"``model.num_chunks > 1`` in the config file."

#: internlm.core.scheduler.pipeline_scheduler.InterleavedPipelineScheduler:1 of
#: internlm.core.scheduler.pipeline_scheduler_1f1b.InterleavedPipelineScheduler:1 of
msgid "Interleaved Pipeline Scheduler."
msgstr ""

#: internlm.core.scheduler.pipeline_scheduler.InterleavedPipelineScheduler.forward_backward_step:1
#: internlm.core.scheduler.pipeline_scheduler_1f1b.InterleavedPipelineScheduler.forward_backward_step:1
#: of
msgid ""
"Run interleaved 1F1B schedule (model split into model chunks), with "
4 changes: 2 additions & 2 deletions doc/code-docs/source/parallel.rst
@@ -137,14 +137,14 @@ InternEvo 在流水线并行中使用 `1F1B <https://arxiv.org/pdf/2104.04473.pd
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
如果要使用非交错式调度, 需要设置 ``model.num_chunks = 1`` 。

.. autoclass:: internlm.core.scheduler.pipeline_scheduler.PipelineScheduler
.. autoclass:: internlm.core.scheduler.pipeline_scheduler_1f1b.PipelineScheduler
:members:

交错式流水线调度
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
如果要使用交错式调度, 需要设置 ``model.num_chunks > 1`` 。

.. autoclass:: internlm.core.scheduler.pipeline_scheduler.InterleavedPipelineScheduler
.. autoclass:: internlm.core.scheduler.pipeline_scheduler_1f1b.InterleavedPipelineScheduler
:members:

值得注意的是,在使用交错式流水线调度器时可启用通信优化功能,即在 1F1B 阶段启用异步通信,以充分利用上行/下行带宽并实现通信与计算重叠。
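As the documentation text above states, the scheduler choice is driven by model.num_chunks: a value of 1 selects PipelineScheduler, and values greater than 1 select InterleavedPipelineScheduler. A hypothetical config fragment, not part of this commit, just to illustrate the switch:

# Hypothetical config fragment for illustration only.
model = dict(
    num_chunks=2,  # > 1 selects InterleavedPipelineScheduler; == 1 selects PipelineScheduler
)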
3 changes: 2 additions & 1 deletion doc/en/structure.md
@@ -12,7 +12,8 @@ The system code file structure is shown below:
│ │ │ └── process_group_initializer.py
│ │ ├── scheduler # Scheduling module, which manages schedulers for parallel training, including non-pipeline and pipeline parallel schedulers
│ │ │ ├── no_pipeline_scheduler.py
│ │ │ └── pipeline_scheduler.py
│ │ │ ├── pipeline_scheduler_1f1b.py
│ │ │ └── pipeline_scheduler_zb.py
│ │ ├── engine.py # Responsible for managing the training and evaluation process of the model
│ │ └── trainer.py # Responsible for managing the training engine and scheduler
│ ├── data # Data module, responsible for managing dataset generation and processing
3 changes: 2 additions & 1 deletion doc/structure.md
@@ -12,7 +12,8 @@
│ │ │ └── process_group_initializer.py
│ │ ├── scheduler # 调度模块,管理并行训练的调度器,包括非流水线并行调度器和流水线并行调度器
│ │ │ ├── no_pipeline_scheduler.py
│ │ │ └── pipeline_scheduler.py
│ │ │ ├── pipeline_scheduler_1f1b.py
│ │ │ └── pipeline_scheduler_zb.py
│ │ ├── engine.py # 负责管理模型的训练和评估过程
│ │ └── trainer.py # 负责管理训练引擎和调度器
│ ├── data # 数据模块,负责管理数据集生成和处理
2 changes: 1 addition & 1 deletion internlm/core/context/parallel_context.py
@@ -299,7 +299,7 @@ def is_rank_for_log(self):
is_log_rank = is_log_rank and self.is_last_rank(ParallelMode.PIPELINE)
else:
is_log_rank = is_log_rank and self.is_first_rank(ParallelMode.PIPELINE)

return is_log_rank

def is_last_rank(self, parallel_mode: ParallelMode):
2 changes: 1 addition & 1 deletion internlm/core/parallel/comm/isp.py
@@ -691,7 +691,7 @@ def after_backward(self, scheduler, inputs_grad) -> None: # pylint: disable=W06
# accumulate left gradients in last bucket after backward.
if self._isp_communicator and self._isp_communicator.overlap:
self._zero_optim.accumulate_left_grads_after_backward()

if not self._zero_optim.skip_grad_reduce:
self._zero_optim.reduce_left_grads_after_backward()

7 changes: 4 additions & 3 deletions internlm/core/parallel/shard.py
@@ -185,12 +185,12 @@ def partition_uniform(num_items: int, pipeline_parallel_size: int, num_chunks: i
left = pipeline_parallel_size - partition_items % pipeline_parallel_size
if chunk_size == 0:
raise ValueError("Some nodes in Pipeline have no requests")

if gpc.config.parallel["pipeline"]["mode"] == "ZBV" and idx == 1:
for p in range(pipeline_parallel_size-1, -1, -1):
for p in range(pipeline_parallel_size - 1, -1, -1):
st = base_idx
base_idx += chunk_size + ((pipeline_parallel_size - p - 1) >= left)
parts[p].append((st, base_idx))
parts[p].append((st, base_idx))
else:
for p in range(pipeline_parallel_size):
st = base_idx
@@ -236,6 +236,7 @@ def pipeline_parallel_sharding_wrapper(
kwargs["last"] = end == num_layers and len(all_parts[-1]) != 0
kwargs["device"] = device
kwargs["start_layer_idx"] = start

chunk = model_builder(**kwargs).to(device)
setattr(chunk, "first_layer", start)
setattr(chunk, "last_layer", end)
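The reversed loop touched above is the V-shape (ZBV) placement: the second model chunk walks the pipeline ranks in reverse, so each rank holds one leg of the V. Below is a simplified, self-contained sketch of that direction only (uniform chunk sizes, no remainder handling, hypothetical function name):

def zbv_partition_sketch(num_layers: int, pp_size: int):
    """Toy version of the ZBV branch in partition_uniform above; illustration only."""
    chunk_size = num_layers // (2 * pp_size)
    parts = [[] for _ in range(pp_size)]
    base = 0
    for p in range(pp_size):                      # chunk 0: ranks 0 .. P-1 in order
        parts[p].append((base, base + chunk_size))
        base += chunk_size
    for p in range(pp_size - 1, -1, -1):          # chunk 1: ranks P-1 .. 0, as in the loop above
        parts[p].append((base, base + chunk_size))
        base += chunk_size
    return parts

# zbv_partition_sketch(8, 2) -> [[(0, 2), (6, 8)], [(2, 4), (4, 6)]]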
5 changes: 1 addition & 4 deletions internlm/core/scheduler/__init__.py
@@ -1,9 +1,6 @@
from .base_scheduler import BaseScheduler
from .no_pipeline_scheduler import NonPipelineScheduler
from .pipeline_scheduler_1f1b import (
InterleavedPipelineScheduler,
PipelineScheduler,
)
from .pipeline_scheduler_1f1b import InterleavedPipelineScheduler, PipelineScheduler
from .pipeline_scheduler_zb import (
ZeroBubblePipelineScheduler,
ZeroBubblePipelineVShapeScheduler,
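With the scheduler module split into pipeline_scheduler_1f1b and pipeline_scheduler_zb, the package root still re-exports all four schedulers, so downstream imports can stay at the package level. A minimal usage sketch, assumed rather than taken from this commit:

# Assumed usage of the re-exported schedulers shown in the __init__.py diff above.
from internlm.core.scheduler import (
    InterleavedPipelineScheduler,
    PipelineScheduler,
    ZeroBubblePipelineScheduler,
    ZeroBubblePipelineVShapeScheduler,
)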
2 changes: 1 addition & 1 deletion internlm/core/scheduler/comm/__init__.py
@@ -1,5 +1,6 @@
from .p2p import (
AsynCommunicator,
fused_send_recv_tensor,
recv_backward,
recv_forward,
send_backward,
@@ -9,7 +10,6 @@
send_forward_backward_recv_forward_backward,
send_forward_recv_backward,
send_forward_recv_forward,
fused_send_recv_tensor,
)
from .utils import recv_obj_meta, send_obj_meta

22 changes: 8 additions & 14 deletions internlm/core/scheduler/comm/p2p.py
@@ -172,7 +172,7 @@ def _communicate(
for req in reqs:
req.wait()
# To protect against race condition when using batch_isend_irecv().
internlm_accelerator.synchronize()
# internlm_accelerator.synchronize()

if recv_prev and recv_prev_split:
if isinstance(tensor_recv_prev, torch.Tensor):
@@ -279,15 +279,15 @@ def _communicate_async(

if len(ops) > 0:
reqs = dist.batch_isend_irecv(ops)

# return and do other things
yield

if len(ops) > 0:
for req in reqs:
for req in reqs: # pylint: disable=E0601
req.wait()
# To protect against race condition when using batch_isend_irecv().
internlm_accelerator.synchronize()
# internlm_accelerator.synchronize()

if recv_prev and recv_prev_split:
if isinstance(tensor_recv_prev, torch.Tensor):
@@ -306,14 +306,7 @@ def _communicate_async(
tensor_recv_next[index] = (
gather_split_1d_tensor(tensor_recv_next[index]).view(recv_next_shape[index]).requires_grad_()
)

# if tensor_recv_prev is not None and tensor_recv_next is None:
# yield tensor_recv_prev
# elif tensor_recv_next is not None and tensor_recv_prev is None:
# yield tensor_recv_next
# elif tensor_recv_next is None and tensor_recv_prev is None:
# yield None
# else:

yield tensor_recv_prev, tensor_recv_next


@@ -546,6 +539,7 @@ def send_forward_backward_recv_forward_backward(
)
return input_tensor, output_tensor_grad


def fused_send_recv_tensor(
object_send_next: Union[torch.Tensor, List[torch.Tensor]] = None,
object_send_prev: Union[torch.Tensor, List[torch.Tensor]] = None,
@@ -608,7 +602,7 @@ def __init__(
dtype: torch.dtype = None,
scatter_gather_tensors: bool = False,
) -> None:
self._need_receive = recv_prev_shape is not None or recv_next_shape is not None
self._need_receive = recv_prev_shape is not None or recv_next_shape is not None
self._coroutine = _communicate_async(
object_send_prev=object_send_prev,
object_send_next=object_send_next,
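_communicate_async above is written as a generator: it posts the batched isend/irecv operations, yields control so the caller can overlap computation, then waits on the requests and yields the received tensors (AsynCommunicator stores this generator as its coroutine). A stripped-down sketch of that pattern, with an assumed shape, a single receive direction, and no scatter/gather or dtype handling:

# Simplified sketch of the generator pattern used by _communicate_async; not the real API.
import torch
import torch.distributed as dist

def communicate_async_sketch(send_next=None, recv_prev_shape=None,
                             dtype=torch.float32, device="cuda"):
    # Post all point-to-point ops in one batch.
    ops, tensor_recv_prev = [], None
    rank = dist.get_rank()
    if recv_prev_shape is not None:
        tensor_recv_prev = torch.empty(recv_prev_shape, dtype=dtype, device=device)
        ops.append(dist.P2POp(dist.irecv, tensor_recv_prev, peer=rank - 1))
    if send_next is not None:
        ops.append(dist.P2POp(dist.isend, send_next, peer=rank + 1))
    reqs = dist.batch_isend_irecv(ops) if ops else []

    yield  # hand control back; the caller overlaps computation here

    for req in reqs:
        req.wait()
    yield tensor_recv_prev

# Hypothetical driver (not the real AsynCommunicator interface):
#   comm = communicate_async_sketch(send_next=out, recv_prev_shape=(1, 4096))
#   next(comm)          # post the ops and return immediately
#   ...do other work...
#   recv = next(comm)   # wait for completion and fetch the received tensor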
17 changes: 5 additions & 12 deletions internlm/core/scheduler/pipeline_scheduler_1f1b.py
@@ -180,7 +180,6 @@ def _call_engine(engine, data): # pylint: disable=W0237
elif isinstance(data, (list, tuple)):
return engine(*data)
elif isinstance(data, dict):
# print(f"data: {data}, {gpc.get_global_rank()}", flush=True)
stage_output = data.pop("stage_output", None)
if stage_output is None:
return engine(**data)
@@ -745,7 +744,7 @@ def __init__(
assert (
isinstance(num_chunks, int) and num_chunks > 0
), f"expected num_chunks to be an integer and larger than 0, but got {num_chunks}"
print(f"InterleavedPipelineScheduler", flush=True)

super().__init__(
num_microbatches,
@@ -841,13 +839,10 @@ def _forward_step(self, engine, chunk_id, input_obj=None):

if gpc.is_pipeline_first_stage() and len(self._input_objs[chunk_id]) == len(self._output_objs[chunk_id]):
self._input_objs[chunk_id].append(None)

if input_obj is None:
input_obj = self._input_objs[chunk_id][-1]

if input_obj is not None:
assert input_obj.requires_grad == True


if not gpc.is_pipeline_first_stage():
assert input_obj is not None, f"{gpc.get_global_rank()} input is None"
micro_batch_data = self.load_micro_batch(chunk_id)
@@ -892,7 +887,7 @@ def _forward_step(self, engine, chunk_id, input_obj=None):

self._output_objs[chunk_id].append(output_obj)
self._moe_losses[chunk_id].append(moe_loss)

assert output_obj is not None, f"{gpc.get_global_rank()} chunk{chunk_id} output is None"

return output_obj
@@ -929,7 +924,6 @@ def _get_chunk_by_microbatch(self, step_id: int, backward: bool = False) -> int:
"""Helper method to get the model chunk ID given the iteration number."""
microbatch_id_in_group = step_id % (self._pp_size * self._num_chunks)
chunk_id = microbatch_id_in_group // self._pp_size


if backward:
chunk_id = self._num_chunks - chunk_id - 1
@@ -942,7 +936,7 @@ def _get_current_microbatch_id(self, step_id: int) -> int:
# microbatch_id: 1 2 3 4 1 2 3 4 5 6 7 8 5 6 7 8
num_microbatch_group = step_id // (self._pp_size * self._num_chunks)
step_id_in_group = step_id % (self._pp_size * self._num_chunks)

microbatch_id = num_microbatch_group * self._pp_size + step_id_in_group % self._pp_size

return microbatch_id
@@ -1414,7 +1408,6 @@ def forward_backward_step(self, engine, data_iter, forward_only=False, return_lo
output, label = pack_return_tensors(self._return_tensors)
else:
output, label = (None, None)


if hasattr(gpc.config.model, "num_experts") and gpc.config.model.num_experts > 1:
dist.all_reduce(self._accum_moe_loss, group=gpc.get_group(ParallelMode.PIPELINE))
@@ -1430,4 +1423,4 @@ def forward_backward_step(self, engine, data_iter, forward_only=False, return_lo
if hasattr(gpc.config.model, "num_experts"):
return output, label, accum_loss, accum_moe_loss
else:
return output, label, accum_loss
return output, label, accum_loss
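The index math in _get_chunk_by_microbatch and _get_current_microbatch_id (both touched above) can be reproduced in a few lines; with pp_size=4 and num_chunks=2 it yields the sequence in the source comment, "1 2 3 4 1 2 3 4 5 6 7 8 5 6 7 8", once shifted to 1-based numbering. A standalone sketch, 0-indexed and outside the scheduler class:

# Standalone sketch of the chunk/microbatch index math above; illustration only.
def chunk_and_microbatch(step_id: int, pp_size: int, num_chunks: int):
    group = pp_size * num_chunks
    chunk_id = (step_id % group) // pp_size           # forward pass; the backward pass reverses it
    microbatch_id = (step_id // group) * pp_size + (step_id % group) % pp_size
    return chunk_id, microbatch_id

# pp_size=4, num_chunks=2:
#   microbatch ids for steps 0..15 -> 0 1 2 3 0 1 2 3 4 5 6 7 4 5 6 7
#   chunk ids      for steps 0..15 -> 0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1
print([chunk_and_microbatch(s, 4, 2) for s in range(16)])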