@@ -649,7 +649,12 @@ def _watch_engines(self):
649649 time .sleep (0.5 )
650650
651651 def _add_engine (
652- self , engine_name : str , engine_ip : str , model_names : List [str ], model_label : str , model_type : str
652+ self ,
653+ engine_name : str ,
654+ engine_ip : str ,
655+ model_names : List [str ],
656+ model_label : str ,
657+ model_type : str ,
653658 ):
654659 logger .info (
655660 f"Discovered new serving engine { engine_name } at "
@@ -711,7 +716,9 @@ def _on_engine_update(
711716 model_type = self ._get_model_type (pod )
712717 logger .info (f"Using model_type={ model_type } for pod { engine_name } " )
713718
714- self ._add_engine (engine_name , engine_ip , model_names , model_label , model_type )
719+ self ._add_engine (
720+ engine_name , engine_ip , model_names , model_label , model_type
721+ )
715722
716723 elif event == "DELETED" :
717724 if engine_name not in self .available_engines :
@@ -728,7 +735,9 @@ def _on_engine_update(
728735 model_type = self ._get_model_type (pod )
729736 logger .info (f"Using model_type={ model_type } for pod { engine_name } " )
730737
731- self ._add_engine (engine_name , engine_ip , model_names , model_label , model_type )
738+ self ._add_engine (
739+ engine_name , engine_ip , model_names , model_label , model_type
740+ )
732741 return
733742
734743 if (
@@ -1042,6 +1051,20 @@ def _get_model_label(self, service) -> Optional[str]:
10421051 return None
10431052 return service .spec .selector .get ("model" )
10441053
1054+ def _get_model_type (self , service ) -> str :
1055+ """
1056+ Get the model type from the service's metadata labels.
1057+
1058+ Args:
1059+ service: The Kubernetes service object
1060+
1061+ Returns:
1062+ The model type if found, 'chat' as default otherwise
1063+ """
1064+ if not service .metadata .labels :
1065+ return "chat" # Default to chat model type
1066+ return service .metadata .labels .get ("model_type" , "chat" )
1067+
10451068 def _watch_engines (self ):
10461069 while self .running :
10471070 try :
@@ -1064,21 +1087,30 @@ def _watch_engines(self):
10641087 if is_service_ready :
10651088 model_names = self ._get_model_names (service_name )
10661089 model_label = self ._get_model_label (service )
1090+ model_type = self ._get_model_type (service )
10671091 else :
10681092 model_names = []
10691093 model_label = None
1094+ model_type = None
10701095 self ._on_engine_update (
10711096 service_name ,
10721097 event_type ,
10731098 is_service_ready ,
10741099 model_names ,
10751100 model_label ,
1101+ model_type ,
10761102 )
10771103 except Exception as e :
10781104 logger .error (f"K8s watcher error: { e } " )
10791105 time .sleep (0.5 )
10801106
1081- def _add_engine (self , engine_name : str , model_names : List [str ], model_label : str ):
1107+ def _add_engine (
1108+ self ,
1109+ engine_name : str ,
1110+ model_names : List [str ],
1111+ model_label : str ,
1112+ model_type : str ,
1113+ ):
10821114 logger .info (
10831115 f"Discovered new serving engine { engine_name } at "
10841116 f"running models: { model_names } "
@@ -1100,6 +1132,7 @@ def _add_engine(self, engine_name: str, model_names: List[str], model_label: str
11001132 added_timestamp = int (time .time ()),
11011133 Id = str (uuid .uuid5 (uuid .NAMESPACE_DNS , engine_name )),
11021134 model_label = model_label ,
1135+ model_type = model_type ,
11031136 sleep = sleep_status ,
11041137 service_name = engine_name ,
11051138 namespace = self .namespace ,
@@ -1121,6 +1154,7 @@ def _on_engine_update(
11211154 is_service_ready : bool ,
11221155 model_names : List [str ],
11231156 model_label : Optional [str ],
1157+ model_type : Optional [str ],
11241158 ) -> None :
11251159 if event == "ADDED" :
11261160 if not engine_name :
@@ -1132,7 +1166,7 @@ def _on_engine_update(
11321166 if not model_names :
11331167 return
11341168
1135- self ._add_engine (engine_name , model_names , model_label )
1169+ self ._add_engine (engine_name , model_names , model_label , model_type )
11361170
11371171 elif event == "DELETED" :
11381172 if engine_name not in self .available_engines :
@@ -1145,7 +1179,7 @@ def _on_engine_update(
11451179 return
11461180
11471181 if is_service_ready and model_names :
1148- self ._add_engine (engine_name , model_names , model_label )
1182+ self ._add_engine (engine_name , model_names , model_label , model_type )
11491183 return
11501184
11511185 if (
0 commit comments