 import vllm.distributed
 import vllm.envs as envs
 from torch.distributed import ProcessGroup
-from torch.distributed.distributed_c10d import (Backend, PrefixStore,
-                                                 _get_default_timeout,
-                                                 is_nccl_available)
-from torch.distributed.rendezvous import rendezvous
 from vllm.config import ParallelConfig, VllmConfig
 from vllm.v1.engine.core import DPEngineCoreProc
+from vllm.utils import stateless_init_torch_distributed_process_group


 def ascend_destroy_model_parallel():
@@ -49,112 +46,6 @@ def ascend_destroy_model_parallel():
     destory_ascend_model_parallel()


-def stateless_init_torch_distributed_process_group(
-        host: str, port: int, rank: int, world_size: int,
-        backend: str) -> ProcessGroup:
-    """
-    A replacement for `torch.distributed.init_process_group` that does not
-    pollute the global state. The created ProcessGroup object can be used for
-    some operations such as `allreduce`, because it does not depend on the
-    global rank. However, some operations such as `broadcast` cannot be used
-    because it depends on the global rank.
-
-    # TODO: ask for help from PyTorch team if we need the `broadcast` operation.
-
-    This function is useful when we are not sure about the total number of
-    processes in the process group. For example, we may have process
-    1, 2, ..., 8 who want to communicate, and process 9 might be the same
-    process as process 1, or it might be a different process; process 10
-    might be the same process as process 5, or it might be a different process.
-    In this case, how can we reliably form a communication channel within
-    process 9 and 10, without affecting the communication channel within
-    process 1, 2, ..., 8?
-
-    One possible solution is to figure out if process 9 and 10 are the same
-    as process 1 and 5 beforehand, and then form a communication channel
-    based on the information, adjusting the ranks and world_size etc. However,
-    figuring out the information is not always easy, and it will interfere
-    with the main communication channel.
-
-    Our solution is to always form a communication channel with process 1, 2,
-    ..., 8, and then use this function to form another communication channel
-    with process 9 and 10. This way, regardless of whether process 9 and 10
-    are the same as process 1 and 5, the main communication channel is
-    always formed with process 1, 2, ..., 8, and the additional communication
-    channel is formed with process 9 and 10.
-    """
-    init_method = f"tcp://{host}:{port}"
-    backend = Backend(backend)  # it is basically string
-    timeout = _get_default_timeout(backend)
-
-    store, rank, world_size = next(
-        rendezvous(init_method, rank, world_size, timeout=timeout))
-    store.set_timeout(timeout)
-
-    group_rank = rank
-    group_size = world_size
-
-    # Use a PrefixStore to avoid accidental overrides of keys used by
-    # different systems (e.g. RPC) in case the store is multi-tenant.
-    prefix_store = PrefixStore(init_method, store)
-
-    # TODO(Yizhou): The reason we need to set options while vllm does not
-    # seems to be related to the version of PyTorch. In the latest version,
-    # there is no need to set options. While in the older version, 2.5.1
-    # specifically, we need to set options.
-    options = ProcessGroup.Options(backend=backend)
-    pg: ProcessGroup = ProcessGroup(
-        prefix_store,
-        group_rank,
-        group_size,
-        options,
-    )
-    if backend == "gloo":
-        from torch.distributed.distributed_c10d import ProcessGroupGloo
-        backend_class = ProcessGroupGloo(prefix_store,
-                                         group_rank,
-                                         group_size,
-                                         timeout=timeout)
-        backend_type = ProcessGroup.BackendType.GLOO
-        device = torch.device("cpu")
-    elif backend == "nccl":
-        assert is_nccl_available()
-        from torch.distributed.distributed_c10d import ProcessGroupNCCL
-
-        backend_options = ProcessGroupNCCL.Options()
-        backend_options._timeout = timeout
-
-        backend_class = ProcessGroupNCCL(prefix_store, group_rank, group_size,
-                                         backend_options)
-        backend_type = ProcessGroup.BackendType.NCCL
-        device = torch.device("cuda")
-    elif backend == "hccl":
-        from torch.distributed import is_hccl_available
-        assert is_hccl_available()
-        from torch_npu._C._distributed_c10d import ProcessGroupHCCL
-        backend_options = ProcessGroupHCCL.Options()
-        backend_options._timeout = timeout
-        backend_class = ProcessGroupHCCL(prefix_store, group_rank, group_size,
-                                         backend_options)
-        device = torch.device("npu")
-        backend_class._set_sequence_number_for_group()
-        backend_type = ProcessGroup.BackendType.CUSTOM
-        pg._register_backend(device, backend_type, backend_class)
-        return pg
-    else:
-        raise RuntimeError(f"Unsupported torch distributed backend: {backend}")
-
-    # TODO(Yizhou): Like we mentioned above, _set_default_backend is not
-    # implemented in the 2.5.1 version of PyTorch. But we need to set it
-    # after the latest version is released.
-    # pg._set_default_backend(backend_type)
-    backend_class._set_sequence_number_for_group()
-
-    pg._register_backend(device, backend_type, backend_class)
-
-    return pg
-
-
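For reference, the helper removed above is now imported from upstream vLLM as vllm.utils.stateless_init_torch_distributed_process_group (see the added import in the first hunk). A minimal usage sketch follows; it assumes the upstream helper keeps the signature shown in the removed local copy, and the host, port, rank, and world_size values are purely illustrative:

# Sketch only: assumes the upstream helper keeps the removed copy's signature,
# i.e. (host, port, rank, world_size, backend) -> ProcessGroup.
from vllm.utils import stateless_init_torch_distributed_process_group

# Illustrative values; in vllm-ascend the DP port would come from the patched
# parallel_config_get_dp_port shown below.
pg = stateless_init_torch_distributed_process_group(
    host="127.0.0.1",
    port=29555,
    rank=0,
    world_size=2,
    backend="gloo",
)
# The returned ProcessGroup supports rank-independent collectives such as
# allreduce without touching the default (global) process group.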
 def parallel_config_get_dp_port(self) -> int:
     """
     We might need to initialize process groups in multiple