@@ -22,6 +22,7 @@ import (
2222 "context"
2323 "strconv"
2424 "strings"
25+ "sync"
2526 "time"
2627
2728 "github.com/prometheus/client_golang/prometheus"
@@ -135,20 +136,25 @@ func (s *VllmSimulator) reportLoras() {
135136 return
136137 }
137138
138- var loras []string
139- s .runningLoras .Range (func (key interface {} , _ interface {} ) bool {
139+ var runningLoras []string
140+ s .runningLoras .Range (func (key any , _ any ) bool {
140141 if lora , ok := key .(string ); ok {
141- loras = append (loras , lora )
142+ runningLoras = append (runningLoras , lora )
143+ }
144+ return true
145+ })
146+ var waitingLoras []string
147+ s .waitingLoras .Range (func (key any , _ any ) bool {
148+ if lora , ok := key .(string ); ok {
149+ waitingLoras = append (waitingLoras , lora )
142150 }
143151 return true
144152 })
145153
146- allLoras := strings .Join (loras , "," )
147154 s .loraInfo .WithLabelValues (
148155 strconv .Itoa (s .config .MaxLoras ),
149- allLoras ,
150- // TODO - add names of loras in queue
151- "" ).Set (float64 (time .Now ().Unix ()))
156+ strings .Join (runningLoras , "," ),
157+ strings .Join (waitingLoras , "," )).Set (float64 (time .Now ().Unix ()))
152158}
153159
154160// reportRunningRequests sets information about running completion requests
@@ -184,6 +190,7 @@ func (s *VllmSimulator) unregisterPrometheus() {
184190func (s * VllmSimulator ) startMetricsUpdaters (ctx context.Context ) {
185191 go s .waitingRequestsUpdater (ctx )
186192 go s .runningRequestsUpdater (ctx )
193+ go s .lorasUpdater (ctx )
187194}
188195
189196// waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
@@ -211,3 +218,48 @@ func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
211218 }
212219 }
213220}
221+
222+ // lorasUpdater updates the running loras metric by listening on the relevant channel
223+ // one function updates both waiting and running loras since they a part of the same prometheus gauge
224+ func (s * VllmSimulator ) lorasUpdater (ctx context.Context ) {
225+ for {
226+ select {
227+ case <- ctx .Done ():
228+ return
229+ case loraUpdate := <- s .lorasChan :
230+ switch loraUpdate .state {
231+ case waitingUsageState :
232+ s .incrementLoraRefCount (loraUpdate .name , & s .waitingLoras )
233+ case runningUsageState :
234+ s .decrementLoraRefCount (loraUpdate .name , & s .waitingLoras )
235+ s .incrementLoraRefCount (loraUpdate .name , & s .runningLoras )
236+ case doneUsageState :
237+ s .decrementLoraRefCount (loraUpdate .name , & s .runningLoras )
238+ }
239+ s .reportLoras ()
240+ }
241+ }
242+ }
243+
244+ func (s * VllmSimulator ) incrementLoraRefCount (lora string , theMap * sync.Map ) {
245+ count := 0
246+ if value , ok := theMap .Load (lora ); ok {
247+ // if lora is already in the map - increment its counter
248+ count = value .(int )
249+ }
250+ theMap .Store (lora , count + 1 )
251+ }
252+
253+ func (s * VllmSimulator ) decrementLoraRefCount (lora string , theMap * sync.Map ) {
254+ if value , ok := theMap .Load (lora ); ok {
255+ count := value .(int )
256+ if count > 1 {
257+ theMap .Store (lora , count - 1 )
258+ } else {
259+ // last lora instance stopped its execution - remove from the map
260+ theMap .Delete (lora )
261+ }
262+ } else {
263+ s .logger .Error (nil , "Zero model reference" , "model" , lora )
264+ }
265+ }
0 commit comments