Commit d52df93 (branch: dev)
ChengjieLi28 committed Mar 27, 2024
1 parent: 0e51dcb
Showing 1 changed file with 37 additions and 10 deletions.
xinference/core/worker.py (47 changes: 37 additions & 10 deletions)
@@ -74,7 +74,10 @@ def __init__(
         self._model_uid_to_model_spec: Dict[str, ModelDescription] = {}
         self._gpu_to_model_uid: Dict[int, str] = {}
         self._gpu_to_embedding_model_uids: Dict[int, Set[str]] = defaultdict(set)
-        self._user_specified_gpu_to_model_uids: Dict[int, Set[str]] = defaultdict(set)
+        # Dict structure: gpu_index: {(replica_model_uid, model_type)}
+        self._user_specified_gpu_to_model_uids: Dict[
+            int, Set[Tuple[str, str]]
+        ] = defaultdict(set)
         self._model_uid_to_addr: Dict[str, str] = {}
         self._model_uid_to_recover_count: Dict[str, int] = {}
         self._model_uid_to_launch_args: Dict[str, Dict] = {}
@@ -285,7 +288,7 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int:
                     not has_vllm_model
                     and _dev in self._user_specified_gpu_to_model_uids
                 ):
-                    for rep_uid in self._user_specified_gpu_to_model_uids[_dev]:
+                    for rep_uid, _ in self._user_specified_gpu_to_model_uids[_dev]:
                         has_vllm_model = await self.is_model_vllm_backend(rep_uid)
                         if has_vllm_model:
                             break
@@ -313,9 +316,20 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int:
         return device

     def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
-        allocated_devices = set(
-            list(self._user_specified_gpu_to_model_uids.keys())
-            + list(self._gpu_to_model_uid.keys())
+        user_specified_allocated_devices: Set[int] = set()
+        for dev, model_infos in self._user_specified_gpu_to_model_uids.items():
+            allocated_non_embedding_rerank_models = False
+            for _, model_type in model_infos:
+                allocated_non_embedding_rerank_models = model_type not in [
+                    "embedding",
+                    "rerank",
+                ]
+                if allocated_non_embedding_rerank_models:
+                    break
+            if not allocated_non_embedding_rerank_models:
+                user_specified_allocated_devices.add(dev)
+        allocated_devices = set(self._gpu_to_model_uid.keys()).union(
+            user_specified_allocated_devices
         )
         if n_gpu > len(self._total_gpu_devices) - len(allocated_devices):
             raise RuntimeError("No available slot found for the model")
@@ -324,15 +338,15 @@ def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
             dev
             for dev in self._total_gpu_devices
             if dev not in self._gpu_to_model_uid
-            and dev not in self._user_specified_gpu_to_model_uids
+            and dev not in user_specified_allocated_devices
         ][:n_gpu]
         for dev in devices:
             self._gpu_to_model_uid[int(dev)] = model_uid

         return sorted(devices)

     async def allocate_devices_with_gpu_idx(
-        self, model_uid: str, gpu_idx: List[int]
+        self, model_uid: str, model_type: str, gpu_idx: List[int]
     ) -> List[int]:
         """
         When user specifies the gpu_idx, allocate models on user-specified GPUs whenever possible
@@ -360,7 +374,7 @@ async def allocate_devices_with_gpu_idx(
             # If user has run the vLLM model on the GPU that was forced to be specified,
             # it is not possible to force this GPU to be allocated again
             if idx in self._user_specified_gpu_to_model_uids:
-                for rep_uid in self._user_specified_gpu_to_model_uids[idx]:
+                for rep_uid, _ in self._user_specified_gpu_to_model_uids[idx]:
                     is_vllm_model = await self.is_model_vllm_backend(rep_uid)
                     if is_vllm_model:
                         raise RuntimeError(
@@ -375,7 +389,7 @@
                         )

         for idx in gpu_idx:
-            self._user_specified_gpu_to_model_uids[idx].add(model_uid)
+            self._user_specified_gpu_to_model_uids[idx].add((model_uid, model_type))
         return sorted(gpu_idx)

     def release_devices(self, model_uid: str):
@@ -392,6 +406,17 @@ def release_devices(self, model_uid: str):
             if model_uid in self._gpu_to_embedding_model_uids[dev]:
                 self._gpu_to_embedding_model_uids[dev].remove(model_uid)

+        # check user-specified slots
+        for dev in self._user_specified_gpu_to_model_uids:
+            model_infos = list(
+                filter(
+                    lambda x: x[0] == model_uid,
+                    self._user_specified_gpu_to_model_uids[dev],
+                )
+            )
+            for model_info in model_infos:
+                self._user_specified_gpu_to_model_uids[dev].remove(model_info)
+
     async def _create_subpool(
         self,
         model_uid: str,
@@ -417,7 +442,9 @@ async def _create_subpool(
             logger.debug(f"GPU disabled for model {model_uid}")
         else:
             assert isinstance(gpu_idx, list)
-            devices = await self.allocate_devices_with_gpu_idx(model_uid, gpu_idx)
+            devices = await self.allocate_devices_with_gpu_idx(
+                model_uid, model_type, gpu_idx  # type: ignore
+            )
         env["CUDA_VISIBLE_DEVICES"] = ",".join([str(dev) for dev in devices])

         if os.name != "nt" and platform.system() != "Darwin":
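Taken together, the change pins down what occupies a user-specified GPU: each slot now stores (replica_model_uid, model_type) pairs instead of bare uids, so allocate_devices can treat a GPU as still available when everything pinned to it is an embedding or rerank model, and release_devices can strip a model's entries by uid. Below is a minimal, self-contained sketch of that bookkeeping; the GpuAllocator class and its method names are invented for illustration, and the real WorkerActor is async and additionally refuses to reuse GPUs occupied by vLLM-backed models.

from collections import defaultdict
from typing import Dict, List, Set, Tuple


class GpuAllocator:
    """Illustrative stand-in for WorkerActor's GPU bookkeeping."""

    def __init__(self, total_gpu_devices: List[int]):
        self._total_gpu_devices = total_gpu_devices
        self._gpu_to_model_uid: Dict[int, str] = {}
        # gpu_index -> {(replica_model_uid, model_type)}, as in the diff
        self._user_specified_gpu_to_model_uids: Dict[
            int, Set[Tuple[str, str]]
        ] = defaultdict(set)

    def allocate_with_gpu_idx(
        self, model_uid: str, model_type: str, gpu_idx: List[int]
    ) -> List[int]:
        # Record the model type next to the replica uid so later
        # auto-allocations can tell embedding/rerank models apart.
        for idx in gpu_idx:
            self._user_specified_gpu_to_model_uids[idx].add((model_uid, model_type))
        return sorted(gpu_idx)

    def allocate(self, model_uid: str, n_gpu: int) -> List[int]:
        # A user-specified GPU only counts as occupied if it holds at least
        # one model that is neither "embedding" nor "rerank" (same logic as
        # the loop-and-break in the diff, written with any()).
        user_specified_allocated: Set[int] = set()
        for dev, model_infos in self._user_specified_gpu_to_model_uids.items():
            if any(mt not in ("embedding", "rerank") for _, mt in model_infos):
                user_specified_allocated.add(dev)
        allocated = set(self._gpu_to_model_uid).union(user_specified_allocated)
        if n_gpu > len(self._total_gpu_devices) - len(allocated):
            raise RuntimeError("No available slot found for the model")
        devices = [
            dev
            for dev in self._total_gpu_devices
            if dev not in self._gpu_to_model_uid
            and dev not in user_specified_allocated
        ][:n_gpu]
        for dev in devices:
            self._gpu_to_model_uid[dev] = model_uid
        return sorted(devices)

    def release(self, model_uid: str) -> None:
        # Drop every (uid, type) entry belonging to this model, mirroring the
        # new filter-then-remove block in release_devices.
        for dev in self._user_specified_gpu_to_model_uids:
            self._user_specified_gpu_to_model_uids[dev] = {
                info
                for info in self._user_specified_gpu_to_model_uids[dev]
                if info[0] != model_uid
            }


# GPU 0 holds only a pinned embedding model, so it still qualifies for
# auto-allocation; the LLM pinned to GPU 1 blocks that device.
alloc = GpuAllocator([0, 1])
alloc.allocate_with_gpu_idx("emb-1", "embedding", [0])
alloc.allocate_with_gpu_idx("llm-1", "LLM", [1])
print(alloc.allocate("llm-2", 1))  # -> [0]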
