Skip to content

Commit

Permalink
[Model Monitoring] Controller stream, chief worker implementation (ml…
Browse files Browse the repository at this point in the history
  • Loading branch information
royischoss authored Jan 7, 2025
1 parent be52713 commit 98c1f95
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 129 deletions.
19 changes: 19 additions & 0 deletions mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,25 @@ class WriterEventKind(MonitoringStrEnum):
STATS = "stats"


class ControllerEvent(MonitoringStrEnum):
KIND = "kind"
ENDPOINT_ID = "endpoint_id"
ENDPOINT_NAME = "endpoint_name"
PROJECT = "project"
TIMESTAMP = "timestamp"
FIRST_REQUEST = "first_request"
FEATURE_SET_URI = "feature_set_uri"
ENDPOINT_TYPE = "endpoint_type"
ENDPOINT_POLICY = "endpoint_policy"
# Note: currently under endpoint policy we will have a dictionary including the keys: "application_names"
# and "base_period"


class ControllerEventKind(MonitoringStrEnum):
NOP_EVENT = "nop_event"
REGULAR_EVENT = "regular_event"


class MetricData(MonitoringStrEnum):
METRIC_NAME = "metric_name"
METRIC_VALUE = "metric_value"
Expand Down
29 changes: 28 additions & 1 deletion mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,22 @@
"max_replicas": 1,
},
},
"controller_stream_args": {
"v3io": {
"shard_count": 10,
"retention_period_hours": 24,
"num_workers": 10,
"min_replicas": 1,
"max_replicas": 1,
},
"kafka": {
"partition_count": 10,
"replication_factor": 1,
"num_workers": 10,
"min_replicas": 1,
"max_replicas": 1,
},
},
# Store prefixes are used to handle model monitoring storing policies based on project and kind, such as events,
# stream, and endpoints.
"store_prefixes": {
Expand Down Expand Up @@ -1282,19 +1298,30 @@ def get_model_monitoring_file_target_path(
function_name
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.STREAM
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.user_space.format(
project=project,
kind=kind
if function_name is None
else f"{kind}-{function_name.lower()}",
)
elif kind == "stream":
elif (
kind == "stream"
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.user_space.format(
project=project,
kind=kind,
)
else:
if (
function_name
== mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
kind = function_name
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.default.format(
project=project,
kind=kind,
Expand Down
5 changes: 5 additions & 0 deletions mlrun/datastore/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,13 @@ def add_nuclio_trigger(self, function):
extra_attributes["workerAllocationMode"] = extra_attributes.get(
"worker_allocation_mode", "static"
)
else:
extra_attributes["workerAllocationMode"] = extra_attributes.get(
"worker_allocation_mode", "pool"
)

trigger_kwargs = {}

if "max_workers" in extra_attributes:
trigger_kwargs = {"max_workers": extra_attributes.pop("max_workers")}

Expand Down
Loading

0 comments on commit 98c1f95

Please sign in to comment.