99from typing import Any , Deque , Dict , Optional , Sequence , Tuple
1010
1111import torch
12- from torch .distributed . rendezvous import rendezvous
12+ from torch .distributed import TCPStore
1313
1414import vllm .envs as envs
1515from vllm .logger import init_logger
@@ -97,7 +97,6 @@ class StatelessProcessGroup:
9797 group. Only use it to communicate metadata between processes.
9898 For data-plane communication, create NCCL-related objects.
9999 """
100- prefix : str
101100 rank : int
102101 world_size : int
103102 store : torch ._C ._distributed_c10d .Store
@@ -127,7 +126,7 @@ def __post_init__(self):
127126 def send_obj (self , obj : Any , dst : int ):
128127 """Send an object to a destination rank."""
129128 self .expire_data ()
130- key = f"{ self . prefix } / send_to/{ dst } /{ self .send_dst_counter [dst ]} "
129+ key = f"send_to/{ dst } /{ self .send_dst_counter [dst ]} "
131130 self .store .set (key , pickle .dumps (obj ))
132131 self .send_dst_counter [dst ] += 1
133132 self .entries .append ((key , time .time ()))
@@ -147,8 +146,7 @@ def recv_obj(self, src: int) -> Any:
147146 """Receive an object from a source rank."""
148147 obj = pickle .loads (
149148 self .store .get (
150- f"{ self .prefix } /send_to/{ self .rank } /{ self .recv_src_counter [src ]} "
151- ))
149+ f"send_to/{ self .rank } /{ self .recv_src_counter [src ]} " ))
152150 self .recv_src_counter [src ] += 1
153151 return obj
154152
@@ -159,14 +157,14 @@ def broadcast_obj(self, obj: Optional[Any], src: int) -> Any:
159157 """
160158 if self .rank == src :
161159 self .expire_data ()
162- key = (f"{ self . prefix } / broadcast_from/{ src } /"
160+ key = (f"broadcast_from/{ src } /"
163161 f"{ self .broadcast_send_counter } " )
164162 self .store .set (key , pickle .dumps (obj ))
165163 self .broadcast_send_counter += 1
166164 self .entries .append ((key , time .time ()))
167165 return obj
168166 else :
169- key = (f"{ self . prefix } / broadcast_from/{ src } /"
167+ key = (f"broadcast_from/{ src } /"
170168 f"{ self .broadcast_recv_src_counter [src ]} " )
171169 recv_obj = pickle .loads (self .store .get (key ))
172170 self .broadcast_recv_src_counter [src ] += 1
@@ -194,7 +192,8 @@ def barrier(self):
194192
195193 @staticmethod
196194 def create (
197- init_method : str ,
195+ host : str ,
196+ port : int ,
198197 rank : int ,
199198 world_size : int ,
200199 data_expiration_seconds : int = 3600 ,
@@ -214,15 +213,14 @@ def create(
214213 can call `StatelessProcessGroup.create` to form a group, and then process A, B,
215214 C, and D can call `StatelessProcessGroup.create` to form another group.
216215 """ # noqa
217- from torch . _C . _distributed_c10d import _DEFAULT_PG_TIMEOUT
218- timeout = _DEFAULT_PG_TIMEOUT
219-
220- store , rank , world_size = next (
221- rendezvous ( init_method , rank , world_size , timeout = timeout ))
222- store . set_timeout ( timeout )
216+ store = TCPStore (
217+ host_name = host ,
218+ port = port ,
219+ world_size = world_size ,
220+ is_master = ( rank == 0 ),
221+ )
223222
224223 return StatelessProcessGroup (
225- prefix = init_method ,
226224 rank = rank ,
227225 world_size = world_size ,
228226 store = store ,
0 commit comments