Merged

Commits (37)
- 1e110af  env var controlled (ishandhanani, Jun 14, 2025)
- 4010276  Merge branch 'main' of github.com:ai-dynamo/dynamo (ishandhanani, Jun 17, 2025)
- bd7e81d  Merge branch 'main' of github.com:ai-dynamo/dynamo (ishandhanani, Jun 18, 2025)
- b7801c8  Merge branch 'main' of github.com:ai-dynamo/dynamo (ishandhanani, Jun 24, 2025)
- 524204e  init (ishandhanani, Jun 24, 2025)
- 80cca39  bump (ishandhanani, Jun 24, 2025)
- 6b44876  Merge branch 'main' of github.com:ai-dynamo/dynamo (ishandhanani, Jun 24, 2025)
- 38aec4a  Merge branch 'main' into ishan/cmpl-token-id (ishandhanani, Jun 24, 2025)
- a25b54d  ok (ishandhanani, Jun 24, 2025)
- b2fd84f  sgl inc (ishandhanani, Jun 24, 2025)
- 63b5b73  bump (ishandhanani, Jun 24, 2025)
- 71ac724  logs (ishandhanani, Jun 24, 2025)
- 074a89b  pc (ishandhanani, Jun 24, 2025)
- ac74f12  bump (ishandhanani, Jun 25, 2025)
- 422a97f  lcp (ishandhanani, Jun 25, 2025)
- ff52e20  batch for text as well (ishandhanani, Jun 25, 2025)
- 72655b7  to string instead of clone (ishandhanani, Jun 25, 2025)
- 5ca61ef  remove debug (ishandhanani, Jun 25, 2025)
- 95b7e6c  update example - tested it works (ishandhanani, Jun 25, 2025)
- 46f4a42  Merge branch 'main' into ishan/cmpl-token-id (ishandhanani, Jun 25, 2025)
- e4b950e  bump (ishandhanani, Jun 25, 2025)
- 21a493b  clippy (ishandhanani, Jun 25, 2025)
- f2d5251  bump sgl deps (ishandhanani, Jun 25, 2025)
- 64a299d  bump (ishandhanani, Jun 25, 2025)
- aa39885  full batch support (ishandhanani, Jun 25, 2025)
- 71c7dd6  ok (ishandhanani, Jun 25, 2025)
- 0e52cf5  pc (ishandhanani, Jun 25, 2025)
- 224f465  fix clippy (ishandhanani, Jun 25, 2025)
- d5fcb43  Merge branch 'ishan/cmpl-token-id' of github.com:ai-dynamo/dynamo int… (ishandhanani, Jun 25, 2025)
- 7fb52e6  bring back tokkio task for encoding (ishandhanani, Jun 25, 2025)
- 753b21f  tokkio (ishandhanani, Jun 25, 2025)
- a883e2a  mypy (ishandhanani, Jun 25, 2025)
- cb16e9a  p (ishandhanani, Jun 25, 2025)
- 26b0a58  bump (ishandhanani, Jun 25, 2025)
- 2f84f55  bump (ishandhanani, Jun 25, 2025)
- b24863a  go (ishandhanani, Jun 25, 2025)
- 6af8d7d  Merge branch 'main' into ishan/cmpl-token-id (rmccorm4, Jun 25, 2025)
48 changes: 44 additions & 4 deletions container/Dockerfile.sglang-deepep
@@ -35,22 +35,62 @@ ARG ARCH_ALT=x86_64

WORKDIR /sgl-workspace

# Install UCX dependencies
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
--reinstall libibverbs-dev rdma-core ibverbs-utils libibumad-dev \
libnuma-dev librdmacm-dev ibverbs-providers \
autoconf libtool

# Build UCX from source
ARG NIXL_UCX_REF=v1.19.x
RUN rm -rf /opt/hpcx/ucx && \
rm -rf /usr/local/ucx && \
cd /usr/local/src && \
git clone https://github.com/openucx/ucx.git && \
cd ucx && \
git checkout $NIXL_UCX_REF && \
./autogen.sh && ./configure \
--prefix=/usr/local/ucx \
--enable-shared \
--disable-static \
--disable-doxygen-doc \
--enable-optimizations \
--enable-cma \
--enable-devel-headers \
--with-cuda=/usr/local/cuda \
--with-verbs \
--with-efa \
--with-dm \
--with-gdrcopy=/usr/local \
--enable-mt && \
make -j && \
make -j install-strip && \
ldconfig

ENV LD_LIBRARY_PATH=/usr/lib:/usr/local/ucx/lib:$LD_LIBRARY_PATH

# Pinning to NIXL 0.2.1 right now
# TODO: investigate pip install failure with 0.3.0 release
ARG NIXL_COMMIT="5e4c179ee850d482a83cb2a211e0947e46281060"
-RUN git clone https://github.com/ai-dynamo/nixl.git && cd nixl && git checkout ${NIXL_COMMIT} &&pip install --break-system-packages . --config-settings=setup-args="-Ducx_path=/opt/hpcx/ucx"
+RUN git clone https://github.com/ai-dynamo/nixl.git && cd nixl && git checkout ${NIXL_COMMIT} && pip install --break-system-packages . --config-settings=setup-args="-Ducx_path=/usr/local/ucx"

WORKDIR /sgl-workspace

RUN pip uninstall --break-system-packages -y sglang
RUN rm -rf sglang
-# 0.4.7
-RUN pip install --break-system-packages "sglang==0.4.7"
+# 0.4.8 has a bug with CUDA graphs and decode worker
+# https://github.com/sgl-project/sglang/issues/7511
+RUN pip install --break-system-packages "sglang==0.4.7.post1"

# Allow forceful shutdown of inflight requests
ENV SGL_FORCE_SHUTDOWN=1

WORKDIR /sgl-workspace
# https://github.com/ai-dynamo/dynamo/pull/1510
-ARG DYNAMO_COMMIT="382e3aedc421b3b3abc338062b332b54b5aa8529"
-RUN git clone https://github.com/ai-dynamo/dynamo.git && cd dynamo && git checkout ${DYNAMO_COMMIT}
+ARG DYNAMO_BRANCH="ishan/cmpl-token-id"
+RUN git clone https://github.com/ai-dynamo/dynamo.git && cd dynamo && git checkout ${DYNAMO_BRANCH}

# install dynamo in editable mode
WORKDIR /sgl-workspace/dynamo
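A quick sanity check for an image built from this Dockerfile; a minimal sketch, assuming `python3` is on the container's PATH. The expected version string and environment variable come from the pins above; the script itself is a hypothetical helper, not part of this PR.

```python
# sanity_check.py - run inside the built container (hypothetical helper).
import os

import sglang

# The Dockerfile pins sglang==0.4.7.post1 to avoid the 0.4.8 CUDA-graph bug
assert sglang.__version__ == "0.4.7.post1", sglang.__version__
# Set via ENV above to allow forceful shutdown of inflight requests
assert os.environ.get("SGL_FORCE_SHUTDOWN") == "1"
print("image OK: sglang", sglang.__version__)
```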
6 changes: 3 additions & 3 deletions examples/sglang/README.md
@@ -106,12 +106,12 @@ Dynamo supports SGLang's implementation of wide expert parallelism and large sca

Steps to run:

-1. Build the SGLang DeepEP container
+1. Build the SGLang DeepEP container.

```bash
-git clone https://github.com/sgl-project/sglang.git
+git clone -b v0.4.8 https://github.com/sgl-project/sglang.git
cd sglang/docker
-docker build -f Dockerfile.deepep -t deepep .
+docker build -f Dockerfile -t deepep .
```

You will now have a `deepep:latest` image
4 changes: 3 additions & 1 deletion examples/sglang/components/decode_worker.py
@@ -45,7 +45,9 @@ def __init__(self):
@endpoint()
async def generate(self, req: DisaggPreprocessedRequest):
g = await self.engine.async_generate(
-input_ids=req.request.token_ids,
+input_ids=req.request.token_ids
+if req.request.batch_token_ids is None
+else req.request.batch_token_ids,
sampling_params=req.sampling_params,
stream=True,
bootstrap_host=req.bootstrap_host,
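The decode worker now mirrors the prefill worker's input selection: use `batch_token_ids` when present, otherwise fall back to the single `token_ids` list. A standalone sketch of that pattern; the `_Req` class below is a made-up stand-in for `PreprocessedRequest` (see examples/sglang/utils/protocol.py further down):

```python
class _Req:
    # Minimal stand-in for PreprocessedRequest (hypothetical)
    token_ids = [1, 2, 3]
    batch_token_ids = None  # e.g. [[1, 2, 3], [4, 5]] in batch mode

req = _Req()
# Prefer the batch field when the request carries one
input_ids = req.token_ids if req.batch_token_ids is None else req.batch_token_ids
print(input_ids)  # [1, 2, 3] in single mode; nested lists in batch mode
```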
95 changes: 78 additions & 17 deletions examples/sglang/components/worker.py
@@ -28,6 +28,7 @@
import logging
import random
import socket
from typing import Dict, Union

import sglang as sgl
from components.decode_worker import SGLangDecodeWorker
@@ -112,63 +113,123 @@ def _build_sampling_params(self, request: PreprocessedRequest) -> dict:
sampling_params["ignore_eos"] = request.stop_conditions.ignore_eos
return sampling_params

def _get_request_batch_size(self, request: PreprocessedRequest):
"""Get batch size from request, returns None for single requests"""
if request.batch_token_ids is not None:
return len(request.batch_token_ids)
return None

def _is_batch_request(self, request: PreprocessedRequest):
"""Check if request is in batch mode"""
return request.batch_token_ids is not None

@endpoint()
async def generate(self, request: PreprocessedRequest):
# Check if we're in batch mode at the start
is_batch = self._is_batch_request(request)
batch_size = self._get_request_batch_size(request)

# TODO: maintain a mapping from SGLang's Output struct to LLMEngineOutput
sampling_params = self._build_sampling_params(request)

if self.engine_args.disaggregation_mode != "null":
-bootstrap_room = self._generate_bootstrap_room()
+if is_batch:
+bootstrap_room = [
+self._generate_bootstrap_room() for _ in range(batch_size)
+]
+bootstrap_host = [self.bootstrap_host] * batch_size
+bootstrap_port = [self.bootstrap_port] * batch_size
+else:
+bootstrap_host = self.bootstrap_host
+bootstrap_port = self.bootstrap_port
+bootstrap_room = self._generate_bootstrap_room()

# decode worker request
disagg_request = DisaggPreprocessedRequest(
request=request,
sampling_params=sampling_params,
-bootstrap_host=self.bootstrap_host,
-bootstrap_port=self.bootstrap_port,
+bootstrap_host=bootstrap_host,
+bootstrap_port=bootstrap_port,
bootstrap_room=bootstrap_room,
)

# prefill response is not used
prefill = await self.engine.async_generate(
-input_ids=request.token_ids,
+input_ids=request.token_ids
+if not is_batch
+else request.batch_token_ids,
sampling_params=sampling_params,
stream=True,
-bootstrap_host=self.bootstrap_host,
-bootstrap_port=self.bootstrap_port,
+bootstrap_host=bootstrap_host,
+bootstrap_port=bootstrap_port,
bootstrap_room=bootstrap_room,
)
prefill_task = asyncio.create_task(self._prefill_generator(prefill))

decode = await self.decode_client.generate(disagg_request.model_dump_json())

-async for out in self._process_stream(decode, unpack=True):
+async for out in self._process_stream(
+decode, unpack=True, is_batch=is_batch
+):
yield out

await prefill_task
else:
g = await self.engine.async_generate(
-input_ids=request.token_ids,
+input_ids=request.token_ids
+if not is_batch
+else request.batch_token_ids,
sampling_params=sampling_params,
stream=True,
)

-async for out in self._process_stream(g, unpack=False):
+async for out in self._process_stream(g, unpack=False, is_batch=is_batch):
yield out

-async def _process_stream(self, stream_source, unpack: bool):
-num_output_tokens_so_far = 0
+async def _process_stream(self, stream_source, unpack: bool, is_batch: bool):
+# Initialize based on batch mode
+num_output_tokens_so_far: Union[Dict[int, int], int]
+if is_batch:
+num_output_tokens_so_far = {}
+else:
+num_output_tokens_so_far = 0

async for res in stream_source:
data = res.data() if unpack else res
finish_reason = data["meta_info"]["finish_reason"]
-if finish_reason:
-# Don't forward the stop token
-out = {"token_ids": [], "finish_reason": finish_reason["type"]}

+if is_batch:
+# Handle batch response
+assert isinstance(num_output_tokens_so_far, dict)
+index = data.get("index", 0)
+if index not in num_output_tokens_so_far:
+num_output_tokens_so_far[index] = 0

+if finish_reason:
+out = {
+"token_ids": [],
+"finish_reason": finish_reason["type"],
+"index": index,
+}
+else:
+next_total_toks = len(data["output_ids"])
+new_tokens = data["output_ids"][num_output_tokens_so_far[index] :]
+out = {
+"token_ids": new_tokens,
+"index": index,
+}
+num_output_tokens_so_far[index] = next_total_toks
else:
+# Handle single response
+assert isinstance(num_output_tokens_so_far, int)
+if finish_reason:
+out = {"token_ids": [], "finish_reason": finish_reason["type"]}
+else:
-next_total_toks = len(data["output_ids"])
-out = {"token_ids": data["output_ids"][num_output_tokens_so_far:]}
+next_total_toks = len(data["output_ids"])
+out = {"token_ids": data["output_ids"][num_output_tokens_so_far:]}
+num_output_tokens_so_far = next_total_toks

yield out
-num_output_tokens_so_far = next_total_toks

def _generate_bootstrap_room(self):
return random.randint(0, 2**63 - 1)
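For reviewers, here is a self-contained sketch of the bookkeeping `_process_stream` now performs: SGLang streams cumulative `output_ids` per response, so the worker keeps a per-index count of tokens already emitted and forwards only the delta. The sample stream below is fabricated; only the field names (`index`, `output_ids`, `meta_info.finish_reason`) follow the code above.

```python
from typing import Dict

# Fabricated responses in the shape the code above consumes:
# cumulative output_ids, tagged with a batch index.
stream = [
    {"index": 0, "output_ids": [11], "meta_info": {"finish_reason": None}},
    {"index": 1, "output_ids": [21], "meta_info": {"finish_reason": None}},
    {"index": 0, "output_ids": [11, 12], "meta_info": {"finish_reason": None}},
    {"index": 0, "output_ids": [11, 12], "meta_info": {"finish_reason": {"type": "stop"}}},
    {"index": 1, "output_ids": [21, 22], "meta_info": {"finish_reason": {"type": "length"}}},
]

seen: Dict[int, int] = {}  # tokens already emitted, keyed by batch index
for res in stream:
    idx = res.get("index", 0)
    seen.setdefault(idx, 0)
    finish = res["meta_info"]["finish_reason"]
    if finish:
        # Final chunk carries the finish reason and no tokens
        out = {"token_ids": [], "finish_reason": finish["type"], "index": idx}
    else:
        # Forward only the tokens not yet emitted for this index
        out = {"token_ids": res["output_ids"][seen[idx]:], "index": idx}
        seen[idx] = len(res["output_ids"])
    print(out)
# token_ids come out as [11], [21], [12], then two finish chunks
```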
9 changes: 5 additions & 4 deletions examples/sglang/utils/protocol.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from typing import List, Optional
+from typing import List, Optional, Union

from pydantic import BaseModel, Field

@@ -47,6 +47,7 @@ class SamplingOptions(BaseModel):

class PreprocessedRequest(BaseModel):
token_ids: List[TokenIdType]
batch_token_ids: Optional[List[List[TokenIdType]]] = None
stop_conditions: StopConditions
sampling_options: SamplingOptions
eos_token_ids: List[TokenIdType] = Field(default_factory=list)
@@ -57,7 +58,7 @@ class PreprocessedRequest(BaseModel):
class DisaggPreprocessedRequest(BaseModel):
request: PreprocessedRequest
sampling_params: dict
-bootstrap_host: str
-bootstrap_port: int
-bootstrap_room: int
+bootstrap_host: Union[str, List[str]]
+bootstrap_port: Union[int, List[int]]
+bootstrap_room: Union[int, List[int]]
data_parallel_rank: Optional[int] = None
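To make the new fields concrete, here is a hypothetical two-prompt batch in dict form. Field names follow the models above; every value is invented, and the convention that `token_ids` still carries a prompt alongside `batch_token_ids` is an assumption, not something this diff pins down.

```python
# Hypothetical wire shapes mirroring PreprocessedRequest /
# DisaggPreprocessedRequest above; all values are made up.
batch_request = {
    "token_ids": [101, 102],  # single-request field (assumed still populated)
    "batch_token_ids": [[101, 102], [201, 202, 203]],  # one list per prompt
    "stop_conditions": {"max_tokens": 64, "ignore_eos": False},
    "sampling_options": {"temperature": 0.7},
    "eos_token_ids": [2],
}

disagg_request = {
    "request": batch_request,
    "sampling_params": {"max_new_tokens": 64},
    # In batch mode the bootstrap fields become parallel lists,
    # one entry per prompt, as built in worker.py above.
    "bootstrap_host": ["prefill-0", "prefill-0"],
    "bootstrap_port": [8998, 8998],
    "bootstrap_room": [1234, 5678],
    "data_parallel_rank": None,
}
print(disagg_request["bootstrap_room"])  # one room per batch item
```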
71 changes: 60 additions & 11 deletions launch/dynamo-run/src/subprocess/sglang_inc.py
@@ -60,22 +60,71 @@ async def generate(self, request):
# sglang defaults this to 128
"max_new_tokens": request["stop_conditions"]["max_tokens"],
}
-num_output_tokens_so_far = 0
-gen = await self.engine_client.async_generate(
-input_ids=request["token_ids"], sampling_params=sampling_params, stream=True
-)

+# Check if this is a batch request
+is_batch = "batch_token_ids" in request and request["batch_token_ids"]

+if is_batch:
+# Track tokens separately for each batch item
+num_output_tokens_so_far = {}
+logging.debug("received batch token ids")
+gen = await self.engine_client.async_generate(
+input_ids=request["batch_token_ids"],
+sampling_params=sampling_params,
+stream=True,
+)
+else:
+num_output_tokens_so_far = 0
+logging.debug("received token ids")
+gen = await self.engine_client.async_generate(
+input_ids=request["token_ids"],
+sampling_params=sampling_params,
+stream=True,
+)

async for res in gen:
# res is a dict

logging.debug(f"res: {res}")
finish_reason = res["meta_info"]["finish_reason"]
if finish_reason:
# Don't forward the stop token
out = {"token_ids": [], "finish_reason": finish_reason["type"]}

if is_batch:
# Handle batch response - get index from SGLang response
index = res.get("index", 0)
if index not in num_output_tokens_so_far:
num_output_tokens_so_far[index] = 0

if finish_reason:
logging.warning(f"finish_reason: {finish_reason}")
# Final response for this batch item
out = {
"token_ids": [],
"finish_reason": finish_reason["type"],
"index": index,
}
else:
# Streaming response for this batch item
next_total_toks = len(res["output_ids"])
new_tokens = res["output_ids"][num_output_tokens_so_far[index] :]
out = {
"token_ids": new_tokens,
"index": index,
}
num_output_tokens_so_far[index] = next_total_toks
else:
next_total_toks = len(res["output_ids"])
out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
if finish_reason:
out = {
"token_ids": [],
"finish_reason": finish_reason["type"],
}
else:
next_total_toks = len(res["output_ids"])
new_tokens = res["output_ids"][num_output_tokens_so_far:]
out = {
"token_ids": new_tokens,
}
num_output_tokens_so_far = next_total_toks

yield out
num_output_tokens_so_far = next_total_toks


class EmbeddingRequestHandler(RequestHandler):
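End to end, the point of threading `batch_token_ids` and `index` through is batched completions. A hypothetical client call against an OpenAI-compatible `/v1/completions` endpoint served by `dynamo-run`; the URL, port, and model name are invented, and the array-`prompt` form is the standard OpenAI shape that a frontend would tokenize into `batch_token_ids`.

```python
# Hypothetical client; assumes a dynamo-run OpenAI-compatible frontend
# listening locally. Nothing here is defined by this PR itself.
import requests

resp = requests.post(
    "http://localhost:8080/v1/completions",
    json={
        "model": "Qwen/Qwen3-0.6B",        # invented model name
        "prompt": ["Paris is", "Lima is"],  # array prompt -> batched request
        "max_tokens": 16,
    },
    timeout=60,
)
resp.raise_for_status()
# Each choice carries the batch index the engine threaded through.
for choice in resp.json()["choices"]:
    print(choice["index"], choice["text"])
```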
1 change: 1 addition & 0 deletions lib/engines/llamacpp/src/lib.rs
@@ -269,6 +269,7 @@ fn run_request(
cum_log_probs: None, // TODO output.cumulative_logprob.map(|v| v as f64),
log_probs: None, // TODO output.logprobs
finish_reason: None,
index: None,
};
work_request
.response_channel
1 change: 1 addition & 0 deletions lib/llm/src/backend.rs
@@ -224,6 +224,7 @@ impl
log_probs: data.log_probs,
finish_reason: data.finish_reason,
//mdcsum: mdcsum.clone(),
index: data.index,
})
})
});
1 change: 1 addition & 0 deletions lib/llm/src/engines.rs
@@ -115,6 +115,7 @@ fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
cum_log_probs: None,
log_probs: None,
finish_reason: None,
index: None,
};
Annotated::from_data(delta)
}