From 7e12c471374b5a3826d22cd3f486dfd153ab2d9a Mon Sep 17 00:00:00 2001 From: clearml <> Date: Thu, 26 Sep 2024 10:42:31 +0300 Subject: [PATCH] Add support for last change time in triggers using tags --- clearml/automation/trigger.py | 72 +++++++++++++++++++------- clearml/backend_api/session/session.py | 16 ++++-- 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/clearml/automation/trigger.py b/clearml/automation/trigger.py index 09c89bbd..25d99a5a 100644 --- a/clearml/automation/trigger.py +++ b/clearml/automation/trigger.py @@ -15,8 +15,9 @@ @attrs class BaseTrigger(BaseScheduleJob): - _only_fields = {"id", "name", "last_update", } + _only_fields = {"id", "name", "last_update", "last_change"} _update_field = None + _change_field = None project = attrib(default=None, type=str) match_name = attrib(default=None, type=str) @@ -31,14 +32,31 @@ class BaseTrigger(BaseScheduleJob): # allowing us to ignore repeating object updates triggering multiple times _triggered_instances = attrib(type=dict, default=None) # type: Dict[str, datetime] - def build_query(self, ref_time): - return { - 'name': self.match_name or None, - 'project': [self.project] if self.project else None, - 'tags': ((self.tags or []) + (self.required_tags or [])) or None, - self._update_field: ">{}".format(ref_time.isoformat() if ref_time else self.last_update.isoformat()) + def build_query(self, ref_time, client=None): + # type: (datetime, APIClient) -> dict + server_supports_datetime_or_query = ( + client and ( + (client.session.feature_set == "basic" and client.session.check_min_server_version("1.16.3")) + or (client.session.feature_set != "basic" and client.session.check_min_server_version("3.22.6")) + ) + ) + + query = { + "name": self.match_name or None, + "project": [self.project] if self.project else None, + "tags": ((self.tags or []) + (self.required_tags or [])) or None, } + if not server_supports_datetime_or_query: + query[self._update_field] = ">{}".format(ref_time.isoformat() if 
ref_time else self.last_update.isoformat()) + else: + query["_or_"] = { + "fields": [self._update_field, self._change_field], + "datetime": [">{}".format(ref_time.isoformat() if ref_time else self.last_update.isoformat())] + } + + return query + def verify(self): # type: () -> None super(BaseTrigger, self).verify() @@ -56,19 +74,25 @@ def verify(self): def get_key(self): return getattr(self, '_key', None) + def get_ref_time(self, obj): + return max( + getattr(obj, self._update_field, 0), + getattr(obj, self._change_field, 0) + ) + @attrs class ModelTrigger(BaseTrigger): _task_param = '${model.id}' _key = "models" - _only_fields = {"id", "name", "last_update", "ready", "tags"} _update_field = "last_update" + _change_field = "last_change" on_publish = attrib(type=bool, default=None) on_archive = attrib(type=bool, default=None) - def build_query(self, ref_time): - query = super(ModelTrigger, self).build_query(ref_time) + def build_query(self, ref_time, client=None): + query = super(ModelTrigger, self).build_query(ref_time, client) if self.on_publish: query.update({'ready': True}) if self.on_archive: @@ -76,19 +100,23 @@ def build_query(self, ref_time): query.update({'system_tags': system_tags}) return query + @property + def _only_fields(self): + return {"id", "name", "ready", "tags", self._update_field, self._change_field} + @attrs class DatasetTrigger(BaseTrigger): _task_param = '${dataset.id}' _key = "tasks" - _only_fields = {"id", "name", "last_update", "status", "completed", "tags"} _update_field = "last_update" + _change_field = "last_change" on_publish = attrib(type=bool, default=None) on_archive = attrib(type=bool, default=None) - def build_query(self, ref_time): - query = super(DatasetTrigger, self).build_query(ref_time) + def build_query(self, ref_time, client=None): + query = super(DatasetTrigger, self).build_query(ref_time, client) query.update({ 'system_tags': list(set(query.get('system_tags', []) + ['dataset'])), 'task_types': 
list(set(query.get('task_types', []) + [str(Task.TaskTypes.data_processing)])), @@ -101,13 +129,17 @@ def build_query(self, ref_time): return query + @property + def _only_fields(self): + return {"id", "name", "status", "completed", "tags", self._update_field, self._change_field} + @attrs class TaskTrigger(BaseTrigger): _task_param = '${task.id}' _key = "tasks" - _only_fields = {"id", "name", "last_update", "status", "completed", "tags"} _update_field = "last_update" + _change_field = "last_change" metrics = attrib(default=None, type=str) variant = attrib(default=None, type=str) @@ -116,8 +148,8 @@ class TaskTrigger(BaseTrigger): exclude_dev = attrib(default=None, type=bool) on_status = attrib(type=list, default=None) - def build_query(self, ref_time): - query = super(TaskTrigger, self).build_query(ref_time) + def build_query(self, ref_time, client=None): + query = super(TaskTrigger, self).build_query(ref_time, client) if self.exclude_dev: system_tags = list(set(query.get('system_tags', []) + ['-development'])) query.update({'system_tags': system_tags}) @@ -144,6 +176,10 @@ def verify(self): if self.value_sign and self.value_sign not in valid_signs: raise ValueError("Invalid value_sign `{}`, valid options are: {}".format(self.value_sign, valid_signs)) + @property + def _only_fields(self): + return {"id", "name", "status", "completed", "tags", self._update_field, self._change_field} + @attrs class ExecutedTrigger(ExecutedJob): @@ -505,9 +541,9 @@ def _step(self): objects = getattr(self._client, trigger.get_key()).get_all( _allow_extra_fields_=True, only_fields=list(trigger._only_fields or []), - **trigger.build_query(ref_time) + **trigger.build_query(ref_time, self._client) ) - trigger.last_update = max([o.last_update for o in objects] or [ref_time]) + trigger.last_update = max([trigger.get_ref_time(o) for o in objects] or [ref_time]) if not objects: continue except Exception as ex: diff --git a/clearml/backend_api/session/session.py 
b/clearml/backend_api/session/session.py index 7139e2a6..7cea91d1 100644 --- a/clearml/backend_api/session/session.py +++ b/clearml/backend_api/session/session.py @@ -84,6 +84,7 @@ class Session(TokenManager): _client = [(__package__.partition(".")[0], __version__)] api_version = '2.9' # this default version should match the lowest api version we have under service + server_version = '1.0.0' max_api_version = '2.9' feature_set = 'basic' default_demo_host = "https://demoapi.demo.clear.ml" @@ -234,8 +235,10 @@ def _connect(self): api_version = token_dict.get('api_version') if not api_version: api_version = '2.2' if token_dict.get('env', '') == 'prod' else Session.api_version - if token_dict.get('server_version'): - self.add_client('clearml-server', token_dict.get('server_version')) + + Session.server_version = token_dict.get('server_version') + if Session.server_version: + self.add_client('clearml-server', Session.server_version) Session.max_api_version = Session.api_version = str(api_version) Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic") @@ -745,6 +748,13 @@ def get_files_server_host(cls, config=None): return urlunparse(parsed) + @classmethod + def check_min_server_version(cls, min_server_version): + """ + Return True if Session.server_version is greater than or equal to (>=) min_server_version + """ + return cls._version_tuple(cls.server_version) >= cls._version_tuple(str(min_server_version)) + @classmethod def check_min_api_version(cls, min_api_version, raise_error=False): """ @@ -766,7 +776,7 @@ def check_min_api_version(cls, min_api_version, raise_error=False): pass cls.max_api_version = cls.api_version = cls._offline_default_version else: - # if the requested version is lower then the minimum we support, + # if the requested version is lower than the minimum we support, # no need to actually check what the server has, we assume it must have at least our version.
if cls._version_tuple(cls.api_version) >= cls._version_tuple(str(min_api_version)): return True