FEAT: force to specify worker ip and gpu idx when launching models #1195

Merged · 5 commits · Mar 28, 2024
6 changes: 6 additions & 0 deletions xinference/api/restful_api.py
@@ -693,6 +693,8 @@ async def launch_model(
peft_model_path = payload.get("peft_model_path", None)
image_lora_load_kwargs = payload.get("image_lora_load_kwargs", None)
image_lora_fuse_kwargs = payload.get("image_lora_fuse_kwargs", None)
worker_ip = payload.get("worker_ip", None)
gpu_idx = payload.get("gpu_idx", None)

exclude_keys = {
"model_uid",
@@ -707,6 +709,8 @@ async def launch_model(
"peft_model_path",
"image_lora_load_kwargs",
"image_lora_fuse_kwargs",
"worker_ip",
"gpu_idx",
}

kwargs = {
@@ -734,6 +738,8 @@ async def launch_model(
peft_model_path=peft_model_path,
image_lora_load_kwargs=image_lora_load_kwargs,
image_lora_fuse_kwargs=image_lora_fuse_kwargs,
worker_ip=worker_ip,
gpu_idx=gpu_idx,
**kwargs,
)

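For context, a minimal sketch of the HTTP request this endpoint now accepts, assuming a supervisor listening on 127.0.0.1:9997; the /v1/models path, the model name, and the addresses are illustrative assumptions rather than values taken from this diff. The two new fields mirror the payload keys read above.

import requests

# Launch payload for the RESTful launch endpoint; "worker_ip" and "gpu_idx"
# are the two new optional fields parsed by launch_model above.
payload = {
    "model_uid": None,
    "model_name": "qwen-chat",    # illustrative model name
    "model_type": "LLM",
    "worker_ip": "192.168.1.12",  # pin the replica to this worker
    "gpu_idx": [0, 1],            # and to these GPU indices on that worker
}
resp = requests.post("http://127.0.0.1:9997/v1/models", json=payload)
resp.raise_for_status()
print(resp.json())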
8 changes: 8 additions & 0 deletions xinference/client/restful/restful_client.py
@@ -795,6 +795,8 @@ def launch_model(
peft_model_path: Optional[str] = None,
image_lora_load_kwargs: Optional[Dict] = None,
image_lora_fuse_kwargs: Optional[Dict] = None,
worker_ip: Optional[str] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
) -> str:
"""
@@ -828,6 +830,10 @@ def launch_model(
lora load parameters for image model
image_lora_fuse_kwargs: Optional[Dict]
lora fuse parameters for image model
worker_ip: Optional[str]
Specify the worker ip where the model is located in a distributed scenario.
gpu_idx: Optional[Union[int, List[int]]]
Specify the GPU index where the model is located.
**kwargs:
Any other parameters that are specified.

@@ -853,6 +859,8 @@ def launch_model(
"peft_model_path": peft_model_path,
"image_lora_load_kwargs": image_lora_load_kwargs,
"image_lora_fuse_kwargs": image_lora_fuse_kwargs,
"worker_ip": worker_ip,
"gpu_idx": gpu_idx,
}

for key, value in kwargs.items():
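A hedged usage sketch of the new keyword arguments on the Python client; the endpoint address and model name are placeholders, and the import path assumes RESTfulClient is re-exported from xinference.client as in current releases.

from xinference.client import RESTfulClient

client = RESTfulClient("http://127.0.0.1:9997")
# In a distributed cluster, pin the model to a specific worker and its GPUs.
model_uid = client.launch_model(
    model_name="qwen-chat",    # placeholder model name
    model_type="LLM",
    worker_ip="192.168.1.12",  # must belong to a worker registered in the cluster
    gpu_idx=[0, 1],
)
print(model_uid)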
33 changes: 32 additions & 1 deletion xinference/core/supervisor.py
@@ -92,6 +92,15 @@ def __init__(self):
def uid(cls) -> str:
return "supervisor"

def _get_worker_ref_by_ip(
self, ip: str
) -> Optional[xo.ActorRefType["WorkerActor"]]:
for addr, ref in self._worker_address_to_worker.items():
existing_ip = addr.split(":")[0]
if existing_ip == ip:
return ref
return None

async def __post_create__(self):
self._uptime = time.time()
if not XINFERENCE_DISABLE_HEALTH_CHECK:
@@ -717,8 +726,25 @@ async def launch_builtin_model(
peft_model_path: Optional[str] = None,
image_lora_load_kwargs: Optional[Dict] = None,
image_lora_fuse_kwargs: Optional[Dict] = None,
worker_ip: Optional[str] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
) -> str:
target_ip_worker_ref = (
self._get_worker_ref_by_ip(worker_ip) if worker_ip is not None else None
)
if (
worker_ip is not None
and not self.is_local_deployment()
and target_ip_worker_ref is None
):
raise ValueError(f"Worker ip address {worker_ip} is not in the cluster.")
if worker_ip is not None and self.is_local_deployment():
logger.warning(
f"You specified the worker ip: {worker_ip} in local mode, "
f"xinference will ignore this option."
)

if model_uid is None:
model_uid = self._gen_model_uid(model_name)

@@ -735,7 +761,11 @@ async def _launch_one_model(_replica_model_uid):
)

nonlocal model_type
worker_ref = await self._choose_worker()
worker_ref = (
target_ip_worker_ref
if target_ip_worker_ref is not None
else await self._choose_worker()
)
# LLM as default for compatibility
model_type = model_type or "LLM"
await worker_ref.launch_builtin_model(
@@ -750,6 +780,7 @@ async def _launch_one_model(_replica_model_uid):
peft_model_path=peft_model_path,
image_lora_load_kwargs=image_lora_load_kwargs,
image_lora_fuse_kwargs=image_lora_fuse_kwargs,
gpu_idx=gpu_idx,
**kwargs,
)
self._replica_model_uid_to_worker[_replica_model_uid] = worker_ref
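To illustrate the two guard branches added above, a brief sketch of the expected caller-side behavior; the addresses and model name are hypothetical, and the exact exception surfaced through the REST layer is an assumption.

from xinference.client import RESTfulClient

client = RESTfulClient("http://supervisor-host:9997")
try:
    # In a distributed deployment, an IP that does not match any registered
    # worker makes the supervisor raise
    # ValueError("Worker ip address ... is not in the cluster.")
    client.launch_model(model_name="qwen-chat", worker_ip="10.0.0.99", gpu_idx=0)
except Exception as exc:  # surfaced to the caller as a failed launch request
    print(exc)

# In a local (single-node) deployment, worker_ip is ignored and only a
# warning is logged, so the launch proceeds on the local worker.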
187 changes: 185 additions & 2 deletions xinference/core/tests/test_worker.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from typing import List, Optional, Union

import pytest
import pytest_asyncio
@@ -42,7 +42,14 @@ def get_gpu_to_model_uid(self):
def get_gpu_to_embedding_model_uids(self):
return self._gpu_to_embedding_model_uids

def get_user_specified_gpu_to_model_uids(self):
return self._user_specified_gpu_to_model_uids

async def is_model_vllm_backend(self, model_uid):
if model_uid.startswith("normal_"):
return False
if model_uid.startswith("vllm_"):
return True
for _dev in self._gpu_to_model_uid:
if model_uid == self._gpu_to_model_uid[_dev]:
return True
@@ -57,10 +64,11 @@ async def launch_builtin_model(
quantization: Optional[str],
model_type: str = "LLM",
n_gpu: Optional[int] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
):
subpool_address, devices = await self._create_subpool(
model_uid, model_type, n_gpu=n_gpu
model_uid, model_type, n_gpu=n_gpu, gpu_idx=gpu_idx # type: ignore
)
self._model_uid_to_addr[model_uid] = subpool_address

@@ -252,3 +260,178 @@ async def test_launch_embedding_model(setup_pool):
)
for i in range(1, 6):
await worker.terminate_model(f"model_model_{i}")


@pytest.mark.asyncio
async def test_launch_model_with_gpu_idx(setup_pool):
pool = setup_pool
addr = pool.external_address

worker: xo.ActorRefType["MockWorkerActor"] = await xo.create_actor(
MockWorkerActor,
address=addr,
uid=WorkerActor.uid(),
supervisor_address="test",
main_pool=pool,
cuda_devices=[i for i in range(4)],
)

# test normal model
await worker.launch_builtin_model(
"normal_model_model_1", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 1
assert 0 in llm_info

await worker.launch_builtin_model(
"model_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0]
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 1
assert 0 in llm_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info) == 1
assert 0 in user_specified_info
assert len(user_specified_info[0]) == 1
assert list(user_specified_info[0])[0][0] == "model_model_2"
assert list(user_specified_info[0])[0][1] == "LLM"

# test vllm model
await worker.launch_builtin_model(
"vllm_model_model_3", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 2
assert 0 in llm_info
assert 1 in llm_info

with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[1]
)

await worker.launch_builtin_model(
"model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[2]
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 2
assert 0 in llm_info
assert 1 in llm_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info) == 2
assert 0 in user_specified_info
assert 2 in user_specified_info
assert len(user_specified_info[2]) == 1
assert list(user_specified_info[2])[0][0] == "model_model_4"
assert list(user_specified_info[2])[0][1] == "LLM"

# then launch an LLM without gpu_idx
await worker.launch_builtin_model(
"normal_model_model_5", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 3
assert 0 in llm_info
assert 1 in llm_info
assert 3 in llm_info

# launch without gpu_idx again, error
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"normal_model_model_6", "mock_model_name", None, None, None, "LLM", n_gpu=1
)

# test terminate and cleanup
await worker.terminate_model("normal_model_model_1")
await worker.terminate_model("model_model_2")
await worker.terminate_model("vllm_model_model_3")
await worker.terminate_model("model_model_4")
await worker.terminate_model("normal_model_model_5")

llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 0

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
for idx, model_infos in user_specified_info.items():
assert len(model_infos) == 0

# next, test with embedding models
await worker.launch_builtin_model(
"embedding_1", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
assert len(embedding_info) == 1
assert 0 in embedding_info

await worker.launch_builtin_model(
"vllm_mock_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0]
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
assert len(embedding_info) == 1
assert 0 in embedding_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info[0]) == 1
assert list(user_specified_info[0])[0][0] == "vllm_mock_model_2"
assert list(user_specified_info[0])[0][1] == "LLM"

# already has vllm model on gpu 0, error
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"rerank_3", "mock_model_name", None, None, None, "rerank", gpu_idx=[0]
)
# never choose gpu 0 again
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"normal_mock_model_3", "mock_model_name", None, None, None, "LLM", n_gpu=4
)

# should be on gpu 1
await worker.launch_builtin_model(
"embedding_3", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
# should be on gpu 1
await worker.launch_builtin_model(
"rerank_4", "mock_model_name", None, None, None, "rerank", gpu_idx=[1]
)
# should be on gpu 2
await worker.launch_builtin_model(
"embedding_5", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
# should be on gpu 3
await worker.launch_builtin_model(
"rerank_6", "mock_model_name", None, None, None, "rerank", n_gpu=1
)
# should be on gpu 2, since it currently has the fewest models
await worker.launch_builtin_model(
"rerank_7", "mock_model_name", None, None, None, "rerank", n_gpu=1
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert "rerank_7" in embedding_info[2]
assert len(embedding_info[0]) == 1
assert len(user_specified_info[0]) == 1
assert len(embedding_info[1]) == 1
assert len(user_specified_info[1]) == 1
assert len(embedding_info[2]) == 2
assert len(user_specified_info[2]) == 0
assert len(embedding_info[3]) == 1
assert len(user_specified_info[3]) == 0

# cleanup
await worker.terminate_model("embedding_1")
await worker.terminate_model("vllm_mock_model_2")
await worker.terminate_model("embedding_3")
await worker.terminate_model("rerank_4")
await worker.terminate_model("embedding_5")
await worker.terminate_model("rerank_6")
await worker.terminate_model("rerank_7")

embedding_info = await worker.get_gpu_to_embedding_model_uids()
user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
for info in [embedding_info, user_specified_info]:
for dev, details in info.items():
assert len(details) == 0