
Commit 7da510c

chore: many bug fixes and improvements when testing planner (#2776)

Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Signed-off-by: hongkuan <hongkuanz@nvidia.com>

1 parent: 945a572

25 files changed: +1219 −122 lines changed

benchmarks/nixl/nixl-benchmark-deployment.yaml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ spec:
         app: nixl-benchmark
     spec:
       imagePullSecrets:
-        - name: nvcrimagepullsecret
+        - name: nvcr-imagepullsecret
      containers:
        - name: nixl-benchmark
          image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:nixlbench-e42c07a8

benchmarks/sin_load_generator/sin_synth.py

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ def get_isl_osl(t):
         isl, osl = get_isl_osl(t_req)
         output_data.append(
             {
-                "timestamp": t_req * 1000,  # in ms
+                "timestamp": int(t_req * 1000),  # in ms, integer
                 "input_length": isl,
                 "output_length": osl,
                 "hash_ids": np.random.choice(

components/backends/sglang/deploy/disagg_planner.yaml

Lines changed: 34 additions & 24 deletions
@@ -111,19 +111,24 @@ spec:
           image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1
           workingDir: /workspace/components/backends/sglang
           command:
-            - /bin/sh
-            - -c
+            - python3
           args:
-            - >-
-              python3 -m dynamo.sglang
-              --model-path Qwen/Qwen3-0.6B
-              --served-model-name Qwen/Qwen3-0.6B
-              --page-size 16
-              --tp 1
-              --trust-remote-code
-              --skip-tokenizer-init
-              --disaggregation-mode decode
-              --disaggregation-transfer-backend nixl
+            - -m
+            - dynamo.sglang
+            - --model-path
+            - Qwen/Qwen3-0.6B
+            - --served-model-name
+            - Qwen/Qwen3-0.6B
+            - --page-size
+            - "16"
+            - --tp
+            - "1"
+            - --trust-remote-code
+            - --skip-tokenizer-init
+            - --disaggregation-mode
+            - decode
+            - --disaggregation-transfer-backend
+            - nixl
     SGLangPrefillWorker:
       dynamoNamespace: dynamo
       envFromSecret: hf-token-secret
@@ -137,16 +142,21 @@ spec:
           image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1
           workingDir: /workspace/components/backends/sglang
           command:
-            - /bin/sh
-            - -c
+            - python3
           args:
-            - >-
-              python3 -m dynamo.sglang
-              --model-path Qwen/Qwen3-0.6B
-              --served-model-name Qwen/Qwen3-0.6B
-              --page-size 16
-              --tp 1
-              --trust-remote-code
-              --skip-tokenizer-init
-              --disaggregation-mode prefill
-              --disaggregation-transfer-backend nixl
+            - -m
+            - dynamo.sglang
+            - --model-path
+            - Qwen/Qwen3-0.6B
+            - --served-model-name
+            - Qwen/Qwen3-0.6B
+            - --page-size
+            - "16"
+            - --tp
+            - "1"
+            - --trust-remote-code
+            - --skip-tokenizer-init
+            - --disaggregation-mode
+            - prefill
+            - --disaggregation-transfer-backend
+            - nixl
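
Switching from a `/bin/sh -c` string to an exec-style `command`/`args` list means the container runtime launches `python3` directly: there is no shell word-splitting, scalar values such as `"16"` must be quoted so YAML keeps them as strings, and signals like SIGTERM reach the worker process itself instead of a wrapping shell, which matters for the graceful-shutdown changes elsewhere in this commit. As a rough Python analogy for what the runtime does with the decode manifest above (illustrative, not Kubernetes' actual code path):

```python
import subprocess

# Exec form: each list element is exactly one argv entry. No shell is
# involved, so "16" arrives as the literal string "16" and SIGTERM is
# delivered straight to the python3 process.
subprocess.run([
    "python3", "-m", "dynamo.sglang",
    "--model-path", "Qwen/Qwen3-0.6B",
    "--served-model-name", "Qwen/Qwen3-0.6B",
    "--page-size", "16",
    "--tp", "1",
    "--trust-remote-code",
    "--skip-tokenizer-init",
    "--disaggregation-mode", "decode",
    "--disaggregation-transfer-backend", "nixl",
])
```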

components/backends/sglang/src/dynamo/sglang/main.py

Lines changed: 3 additions & 1 deletion
@@ -117,7 +117,9 @@ async def register_model():
     # Requests queue until ready_event is set
     await asyncio.gather(
         generate_endpoint.serve_endpoint(
-            handler.generate, graceful_shutdown=False, metrics_labels=metrics_labels
+            handler.generate,
+            graceful_shutdown=config.migration_limit <= 0,
+            metrics_labels=metrics_labels,
         ),
         register_model(),
     )
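
This ties graceful shutdown to the migration setting instead of hard-coding it off: with `config.migration_limit <= 0` requests cannot be migrated to another worker, so the endpoint must drain in-flight work before exiting; with migration enabled it can stop immediately and let requests move. A minimal sketch of the policy (`migration_limit` stands in for `config.migration_limit`):

```python
def wants_graceful_shutdown(migration_limit: int) -> bool:
    # Migration disabled (limit <= 0): in-flight requests cannot be
    # re-routed, so drain them before shutting down.
    # Migration enabled (limit > 0): requests can move to another
    # worker, so an immediate shutdown is safe.
    return migration_limit <= 0

assert wants_graceful_shutdown(0) is True   # no migration -> graceful drain
assert wants_graceful_shutdown(3) is False  # migration allowed -> fast exit
```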

components/backends/vllm/deploy/disagg_planner.yaml

Lines changed: 11 additions & 6 deletions
@@ -121,10 +121,12 @@ spec:
           image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1
           workingDir: /workspace/components/backends/vllm
           command:
-            - /bin/sh
-            - -c
+            - python3
           args:
-            - "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --migration-limit=3"
+            - -m
+            - dynamo.vllm
+            - --model
+            - Qwen/Qwen3-0.6B
     VllmPrefillWorker:
       dynamoNamespace: vllm-disagg-planner
       envFromSecret: hf-token-secret
@@ -144,7 +146,10 @@ spec:
           image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1
           workingDir: /workspace/components/backends/vllm
           command:
-            - /bin/sh
-            - -c
+            - python3
           args:
-            - python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker --migration-limit=3
+            - -m
+            - dynamo.vllm
+            - --model
+            - Qwen/Qwen3-0.6B
+            - --is-prefill-worker

components/backends/vllm/src/dynamo/vllm/main.py

Lines changed: 11 additions & 1 deletion
@@ -84,8 +84,12 @@ def signal_handler():
 
     if config.is_prefill_worker:
         await init_prefill(runtime, config)
+        logger.debug("init_prefill completed")
     else:
         await init(runtime, config)
+        logger.debug("init completed")
+
+    logger.debug("Worker function completed, exiting...")
 
 
 def setup_vllm_engine(config, stat_logger=None):
@@ -147,6 +151,7 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
     )
 
     try:
+        logger.debug("Starting serve_endpoint for prefill worker")
         await asyncio.gather(
             # for prefill, we want to shutdown the engine after all prefill requests are finished because
             # (temp reason): we don't support re-routing prefill requests
@@ -161,10 +166,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
                 handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
             ),
         )
+        logger.debug("serve_endpoint completed for prefill worker")
     except Exception as e:
         logger.error(f"Failed to serve endpoints: {e}")
         raise
     finally:
+        logger.debug("Cleaning up prefill worker")
         handler.cleanup()
 
 
@@ -254,22 +261,25 @@ async def init(runtime: DistributedRuntime, config: Config):
     )
 
     try:
+        logger.debug("Starting serve_endpoint for decode worker")
         await asyncio.gather(
             # for decode, we want to transfer the in-flight requests to other decode engines,
             # because waiting them to finish can take a long time for long OSLs
             generate_endpoint.serve_endpoint(
                 handler.generate,
-                graceful_shutdown=False,
+                graceful_shutdown=config.migration_limit <= 0,
                 metrics_labels=[("model", config.model)],
             ),
            clear_endpoint.serve_endpoint(
                handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
            ),
        )
+        logger.debug("serve_endpoint completed for decode worker")
     except Exception as e:
         logger.error(f"Failed to serve endpoints: {e}")
         raise
     finally:
+        logger.debug("Cleaning up decode worker")
         # Cleanup background tasks
         handler.cleanup()

components/planner/src/dynamo/planner/utils/load_predictor.py

Lines changed: 11 additions & 3 deletions
@@ -42,10 +42,14 @@ def __init__(self, minimum_data_points=5):
 
     def add_data_point(self, value):
         """Add new data point to the buffer"""
-        if not math.isnan(value):
-            self.data_buffer.append(value)
+        if math.isnan(value):
+            value = 0
+
+        if len(self.data_buffer) == 0 and value == 0:
+            # skip the beginning idle period
+            return
         else:
-            self.data_buffer.append(0)
+            self.data_buffer.append(value)
 
     def get_last_value(self):
         """Get the last value from the buffer"""
@@ -126,6 +130,10 @@ def add_data_point(self, value):
         # Use proper datetime for Prophet
         timestamp = self.start_date + timedelta(seconds=self.curr_step)
         value = 0 if math.isnan(value) else value
+
+        if len(self.data_buffer) == 0 and value == 0:
+            # skip the beginning idle period
+            return
         self.data_buffer.append({"ds": timestamp, "y": value})
         self.curr_step += 1

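Both predictors now coerce NaN samples to 0 and drop leading zeros, so the idle stretch before the first real traffic never enters the training buffer and cannot drag early forecasts toward zero. A self-contained sketch of the constant-predictor variant (the deque buffer and its size are assumptions for illustration):

```python
import math
from collections import deque

class ConstantPredictorSketch:
    """Illustrates the add_data_point policy above, not the planner's full class."""

    def __init__(self):
        self.data_buffer = deque(maxlen=100)

    def add_data_point(self, value):
        if math.isnan(value):
            value = 0
        if len(self.data_buffer) == 0 and value == 0:
            # skip the beginning idle period
            return
        self.data_buffer.append(value)

p = ConstantPredictorSketch()
for v in [float("nan"), 0, 0, 5, 0, 7]:
    p.add_data_point(v)
print(list(p.data_buffer))  # [5, 0, 7]: zeros after the first real sample are kept
```
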
components/planner/src/dynamo/planner/utils/planner_core.py

Lines changed: 16 additions & 5 deletions
@@ -259,18 +259,24 @@ def _compute_replica_requirements(
         # compute how many replicas are needed for prefill
         # here we assume the prefill bias is purely due to request queueing
         # and we increase the number of prefill replicas linearly to account for the queueing delay
-        pred_prefill_load_per_gpu = (
+        pred_prefill_throughput = (
             next_num_req
             * next_isl
             / self.args.adjustment_interval
             * min(1, self.p_correction_factor)
         )
         next_num_p = math.ceil(
-            pred_prefill_load_per_gpu
+            pred_prefill_throughput
             / self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl)
             / self.args.prefill_engine_num_gpu
         )
 
+        logger.info(
+            f"Prefill calculation: {pred_prefill_throughput:.2f}(p_thpt) / "
+            f"{self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl) * self.args.prefill_engine_num_gpu:.2f}(p_engine_cap) = "
+            f"{next_num_p}(num_p)"
+        )
+
         # compute how many replicas are needed for decode
         # 1. apply d_correction_factor to the ITL SLA
         # Prevent divide by zero when d_correction_factor is 0 (no metrics yet)
@@ -290,14 +296,19 @@ def _compute_replica_requirements(
             itl=corrected_itl, context_length=next_isl + next_osl / 2
         )
         # 3. compute number of decode replicas needed
+        pred_decode_throughput = next_num_req * next_osl / self.args.adjustment_interval
         next_num_d = math.ceil(
-            next_num_req
-            * next_osl
-            / self.args.adjustment_interval
+            pred_decode_throughput
             / pred_decode_thpt_per_gpu
             / self.args.decode_engine_num_gpu
         )
 
+        logger.info(
+            f"Decode calculation: {pred_decode_throughput:.2f}(d_thpt) / "
+            f"{pred_decode_thpt_per_gpu * self.args.decode_engine_num_gpu:.2f}(d_engine_cap) = "
+            f"{next_num_d}(num_d)"
+        )
+
         # correct num_p and num_d based on the gpu budget
         next_num_p = max(next_num_p, self.args.min_endpoint)
         next_num_d = max(next_num_d, self.args.min_endpoint)
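
The renamed variables make the sizing rule read as throughput over capacity: predicted token throughput for the next interval, divided by interpolated per-GPU engine throughput and GPUs per engine, rounded up. A worked example with made-up numbers (none of these values come from the commit):

```python
import math

# Hypothetical inputs for one adjustment interval
next_num_req = 500              # predicted number of requests
next_isl, next_osl = 3000, 150  # input / output sequence lengths
adjustment_interval = 60        # seconds
p_correction_factor = 1.2       # capped at 1 in the prefill formula
prefill_thpt_per_gpu = 40_000   # prefill tokens/s per GPU (interpolated)
decode_thpt_per_gpu = 900       # decode tokens/s per GPU under the corrected ITL
prefill_gpus = decode_gpus = 1  # GPUs per engine

pred_prefill_throughput = (
    next_num_req * next_isl / adjustment_interval * min(1, p_correction_factor)
)  # 25000.0 tokens/s
num_p = math.ceil(pred_prefill_throughput / prefill_thpt_per_gpu / prefill_gpus)

pred_decode_throughput = next_num_req * next_osl / adjustment_interval  # 1250.0
num_d = math.ceil(pred_decode_throughput / decode_thpt_per_gpu / decode_gpus)

print(num_p, num_d)  # 1 2, matching the "Prefill/Decode calculation" log lines
```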

lib/llm/src/discovery/watcher.rs

Lines changed: 0 additions & 40 deletions
@@ -176,46 +176,6 @@ impl ModelWatcher {
             .await
             .with_context(|| model_name.clone())?;
         if !active_instances.is_empty() {
-            let mut update_tx = true;
-            let mut model_type: ModelType = model_entry.model_type;
-            if model_entry.model_type == ModelType::Chat
-                && self.manager.list_chat_completions_models().is_empty()
-            {
-                self.manager.remove_chat_completions_model(&model_name).ok();
-                model_type = ModelType::Chat;
-            } else if model_entry.model_type == ModelType::Completion
-                && self.manager.list_completions_models().is_empty()
-            {
-                self.manager.remove_completions_model(&model_name).ok();
-                model_type = ModelType::Completion;
-            } else if model_entry.model_type == ModelType::Embedding
-                && self.manager.list_embeddings_models().is_empty()
-            {
-                self.manager.remove_embeddings_model(&model_name).ok();
-                model_type = ModelType::Embedding;
-            } else if model_entry.model_type == ModelType::Backend {
-                if self.manager.list_chat_completions_models().is_empty() {
-                    self.manager.remove_chat_completions_model(&model_name).ok();
-                    model_type = ModelType::Chat;
-                }
-                if self.manager.list_completions_models().is_empty() {
-                    self.manager.remove_completions_model(&model_name).ok();
-                    if model_type == ModelType::Chat {
-                        model_type = ModelType::Backend;
-                    } else {
-                        model_type = ModelType::Completion;
-                    }
-                }
-            } else {
-                tracing::debug!(
-                    "Model {} is still active in other instances, not removing",
-                    model_name
-                );
-                update_tx = false;
-            }
-            if update_tx && let Some(tx) = &self.model_update_tx {
-                tx.send(ModelUpdate::Removed(model_type)).await.ok();
-            }
             return Ok(None);
         }
