Skip to content

Commit

Permalink
feat: support multi-route with shared workers (mosecorg#423)
Browse files Browse the repository at this point in the history
* fix dockerfile path

Signed-off-by: Keming <kemingy94@gmail.com>

* refactor py args to rs

Signed-off-by: Keming <kemingy94@gmail.com>

* finish the basic multiroute rust part

Signed-off-by: Keming <kemingy94@gmail.com>

* fix the openapi

Signed-off-by: Keming <kemingy94@gmail.com>

* add protocol state

Signed-off-by: Keming <kemingy94@gmail.com>

* add runtime register

Signed-off-by: Keming <kemingy94@gmail.com>

* Apply suggestions from code review

Co-authored-by: zclzc <38581401+lkevinzc@users.noreply.github.com>
Signed-off-by: Keming <kemingy94@gmail.com>

* fix a deadlock

Signed-off-by: Keming <kemingy94@gmail.com>

* add test

Signed-off-by: Keming <kemingy94@gmail.com>

* add doc

Signed-off-by: Keming <kemingy94@gmail.com>

* bump version

Signed-off-by: Keming <kemingy94@gmail.com>

* combine ingress & egress to state enum

Signed-off-by: Keming <kemingy94@gmail.com>

---------

Signed-off-by: Keming <kemingy94@gmail.com>
Co-authored-by: zclzc <38581401+lkevinzc@users.noreply.github.com>
  • Loading branch information
kemingy and lkevinzc authored Aug 3, 2023
1 parent 01235cc commit 30261fe
Show file tree
Hide file tree
Showing 40 changed files with 1,110 additions and 997 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: Test
run: |
make semantic_lint test
- name: Test pyarrow in Linux
- name: Test shm in Linux
if: ${{ startsWith(matrix.os, 'ubuntu') }}
run: |
sudo apt update && sudo apt install redis
Expand Down
59 changes: 18 additions & 41 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mosec"
version = "0.7.2"
version = "0.8.0"
authors = ["Keming <kemingy94@gmail.com>", "Zichen <lkevinzc@gmail.com>"]
edition = "2021"
license = "Apache-2.0"
Expand All @@ -19,13 +19,12 @@ tracing-subscriber = { version = "0.3", features = ["local-time", "json"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "macros", "sync", "signal", "io-util"] }
derive_more = { version = "0.99", features = ["display", "error", "from"] }
# MPMS that only one consumer sees each message & async
async-channel = { version = "1" }
async-channel = "1.9"
once_cell = "1.18"
prometheus-client = "0.21.1"
argh = "0.1"
axum = "0.6.18"
prometheus-client = "0.21"
axum = "0.6"
async-stream = "0.3.5"
utoipa = "3.3.0"
serde_json = "1.0.96"
serde = "1.0.163"
utoipa = "3.4"
serde_json = "1.0"
serde = "1.0"
utoipa-swagger-ui = { version = "3", features = ["axum"] }
9 changes: 2 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ ARG base=nvidia/cuda:11.6.2-cudnn8-runtime-ubuntu20.04
FROM ${base}

ENV DEBIAN_FRONTEND=noninteractive LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8

ARG MOSEC_PORT=8000
ENV MOSEC_PORT=${MOSEC_PORT}
ENV PATH /opt/conda/bin:$PATH

ARG CONDA_VERSION=py310_23.3.1-0

RUN apt update && \
apt install -y --no-install-recommends \
wget \
git \
build-essential \
ca-certificates && \
rm -rf /var/lib/apt/lists/*

Expand Down Expand Up @@ -45,9 +42,7 @@ RUN set -x && \
find /opt/conda/ -follow -type f -name '*.js.map' -delete && \
/opt/conda/bin/conda clean -afy

RUN /opt/conda/bin/conda create -n mosec python=3.10

ENV PYTHON_PREFIX=/opt/conda/envs/mosec/bin
ENV PYTHON_PREFIX=/opt/conda/bin

RUN update-alternatives --install /usr/bin/python python ${PYTHON_PREFIX}/python 1 && \
update-alternatives --install /usr/bin/python3 python3 ${PYTHON_PREFIX}/python3 1 && \
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ test_all: dev
pytest tests -vv -s
RUST_BACKTRACE=1 cargo test -vv

test_chaos: dev
@python -m tests.bad_req

doc:
@cd docs && make html && cd ../
@python -m http.server -d docs/build/html 7291 -b 127.0.0.1
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ Then let's start the server with debug logs:
python examples/stable_diffusion/server.py --log-level debug --timeout 30000
```

Open `http://127.0.0.1:8000/openapi/swagger/` in your browser to get the OpenAPI doc.

And in another terminal, test it:

```shell
Expand All @@ -173,8 +175,9 @@ That's it! You have just hosted your **_stable-diffusion model_** as a service!

More ready-to-use examples can be found in the [Example](https://mosecorg.github.io/mosec/examples/index.html) section. It includes:

- [Multi-stage workflow demo](https://mosecorg.github.io/mosec/examples/echo.html): a simple echo demo even without any ML model.
- [Pipeline](https://mosecorg.github.io/mosec/examples/echo.html): a simple echo demo even without any ML model.
- [Request validation](https://mosecorg.github.io/mosec/examples/validate.html): validate the request with type annotation.
- [Multiple route](https://mosecorg.github.io/mosec/examples/multi_route.html): serve multiple models in one service
- [Shared memory IPC](https://mosecorg.github.io/mosec/examples/ipc.html): inter-process communication with shared memory.
- [Customized GPU allocation](https://mosecorg.github.io/mosec/examples/env.html): deploy multiple replicas, each using different GPUs.
- [Customized metrics](https://mosecorg.github.io/mosec/examples/metric.html): record your own metrics for monitoring.
Expand Down
1 change: 1 addition & 0 deletions docs/source/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ metric
pytorch
stable_diffusion
validate
multi_route
```

We provide examples across different ML frameworks and for various tasks in this section.
Expand Down
33 changes: 33 additions & 0 deletions docs/source/examples/multi_route.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Multi-Route

This example shows how to use the multi-route feature.

You will need this feature if you want to:

- Serve multiple models in one service on different endpoints.
- i.e. register `/embedding` & `/classify` with different models
- Serve one model to multiple different endpoints in one service.
- i.e. register LLaMA with `/inference` and `/v1/chat/completions` to make it compatible with the OpenAI API
- Share a worker in different routes
- The shared worker will collect the dynamic batch from multiple previous stages.
- If you want to have multiple runtimes with sharing, you can declare multiple runtime instances with the same worker class.

The worker definition part is the same as for a single route. The only difference is how you register the worker with the server.

Here we expose a new [concept](../reference/concept.md) called [`Runtime`](mosec.runtime.Runtime).

You can create the `Runtime` and register on the server with a `{endpoint: [Runtime]}` dictionary.

See the complete demo code below.

## Server

```{include} ../../../examples/multi_route/server.py
:code: python
```

## Client

```{include} ../../../examples/multi_route/client.py
:code: python
```
4 changes: 3 additions & 1 deletion docs/source/reference/concept.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ There are a few terms used in `mosec`.

- `worker`: a Python process that executes the `forward` method (inherit from [`mosec.Worker`](mosec.worker.Worker))
- `stage`: one processing unit in the pipeline, each stage contains several `worker` replicas
- also known as [`Runtime`](mosec.runtime.Runtime) in the code
- each stage retrieves the data from the previous stage and passes the result to the next stage
- retrieved data will be deserialized by the [`Worker.deserialize_ipc`](mosec.worker.Worker.deserialize_ipc) method
- data to be passed will be serialized by the [`Worker.serialize_ipc`](mosec.worker.Worker.serialize_ipc) method
- `ingress/egress`: the first/last stage in the pipeline
- ingress gets data from the client, while egress sends data to the client
- data will be deserialized by the ingress [`Worker.serialize`](mosec.worker.Worker.serialize) method and serialized by the egress [`Worker.deserialize`](mosec.worker.Worker.deserialize) method
- `pipeline`: a chain of processing stages
- `pipeline`: a chain of processing stages, will be registered to an endpoint (default: `/inference`)
- a server can have multiple pipelines, check the [multi-route](../examples/multi_route.md) example
- `dynamic batching`: batch requests until either the max batch size or the max wait time is reached
- `controller`: a Rust tokio thread that works on:
- read from the previous queue to get new tasks
Expand Down
20 changes: 7 additions & 13 deletions docs/source/reference/interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,25 @@
:members:
```

## Errors
## Runtime

```{eval-rst}
.. automodule:: mosec.errors
:members:
:show-inheritance:
.. automodule:: mosec.runtime
:members: Runtime
```

## Mixins
## Errors

```{eval-rst}
.. automodule:: mosec.mixin
.. automodule:: mosec.errors
:members:
:show-inheritance:
```

## Plugins

```{eval-rst}
.. automodule:: mosec.ipc
:members:
```
## Mixins

```{eval-rst}
.. automodule:: mosec.plugins
.. automodule:: mosec.mixin
:members:
:show-inheritance:
```
40 changes: 40 additions & 0 deletions examples/multi_route/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from http import HTTPStatus

import httpx
import msgpack # type: ignore

typed_req = {
"bin": b"hello mosec with type check",
"name": "type check",
}

print(">> requesting for the typed route with msgpack serde")
resp = httpx.post(
"http://127.0.0.1:8000/v1/inference", content=msgpack.packb(typed_req)
)
if resp.status_code == HTTPStatus.OK:
print(f"OK: {msgpack.unpackb(resp.content)}")
else:
print(f"err[{resp.status_code}] {resp.text}")

print(">> requesting for the untyped route with json serde")
resp = httpx.post("http://127.0.0.1:8000/inference", content=b"hello mosec")
if resp.status_code == HTTPStatus.OK:
print(f"OK: {json.loads(resp.content)}")
else:
print(f"err[{resp.status_code}] {resp.text}")
Loading

0 comments on commit 30261fe

Please sign in to comment.