Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/development' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
yaelgen committed Jan 7, 2025
2 parents 7a8a117 + 98c1f95 commit 44190cb
Showing 51 changed files with 1,485 additions and 628 deletions.
2 changes: 1 addition & 1 deletion dockerfiles/jupyter/Dockerfile
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ RUN python -m pip install --upgrade pip~=${MLRUN_PIP_VERSION}
WORKDIR $HOME

COPY --chown=$NB_UID:$NB_GID ./docs/tutorials $HOME/tutorials
COPY --chown=$NB_UID:$NB_GID ./docs/_static/images/MLRun-logo.png $HOME/_static/images
COPY --chown=$NB_UID:$NB_GID ./docs/_static/images/MLRun-logo.png $HOME/_static/images/MLRun-logo.png
COPY --chown=$NB_UID:$NB_GID ./dockerfiles/jupyter/README.ipynb $HOME
COPY --chown=$NB_UID:$NB_GID ./dockerfiles/jupyter/mlrun.env $HOME
COPY --chown=$NB_UID:$NB_GID ./dockerfiles/jupyter/mlce-start.sh /usr/local/bin/mlce-start.sh
54 changes: 53 additions & 1 deletion docs/concepts/notifications.md
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ MLRun supports configuring notifications on jobs and scheduled jobs. This sectio
- [Local vs. remote](#local-vs-remote)
- [Notification parameters and secrets](#notification-parameters-and-secrets)
- [Notification kinds](#notification-kinds)
- [Mail notifications](#mail-notifications)
- [Configuring notifications for runs](#configuring-notifications-for-runs)
- [Configuring notifications for pipelines](#configuring-notifications-for-pipelines)
- [Setting notifications on live runs](#setting-notifications-on-live-runs)
@@ -24,7 +25,7 @@ Notifications can be sent either locally from the SDK, or remotely from the MLRu
Usually, a local run sends locally, and a remote run sends remotely.
However, there are several special cases where the notification is sent locally either way.
These cases are:
- Local or KFP Engine Pipelines: To conserve backwards compatibility, the SDK sends the notifications as it did before adding the run
- Local: To conserve backwards compatibility, the SDK sends the notifications as it did before adding the run
notifications mechanism. This means you need to watch the pipeline in order for its notifications to be sent. (Remote pipelines act differently. See [Configuring Notifications For Pipelines](#configuring-notifications-for-pipelines) for more details.
- Dask: Dask runs are always local (against a remote Dask cluster), so the notifications are sent locally as well.

@@ -45,6 +46,57 @@ It's essential to utilize `secret_params` exclusively for handling sensitive inf

See {py:class}`~mlrun.common.schemas.notification.NotificationKind`.

## Mail notifications
To send mail notifications, you need an existing SMTP server.
```python
mail_notification = mlrun.model.Notification(
kind="mail",
when=["completed", "error", "running"],
name="mail-notification",
message="",
condition="",
severity="verbose",
params={
"start_tls": True,
"use_tls": False,
"validate_certs": False,
"email_addresses": ["user.name@domain.com"],
},
)
```
We use the [aiosmtplib](https://aiosmtplib.readthedocs.io/en/stable/) library for sending mail notifications.
The `params` argument is a dictionary, that supports the following fields:
- server_host (string): The SMTP server host.
- server_port (int): The SMTP server port.
- sender_address (string): The sender email address.
- username (string): The username for the SMTP server.
- password (string): The password for the SMTP server.
- email_addresses (list of strings): The list of email addresses to send the mail to.
- start_tls (boolean): Whether to start the TLS connection.
- use_tls (boolean): Whether to use TLS.
- validate_certs (boolean): Whether to validate the certificates.

You can read more about `start_tls` and `use_tls` on the [aiosmtplib docs](https://aiosmtplib.readthedocs.io/en/stable/encryption.html).
Missing params are enriched with default values which can be configured in the `mlrun-smtp-config` kubernetes (see below).

### MLRun on Iguazio
If MLRun is deployed on the Iguazio platform, an SMTP server already exists.
To use it, run the following (with privileged user - `IT Admin`):
```python
import mlrun

mlrun.get_run_db().refresh_smtp_configuration()
```
The `refresh_smtp_configuration` method will get the smtp configuration from the Iguazio platform and set it
as the default smtp configuration (create a `mlrun-smtp-config` with the smtp configuration).
If you edit the configuration on the Iguazio platform, you should run the `refresh_smtp_configuration` method again.

### MLRun CE
In the community edition, you can use your own SMTP server.
To configure it, manually create the `mlrun-smtp-config` kubernetes secret with the default
params for the SMTP server (`server_host`, `server_port`, `username`, `password`, etc..).
After creating or editing the secret, refresh the mlrun SMTP configuration by running the `refresh_smtp_configuration` method.

## Configuring notifications for runs

In any `run` method you can configure the notifications via their model. For example:
40 changes: 37 additions & 3 deletions mlrun/__init__.py
Original file line number Diff line number Diff line change
@@ -213,7 +213,41 @@ def set_env_from_file(env_file: str, return_dict: bool = False) -> Optional[dict
env_vars = dotenv.dotenv_values(env_file)
if None in env_vars.values():
raise MLRunInvalidArgumentError("env file lines must be in the form key=value")
for key, value in env_vars.items():
environ[key] = value # Load to local environ

ordered_env_vars = order_env_vars(env_vars)
for key, value in ordered_env_vars.items():
environ[key] = value

mlconf.reload() # reload mlrun configuration
return env_vars if return_dict else None
return ordered_env_vars if return_dict else None


def order_env_vars(env_vars: dict[str, str]) -> dict[str, str]:
"""
Order and process environment variables by first handling specific ordered keys,
then processing the remaining keys in the given dictionary.
The function ensures that environment variables defined in the `ordered_keys` list
are added to the result dictionary first. Any other environment variables from
`env_vars` are then added in the order they appear in the input dictionary.
:param env_vars: A dictionary where each key is the name of an environment variable (str),
and each value is the corresponding environment variable value (str).
:return: A dictionary with the processed environment variables, ordered with the specific
keys first, followed by the rest in their original order.
"""
ordered_keys = mlconf.get_ordered_keys()

ordered_env_vars: dict[str, str] = {}

# First, add the ordered keys to the dictionary
for key in ordered_keys:
if key in env_vars:
ordered_env_vars[key] = env_vars[key]

# Then, add the remaining keys (those not in ordered_keys)
for key, value in env_vars.items():
if key not in ordered_keys:
ordered_env_vars[key] = value

return ordered_env_vars
32 changes: 25 additions & 7 deletions mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
@@ -183,6 +183,25 @@ class WriterEventKind(MonitoringStrEnum):
STATS = "stats"


class ControllerEvent(MonitoringStrEnum):
KIND = "kind"
ENDPOINT_ID = "endpoint_id"
ENDPOINT_NAME = "endpoint_name"
PROJECT = "project"
TIMESTAMP = "timestamp"
FIRST_REQUEST = "first_request"
FEATURE_SET_URI = "feature_set_uri"
ENDPOINT_TYPE = "endpoint_type"
ENDPOINT_POLICY = "endpoint_policy"
# Note: currently under endpoint policy we will have a dictionary including the keys: "application_names"
# and "base_period"


class ControllerEventKind(MonitoringStrEnum):
NOP_EVENT = "nop_event"
REGULAR_EVENT = "regular_event"


class MetricData(MonitoringStrEnum):
METRIC_NAME = "metric_name"
METRIC_VALUE = "metric_value"
@@ -228,27 +247,26 @@ class ModelEndpointTarget(MonitoringStrEnum):
SQL = "sql"


class StreamKind(MonitoringStrEnum):
V3IO_STREAM = "v3io_stream"
KAFKA = "kafka"


class TSDBTarget(MonitoringStrEnum):
V3IO_TSDB = "v3io-tsdb"
TDEngine = "tdengine"


class DefaultProfileName(StrEnum):
STREAM = "mm-infra-stream"
TSDB = "mm-infra-tsdb"


class ProjectSecretKeys:
ACCESS_KEY = "MODEL_MONITORING_ACCESS_KEY"
STREAM_PATH = "STREAM_PATH"
TSDB_CONNECTION = "TSDB_CONNECTION"
TSDB_PROFILE_NAME = "TSDB_PROFILE_NAME"
STREAM_PROFILE_NAME = "STREAM_PROFILE_NAME"

@classmethod
def mandatory_secrets(cls):
return [
cls.STREAM_PATH,
cls.STREAM_PROFILE_NAME,
cls.TSDB_CONNECTION,
]

42 changes: 38 additions & 4 deletions mlrun/config.py
Original file line number Diff line number Diff line change
@@ -537,6 +537,8 @@
},
"pagination": {
"default_page_size": 200,
"page_limit": 1000000,
"page_size_limit": 1000000,
"pagination_cache": {
"interval": 60,
"ttl": 3600,
@@ -594,6 +596,22 @@
"max_replicas": 1,
},
},
"controller_stream_args": {
"v3io": {
"shard_count": 10,
"retention_period_hours": 24,
"num_workers": 10,
"min_replicas": 1,
"max_replicas": 1,
},
"kafka": {
"partition_count": 10,
"replication_factor": 1,
"num_workers": 10,
"min_replicas": 1,
"max_replicas": 1,
},
},
# Store prefixes are used to handle model monitoring storing policies based on project and kind, such as events,
# stream, and endpoints.
"store_prefixes": {
@@ -608,8 +626,6 @@
"parquet_batching_timeout_secs": timedelta(minutes=1).total_seconds(),
# See mlrun.model_monitoring.db.tsdb.ObjectTSDBFactory for available options
"tsdb_connection": "",
# See mlrun.common.schemas.model_monitoring.constants.StreamKind for available options
"stream_connection": "",
"tdengine": {
"timeout": 10,
"retries": 1,
@@ -799,7 +815,7 @@
# maximum allowed value for count in criteria field inside AlertConfig
"max_criteria_count": 100,
# interval for periodic events generation job
"events_generation_interval": "30",
"events_generation_interval": 30, # seconds
},
"auth_with_client_id": {
"enabled": False,
@@ -1282,19 +1298,30 @@ def get_model_monitoring_file_target_path(
function_name
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.STREAM
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.user_space.format(
project=project,
kind=kind
if function_name is None
else f"{kind}-{function_name.lower()}",
)
elif kind == "stream":
elif (
kind == "stream"
and function_name
!= mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.user_space.format(
project=project,
kind=kind,
)
else:
if (
function_name
== mlrun.common.schemas.model_monitoring.constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
):
kind = function_name
return mlrun.mlconf.model_endpoint_monitoring.store_prefixes.default.format(
project=project,
kind=kind,
@@ -1363,6 +1390,13 @@ def is_explicit_ack_enabled(self) -> bool:
>= semver.VersionInfo.parse("1.12.10")
)

@staticmethod
def get_ordered_keys():
# Define the keys to process first
return [
"MLRUN_HTTPDB__HTTP__VERIFY" # Ensure this key is processed first for proper connection setup
]


# Global configuration
config = Config.from_dict(default_config)
74 changes: 58 additions & 16 deletions mlrun/datastore/datastore_profile.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
import json
import typing
import warnings
from urllib.parse import ParseResult, urlparse, urlunparse
from urllib.parse import ParseResult, urlparse

import pydantic.v1
from mergedeep import merge
@@ -211,9 +211,10 @@ def attributes(self):
attributes["partitions"] = self.partitions
sasl = attributes.pop("sasl", {})
if self.sasl_user and self.sasl_pass:
sasl["enabled"] = True
sasl["enable"] = True
sasl["user"] = self.sasl_user
sasl["password"] = self.sasl_pass
sasl["mechanism"] = "PLAIN"
if sasl:
attributes["sasl"] = sasl
return attributes
@@ -312,7 +313,7 @@ def url_with_credentials(self):
query=parsed_url.query,
fragment=parsed_url.fragment,
)
return urlunparse(new_parsed_url)
return new_parsed_url.geturl()

def secrets(self) -> dict:
res = {}
@@ -473,6 +474,59 @@ def url(self, subpath):
return f"webhdfs://{self.host}:{self.http_port}{subpath}"


class TDEngineDatastoreProfile(DatastoreProfile):
"""
A profile that holds the required parameters for a TDEngine database, with the websocket scheme.
https://docs.tdengine.com/developer-guide/connecting-to-tdengine/#websocket-connection
"""

type: str = pydantic.v1.Field("taosws")
_private_attributes = ["password"]
user: str
# The password cannot be empty in real world scenarios. It's here just because of the profiles completion design.
password: typing.Optional[str]
host: str
port: int

def dsn(self) -> str:
"""Get the Data Source Name of the configured TDEngine profile."""
return f"{self.type}://{self.user}:{self.password}@{self.host}:{self.port}"

@classmethod
def from_dsn(cls, dsn: str, profile_name: str) -> "TDEngineDatastoreProfile":
"""
Construct a TDEngine profile from DSN (connection string) and a name for the profile.
:param dsn: The DSN (Data Source Name) of the TDEngine database, e.g.: ``"taosws://root:taosdata@localhost:6041"``.
:param profile_name: The new profile's name.
:return: The TDEngine profile.
"""
parsed_url = urlparse(dsn)
return cls(
name=profile_name,
user=parsed_url.username,
password=parsed_url.password,
host=parsed_url.hostname,
port=parsed_url.port,
)


_DATASTORE_TYPE_TO_PROFILE_CLASS: dict[str, type[DatastoreProfile]] = {
"v3io": DatastoreProfileV3io,
"s3": DatastoreProfileS3,
"redis": DatastoreProfileRedis,
"basic": DatastoreProfileBasic,
"kafka_target": DatastoreProfileKafkaTarget,
"kafka_source": DatastoreProfileKafkaSource,
"dbfs": DatastoreProfileDBFS,
"gcs": DatastoreProfileGCS,
"az": DatastoreProfileAzureBlob,
"hdfs": DatastoreProfileHdfs,
"taosws": TDEngineDatastoreProfile,
"config": ConfigProfile,
}


class DatastoreProfile2Json(pydantic.v1.BaseModel):
@staticmethod
def _to_json(attributes):
@@ -523,19 +577,7 @@ def safe_literal_eval(value):

decoded_dict = {k: safe_literal_eval(v) for k, v in decoded_dict.items()}
datastore_type = decoded_dict.get("type")
ds_profile_factory = {
"v3io": DatastoreProfileV3io,
"s3": DatastoreProfileS3,
"redis": DatastoreProfileRedis,
"basic": DatastoreProfileBasic,
"kafka_target": DatastoreProfileKafkaTarget,
"kafka_source": DatastoreProfileKafkaSource,
"dbfs": DatastoreProfileDBFS,
"gcs": DatastoreProfileGCS,
"az": DatastoreProfileAzureBlob,
"hdfs": DatastoreProfileHdfs,
"config": ConfigProfile,
}
ds_profile_factory = _DATASTORE_TYPE_TO_PROFILE_CLASS
if datastore_type in ds_profile_factory:
return ds_profile_factory[datastore_type].parse_obj(decoded_dict)
else:
Loading

0 comments on commit 44190cb

Please sign in to comment.