11// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22// SPDX-License-Identifier: Apache-2.0
33
4- use std:: time:: Duration ;
5- use std:: { future:: Future , pin:: Pin } ;
6-
74use anyhow:: Context as _;
85use dynamo_llm:: entrypoint:: input:: Input ;
96use dynamo_llm:: entrypoint:: EngineConfig ;
@@ -17,9 +14,6 @@ pub use flags::Flags;
1714mod opt;
1815pub use dynamo_llm:: request_template:: RequestTemplate ;
1916pub use opt:: Output ;
20- mod subprocess;
21-
22- const CHILD_STOP_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
2317
2418pub async fn run (
2519 runtime : Runtime ,
@@ -48,6 +42,7 @@ pub async fn run(
4842 . request_template ( flags. request_template . clone ( ) )
4943 . migration_limit ( flags. migration_limit ) ;
5044
45+ // TODO: old, address this later:
5146 // If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
5247 // If not, then the endpoint isn't exposed so we let LocalModel invent one.
5348 let mut rt = Either :: Left ( runtime. clone ( ) ) ;
@@ -71,7 +66,7 @@ pub async fn run(
7166 flags. validate ( & local_model, & out_opt) ?;
7267
7368 // Make an engine from the local_model, flags and output.
74- let ( engine_config, extra ) = engine_for (
69+ let engine_config = engine_for (
7570 runtime. primary_token ( ) ,
7671 out_opt,
7772 flags. clone ( ) ,
@@ -85,17 +80,9 @@ pub async fn run(
8580 //
8681 dynamo_llm:: entrypoint:: input:: run_input ( rt, in_opt, engine_config) . await ?;
8782
88- // Allow engines to ask main thread to wait on an extra future.
89- // We use this to stop the vllm and sglang sub-process
90- if let Some ( extra) = extra {
91- extra. await ;
92- }
93-
9483 Ok ( ( ) )
9584}
9685
97- type ExtraFuture = Pin < Box < dyn Future < Output = ( ) > + Send > > ;
98-
9986/// Create the engine matching `out_opt`
10087/// Note validation happens in Flags::validate. In here assume everything is going to work.
10188async fn engine_for (
@@ -104,71 +91,27 @@ async fn engine_for(
10491 flags : Flags ,
10592 local_model : LocalModel ,
10693 rt : Either < Runtime , DistributedRuntime > ,
107- ) -> anyhow:: Result < ( EngineConfig , Option < ExtraFuture > ) > {
94+ ) -> anyhow:: Result < EngineConfig > {
10895 match out_opt {
109- Output :: Dynamic => Ok ( ( EngineConfig :: Dynamic ( Box :: new ( local_model) ) , None ) ) ,
110- Output :: EchoFull => Ok ( (
111- EngineConfig :: StaticFull {
112- model : Box :: new ( local_model) ,
113- engine : dynamo_llm:: engines:: make_engine_full ( ) ,
114- } ,
115- None ,
116- ) ) ,
117- Output :: EchoCore => Ok ( (
118- EngineConfig :: StaticCore {
119- engine : dynamo_llm:: engines:: make_engine_core ( ) ,
120- model : Box :: new ( local_model) ,
121- } ,
122- None ,
123- ) ) ,
96+ Output :: Dynamic => Ok ( EngineConfig :: Dynamic ( Box :: new ( local_model) ) ) ,
97+ Output :: EchoFull => Ok ( EngineConfig :: StaticFull {
98+ model : Box :: new ( local_model) ,
99+ engine : dynamo_llm:: engines:: make_engine_full ( ) ,
100+ } ) ,
101+ Output :: EchoCore => Ok ( EngineConfig :: StaticCore {
102+ engine : dynamo_llm:: engines:: make_engine_core ( ) ,
103+ model : Box :: new ( local_model) ,
104+ } ) ,
124105 #[ cfg( feature = "mistralrs" ) ]
125- Output :: MistralRs => Ok ( (
126- EngineConfig :: StaticFull {
127- engine : dynamo_engine_mistralrs:: make_engine ( & local_model) . await ?,
128- model : Box :: new ( local_model) ,
129- } ,
130- None ,
131- ) ) ,
106+ Output :: MistralRs => Ok ( EngineConfig :: StaticFull {
107+ engine : dynamo_engine_mistralrs:: make_engine ( & local_model) . await ?,
108+ model : Box :: new ( local_model) ,
109+ } ) ,
132110 #[ cfg( feature = "llamacpp" ) ]
133- Output :: LlamaCpp => Ok ( (
134- EngineConfig :: StaticCore {
135- engine : dynamo_engine_llamacpp:: make_engine ( cancel_token, & local_model) . await ?,
136- model : Box :: new ( local_model) ,
137- } ,
138- None ,
139- ) ) ,
140- // For multi-node config. vllm uses `ray`, see guide
141- Output :: Vllm => shell ( subprocess:: vllm:: PY , cancel_token, local_model, flags, None ) . await ,
142- // For multi-node config. trtlllm uses `mpi`, see guide
143- Output :: Trtllm => {
144- shell (
145- subprocess:: trtllm:: PY ,
146- cancel_token,
147- local_model,
148- flags,
149- None ,
150- )
151- . await
152- }
153- Output :: SgLang => {
154- let multi_node_config = if flags. num_nodes > 1 {
155- Some ( dynamo_llm:: engines:: MultiNodeConfig {
156- num_nodes : flags. num_nodes ,
157- node_rank : flags. node_rank ,
158- leader_addr : flags. leader_addr . clone ( ) . unwrap_or_default ( ) ,
159- } )
160- } else {
161- None
162- } ;
163- shell (
164- subprocess:: sglang:: PY ,
165- cancel_token,
166- local_model,
167- flags,
168- multi_node_config,
169- )
170- . await
171- }
111+ Output :: LlamaCpp => Ok ( EngineConfig :: StaticCore {
112+ engine : dynamo_engine_llamacpp:: make_engine ( cancel_token, & local_model) . await ?,
113+ model : Box :: new ( local_model) ,
114+ } ) ,
172115 Output :: Mocker => {
173116 let Either :: Right ( drt) = rt else {
174117 panic ! ( "Mocker requires a distributed runtime to run." ) ;
@@ -180,76 +123,12 @@ async fn engine_for(
180123 let engine =
181124 dynamo_llm:: mocker:: engine:: make_mocker_engine ( drt, endpoint, args) . await ?;
182125
183- Ok ( (
184- EngineConfig :: StaticCore {
185- engine,
186- model : Box :: new ( local_model) ,
187- } ,
188- None ,
189- ) )
190- }
191- }
192- }
193-
194- async fn shell (
195- py_script : & ' static str ,
196- cancel_token : CancellationToken ,
197- local_model : LocalModel ,
198- flags : Flags ,
199- multi_node_config : Option < dynamo_llm:: engines:: MultiNodeConfig > ,
200- ) -> anyhow:: Result < ( EngineConfig , Option < ExtraFuture > ) > {
201- let ( py_script, child) =
202- match subprocess:: start ( py_script, & local_model, flags. clone ( ) , multi_node_config) . await {
203- Ok ( x) => x,
204- Err ( err) => {
205- anyhow:: bail!( "Failed starting engine sub-process: {err}" ) ;
206- }
207- } ;
208-
209- // Sub-process cleanup
210- let extra: ExtraFuture = Box :: pin ( async move {
211- stopper ( cancel_token, child, py_script) . await ;
212- } ) ;
213- Ok ( ( EngineConfig :: Dynamic ( Box :: new ( local_model) ) , Some ( extra) ) )
214- }
215-
216- /// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
217- /// Keeps the TempPath alive until the child is stopped.
218- async fn stopper (
219- cancel_token : CancellationToken ,
220- mut child : tokio:: process:: Child ,
221- py_script : tempfile:: TempPath ,
222- ) {
223- cancel_token. cancelled ( ) . await ;
224-
225- // Ask subprocess to stop gracefully
226- if let Some ( pid) = child. id ( ) {
227- unsafe { libc:: kill ( pid as i32 , libc:: SIGTERM ) } ;
228- }
229-
230- tokio:: select! {
231- exit = child. wait( ) => {
232- tracing:: trace!( "engine sub-process graceful exit" ) ;
233- match exit {
234- Ok ( exit_status) if exit_status. success( ) => { }
235- Ok ( exit_status) => {
236- // This is nearly always 15 (SIGTERM)
237- tracing:: trace!( "engine sub-process non-0 exit: {exit_status}" ) ;
238- }
239- Err ( err) => {
240- tracing:: warn!( "engine sub-process error getting exit status: {err}" ) ;
241- }
242- }
243- }
244- _ = tokio:: time:: sleep( CHILD_STOP_TIMEOUT ) => {
245- // It didn't stop in time, kill it
246- child. kill( ) . await . expect( "Failed killing engine subprocess" ) ;
247- let _ = child. wait( ) . await ;
126+ Ok ( EngineConfig :: StaticCore {
127+ engine,
128+ model : Box :: new ( local_model) ,
129+ } )
248130 }
249131 }
250- // This temporary file contains the python script running the engine. It deletes on drop.
251- // Keep it alive until the engine has stopped.
252- drop ( py_script) ;
253132}
254133
255134/// If the user will benefit from CUDA/Metal/Vulkan, remind them to build with it.
0 commit comments