a preliminary implementation of single-process simulator, which still has problems #884
Conversation
Summary of Changes

Hello @shuailong616, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers an initial implementation of a single-process simulator for the FlagScale framework. The simulator aims to facilitate the analysis of pipeline parallelism costs and memory consumption across various heterogeneous device configurations. By introducing a dummy distributed backend and adapting core training components, it allows for theoretical performance evaluation and optimal configuration generation in a non-distributed setting, which is crucial for efficient large-scale model training.
Code Review
This pull request introduces a preliminary implementation of a single-process simulator for auto-tuning, which is a significant feature. The changes include a new dummy distributed backend, scripts for configuration generation and pipeline time analysis, and modifications to the training code to support simulation mode. My review has identified several issues, including two critical bugs that would either crash the program or break existing training functionality. I have also found a number of high-severity issues related to incorrect logic, security vulnerabilities, and maintainability problems like hardcoded paths and commands. I've provided detailed feedback and code suggestions to address these points. Overall, this is a good starting point, and addressing these comments will significantly improve the robustness and correctness of the simulator.
```diff
     micro_batch_size=args.micro_batch_size,
     decoder_seq_length=args.decoder_seq_length,
-    forward_only=False,
+    forward_only=True,
```
Setting forward_only=True unconditionally will break the backward pass and prevent the model from training. This change is intended for the simulator to measure forward pass time. It must be guarded by a check for simulator mode, for example: forward_only=args.enable_simulator.
Suggested change:

```diff
-    forward_only=True,
+    forward_only=args.enable_simulator,
```
```python
    # os.environ["WORLD_SIZE"] = args.world_size
    os.environ["WORLD_SIZE"] = "8"
    # os.environ["WORLD_SIZE"] = "32"
    rdav_endpoint = random.randint(0, 40000)
```
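The hardcoded world size only matches one particular cluster shape. A minimal sketch of reading it from the parsed arguments instead, assuming an `args.world_size` option exists as the commented-out line suggests; the function and variable names below are illustrative (the PR calls the port variable `rdav_endpoint`):

```python
import os
import random


def configure_simulator_env(args):
    # Take the world size from the parsed arguments instead of hardcoding "8";
    # environment variables must be strings.
    os.environ["WORLD_SIZE"] = str(args.world_size)
    # Sample the rendezvous port from the unprivileged range; randint(0, 40000)
    # as in the PR can land on privileged or already-reserved ports.
    rdzv_port = random.randint(20000, 40000)
    return rdzv_port
```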
```python
        else:
            return False
```
The is_extreme_strategy function has a logic error. The else: return False statement is inside the loop, which will cause the function to return after checking only the first mesh. The return False should be moved outside the loop to ensure all meshes in the combination are checked.
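For reference, a minimal self-contained sketch of the corrected control flow; the mesh fields and thresholds are placeholders and not taken from the PR, only the placement of the final `return False` matters:

```python
MAX_TP_SIZE = 8   # placeholder threshold, not from the PR
MAX_PP_SIZE = 16  # placeholder threshold, not from the PR


def is_extreme_strategy(mesh_combination):
    """Return True if any mesh in the combination is an extreme strategy."""
    for mesh in mesh_combination:
        # Placeholder extremeness test; the real checks in the PR differ.
        if mesh["tp"] > MAX_TP_SIZE or mesh["pp"] > MAX_PP_SIZE:
            return True
    # Declare the combination non-extreme only after every mesh has been
    # checked; a `return False` inside the loop exits after the first mesh.
    return False
```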
Suggested change:

```python
        ):
            return True
    return False
```

```python
    # each stage onlt depends on its next stage
    if scheme == '1F1B' or scheme == 'AFAB':
        pipeline_cost = pp_last_stage_time
        for stage_from_last in range(2, num_pp_stages):
```
The loop for stage_from_last in range(2, num_pp_stages): seems to have an off-by-one error. It will not iterate over all the necessary stages. For example, if num_pp_stages is 3, the loop only runs for stage_from_last = 2, missing the calculation for the first stage (index 0). To include all stages from the second-to-last down to the first, the range should be range(2, num_pp_stages + 1).
Suggested change:

```diff
-        for stage_from_last in range(2, num_pp_stages):
+        for stage_from_last in range(2, num_pp_stages + 1):
```
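A small self-contained illustration of the off-by-one; the stage times below are made-up numbers and the accumulation rule is simplified, the point is only which stage indices the loop visits:

```python
num_pp_stages = 3
# Hypothetical per-stage contributions, indexed 0 (first stage) to 2 (last stage).
stage_time = [10.0, 12.0, 9.0]

pipeline_cost = stage_time[-1]  # start from the last stage
# range(2, num_pp_stages) visits only stage_from_last = 2 (stage index 1) and
# never reaches stage index 0; range(2, num_pp_stages + 1) covers all stages.
for stage_from_last in range(2, num_pp_stages + 1):
    stage_idx = num_pp_stages - stage_from_last  # 1, then 0
    pipeline_cost += stage_time[stage_idx]       # placeholder accumulation rule

print(pipeline_cost)
```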
```python
    os.environ["PYTHONPATH"] = (
        "/workspace/20251010/new/FlagScale:"
        "/workspace/20251010/new/FlagScale/third_party/Megatron-LM"
    )
```
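The absolute /workspace/... paths only resolve on one machine. A minimal sketch of deriving the two entries from the repository checkout instead, assuming the script lives inside the FlagScale tree (the `parents[...]` depth is an assumption to adjust to the script's real location):

```python
import os
from pathlib import Path

# Assume this script sits inside the FlagScale checkout; walk up to the repo root.
repo_root = Path(__file__).resolve().parents[2]
megatron_dir = repo_root / "third_party" / "Megatron-LM"

# Preserve any existing PYTHONPATH instead of overwriting it.
existing = os.environ.get("PYTHONPATH", "")
os.environ["PYTHONPATH"] = os.pathsep.join(
    p for p in (str(repo_root), str(megatron_dir), existing) if p
)
```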
```python
                config_file.write(f"{config_data}\n")

    print(f"Hetero configurations saved to {output_config_file}")
```

```python
import ast
import json


def read_configs_from_json(file_path: str):
    configs_list = []
    with open(file_path, "r") as file:
        for line in file:
            # config_data = json.loads(line.strip())
            config_data = ast.literal_eval(line.strip())
```
The script serializes configuration dictionaries to a file using str(dict) and deserializes them using ast.literal_eval. This is brittle and not a standard practice. It's better to use the json module (json.dumps to write and json.loads to read) for serializing and deserializing data. This is more robust and interoperable.
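A minimal sketch of the JSON round trip the comment recommends, writing one JSON object per line; the file name and dictionary contents are placeholders:

```python
import json

configs = [
    {"pp_layer_split": [8, 8, 8], "mesh": [[4, 1, 2], [2, 1, 4]]},  # placeholder data
    {"pp_layer_split": [12, 12], "mesh": [[8, 1, 1]]},
]

# JSON Lines output stays human-readable and avoids eval-style parsing on read.
with open("hetero_configs.jsonl", "w") as f:
    for config in configs:
        f.write(json.dumps(config) + "\n")

with open("hetero_configs.jsonl", "r") as f:
    loaded = [json.loads(line) for line in f if line.strip()]

assert loaded == configs
```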
```python
        pp_layers_split=hetero_config['pp_layer_split'],
    )
    print(f"pipeline cost: {pp_cost}")
    break
```
```cpp
c10::intrusive_ptr<Work> BackendDummy::allreduce(
    std::vector<at::Tensor>& tensors,
    const AllreduceOptions& opts) {
  // printf("dummy allreduce\n");
  for (auto& tensor : tensors) {
    tensor.zero_();
  }

  auto future = c10::make_intrusive<c10::ivalue::Future>(
      c10::ListType::create(c10::TensorType::get()));
  future->markCompleted(c10::IValue(tensors));
  return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
```
The allreduce function incorrectly uses OpType::ALLGATHER when creating the WorkDummy object. It should use OpType::ALLREDUCE. This issue is present in many of the collective implementations in this file. Using the correct OpType is important for correctness and debugging, even in a dummy backend.
Suggested change:

```diff
 c10::intrusive_ptr<Work> BackendDummy::allreduce(
     std::vector<at::Tensor>& tensors,
     const AllreduceOptions& opts) {
   // printf("dummy allreduce\n");
   for (auto& tensor : tensors) {
     tensor.zero_();
   }
   auto future = c10::make_intrusive<c10::ivalue::Future>(
       c10::ListType::create(c10::TensorType::get()));
   future->markCompleted(c10::IValue(tensors));
-  return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
+  return c10::make_intrusive<WorkDummy>(OpType::ALLREDUCE, std::move(future));
 }
```
```python
    #use_gloo_process_groups=args.enable_gloo_process_groups,
    #use_gloo_process_groups=False,
    # If the user is asking for a non-zero embedding init std, skip weight decay for embeddings
    # to avoid embeddings from shrinking to zero as recommended in https://arxiv.org/abs/2312.16903
    default_skip_embedding_weight_decay=args.embedding_init_method_std is not None,
    #default_skip_embedding_weight_decay=args.embedding_init_method_std is not None,
```
```python
    ), "\flength of list {num_layers_per_stage} should match {num_stages}"
    assert (
        len(fwd_time_per_stage_chunk) == num_pp_stages
    ), "\flength of list {fwd_time_per_stage_chunk} should match {num_stages}"
    assert (
        len(bwd_time_per_stage_chunk) == num_pp_stages
    ), "\flength of list {bwd_time_per_stage_chunk} should match {num_stages}"
    assert (
        len(comm_time_between_stages) == num_pp_stages
    ), "\flength of list {comm_time_between_stages} should match {num_stages}"
```
The assertion messages on lines 33, 36, 39, and 42 use \f which is a form-feed character. This is likely a typo and was intended to be an f-string for proper message formatting. Additionally, the variable names inside the string are incorrect. For example, on line 33, {num_layers_per_stage} should be {len(pp_layers_split)} and {num_stages} should be {num_pp_stages}.
Suggested change:

```diff
-    ), "\flength of list {num_layers_per_stage} should match {num_stages}"
+    ), f"length of list pp_layers_split {len(pp_layers_split)} should match num_pp_stages {num_pp_stages}"
     assert (
         len(fwd_time_per_stage_chunk) == num_pp_stages
-    ), "\flength of list {fwd_time_per_stage_chunk} should match {num_stages}"
+    ), f"length of list fwd_time_per_stage_chunk {len(fwd_time_per_stage_chunk)} should match num_pp_stages {num_pp_stages}"
     assert (
         len(bwd_time_per_stage_chunk) == num_pp_stages
-    ), "\flength of list {bwd_time_per_stage_chunk} should match {num_stages}"
+    ), f"length of list bwd_time_per_stage_chunk {len(bwd_time_per_stage_chunk)} should match num_pp_stages {num_pp_stages}"
     assert (
         len(comm_time_between_stages) == num_pp_stages
-    ), "\flength of list {comm_time_between_stages} should match {num_stages}"
+    ), f"length of list comm_time_between_stages {len(comm_time_between_stages)} should match num_pp_stages {num_pp_stages}"
```
|
|
Modify pr 389 to adapt the code for the new version of FlagScale
#389