From ce24026f2b99c599f573f719a4e20916e3bd5913 Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 6 Oct 2020 16:10:55 +0200 Subject: [PATCH 1/3] Fix - renaming or deleting tasks in Ftrack wasn't synced Part for manual syncing through Action --- pype/modules/ftrack/lib/avalon_sync.py | 89 +++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 10 deletions(-) diff --git a/pype/modules/ftrack/lib/avalon_sync.py b/pype/modules/ftrack/lib/avalon_sync.py index 28114c7fdc3..7ff5283d6ab 100644 --- a/pype/modules/ftrack/lib/avalon_sync.py +++ b/pype/modules/ftrack/lib/avalon_sync.py @@ -24,9 +24,9 @@ # Current schemas for avalon types EntitySchemas = { - "project": "avalon-core:project-2.1", - "asset": "avalon-core:asset-3.0", - "config": "avalon-core:config-1.1" + "project": "pype:project-2.1", + "asset": "pype:asset-3.0", + "config": "pype:config-1.1" } # Group name of custom attributes @@ -103,15 +103,40 @@ def get_pype_attr(session, split_hierarchical=True): return custom_attributes -def from_dict_to_set(data): +def from_dict_to_set(data, is_project): """ Converts 'data' into $set part of MongoDB update command. + Sets new or modified keys. + Tasks are updated completely, not per task. (Eg. change in any of the + tasks results in full update of "tasks" from Ftrack. Args: - data: (dictionary) - up-to-date data from Ftrack + data (dictionary): up-to-date data from Ftrack + is_project (boolean): true for project Returns: (dictionary) - { "$set" : "{..}"} """ + not_set = object() + task_changes = not_set + if ( + is_project + and "config" in data + and "tasks" in data["config"] + ): + task_changes = data["config"].pop("tasks") + task_changes_key = "config.tasks" + if not data["config"]: + data.pop("config") + elif ( + not is_project + and "data" in data + and "tasks" in data["data"] + ): + task_changes = data["data"].pop("tasks") + task_changes_key = "data.tasks" + if not data["data"]: + data.pop("data") + result = {"$set": {}} dict_queue = queue.Queue() dict_queue.put((None, data)) @@ -128,6 +153,9 @@ def from_dict_to_set(data): result["$set"][new_key] = value continue dict_queue.put((new_key, value)) + + if task_changes is not not_set and task_changes_key: + result["$set"][task_changes_key] = task_changes return result @@ -659,7 +687,7 @@ def duplicity_regex_check(self): # Tasks must be checked too for task in entity_dict["tasks"].items(): task_name, task = task - passed = task_name + passed = task_names.get(task_name) if passed is None: passed = check_regex( task_name, "task", schema_patterns=_schema_patterns @@ -731,7 +759,7 @@ def filter_by_duplicate_regex(self): for id in ids: if id not in self.entities_dict: continue - self.entities_dict[id]["tasks"].remove(name) + self.entities_dict[id]["tasks"].pop(name) ent_path = self.get_ent_path(id) self.log.warning(failed_regex_msg.format( "/".join([ent_path, name]) @@ -1680,6 +1708,18 @@ def prepare_changes(self): self.updates[avalon_id] ) + # double check changes in tasks, some task could be renamed or + # deleted in Ftrack - not captured otherwise + final_entity = self.entities_dict[ftrack_id]["final_entity"] + if final_entity["data"].get("tasks", {}) != \ + avalon_entity["data"].get("tasks", {}): + if "data" not in self.updates[avalon_id]: + self.updates[avalon_id]["data"] = {} + + self.updates[avalon_id]["data"]["tasks"] = ( + final_entity["data"]["tasks"] + ) + def synchronize(self): self.log.debug("* Synchronization begins") avalon_project_id = self.ftrack_avalon_mapper.get(self.ft_project_id) @@ -2027,15 +2067,20 @@ def 
_check_changeability(self, parent_id=None):
         self._changeability_by_mongo_id[mongo_id] = is_changeable
 
     def update_entities(self):
+        """
+            Runs changes converted to "$set" queries in bulk.
+        """
         mongo_changes_bulk = []
         for mongo_id, changes in self.updates.items():
-            filter = {"_id": ObjectId(mongo_id)}
-            change_data = from_dict_to_set(changes)
+            mongo_id = ObjectId(mongo_id)
+            is_project = mongo_id == self.avalon_project_id
+            change_data = from_dict_to_set(changes, is_project)
+
+            filter = {"_id": mongo_id}
             mongo_changes_bulk.append(UpdateOne(filter, change_data))
+
         if not mongo_changes_bulk:
             # TODO LOG
             return
-        log.debug("mongo_changes_bulk:: {}".format(mongo_changes_bulk))
         self.dbcon.bulk_write(mongo_changes_bulk)
 
     def reload_parents(self, hierarchy_changing_ids):
@@ -2107,6 +2152,18 @@ def prepare_project_changes(self):
             )
 
     def compare_dict(self, dict_new, dict_old, _ignore_keys=[]):
+        """
+            Recursively compares 'dict_new' with 'dict_old' and lists the
+            changes between them.
+            Keys in '_ignore_keys' are skipped and not compared.
+        Args:
+            dict_new (dictionary):
+            dict_old (dictionary):
+            _ignore_keys (list):
+
+        Returns:
+            (dictionary) of new or updated keys and their values
+        """
         # _ignore_keys may be used for keys nested dict like"data.visualParent"
         changes = {}
         ignore_keys = []
@@ -2148,6 +2205,18 @@ def compare_dict(self, dict_new, dict_old, _ignore_keys=[]):
         return changes
 
     def merge_dicts(self, dict_new, dict_old):
+        """
+            Recursively applies all new or updated keys from 'dict_new'
+            onto 'dict_old'.
+            Note: it does not detect keys that no longer exist in
+            'dict_new'.
+        Args:
+            dict_new (dictionary): from Ftrack most likely
+            dict_old (dictionary): current in DB
+
+        Returns:
+            (dictionary) the original dictionary with the changes applied
+        """
         for key, value in dict_new.items():
             if key not in dict_old:
                 dict_old[key] = value

From c08a6adc570162045ee9008177df4712a57ef5ad Mon Sep 17 00:00:00 2001
From: Petr Kalis
Date: Wed, 7 Oct 2020 15:35:22 +0200
Subject: [PATCH 2/3] Fix - renaming or deleting tasks in Ftrack wasn't synced

Part for syncing through Event server

---
 .../ftrack/events/action_sync_to_avalon.py   |  2 +-
 .../ftrack/events/event_sync_to_avalon.py    | 82 ++++++++++++++-----
 2 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/pype/modules/ftrack/events/action_sync_to_avalon.py b/pype/modules/ftrack/events/action_sync_to_avalon.py
index 4e119228c34..01db8154134 100644
--- a/pype/modules/ftrack/events/action_sync_to_avalon.py
+++ b/pype/modules/ftrack/events/action_sync_to_avalon.py
@@ -15,7 +15,7 @@ class SyncToAvalonServer(BaseAction):
     - Data(dictionary):
         - VisualParent(ObjectId) - Avalon Id of parent asset
         - Parents(array of string) - All parent names except project
-        - Tasks(array of string) - Tasks on asset
+        - Tasks(dictionary of dictionaries) - Tasks on asset
         - FtrackId(string)
         - entityType(string) - entity's type on Ftrack
     * All Custom attributes in group 'Avalon'
diff --git a/pype/modules/ftrack/events/event_sync_to_avalon.py b/pype/modules/ftrack/events/event_sync_to_avalon.py
index d0c9ea2e966..b96741626c1 100644
--- a/pype/modules/ftrack/events/event_sync_to_avalon.py
+++ b/pype/modules/ftrack/events/event_sync_to_avalon.py
@@ -472,6 +472,16 @@ def filter_updated(self, updates):
         return filtered_updates
 
     def get_ent_path(self, ftrack_id):
+        """
+            Looks for an entity with 'ftrack_id' in FTrack. If found, returns
+            the names from its 'link' element joined into a path describing
+            the location of the entity in the tree.
+ Args: + ftrack_id (string): entityId of FTrack entity + + Returns: + (string) - example : "/test_project/assets/my_asset" + """ entity = self.ftrack_ents_by_id.get(ftrack_id) if not entity: entity = self.process_session.query( @@ -491,7 +501,7 @@ def launch(self, session, event): self.process_session.commit() except Exception: self.set_process_session(session) - + self.log.debug("start launch") # Reset object values for each launch self.reset_variables() self._cur_event = event @@ -502,7 +512,7 @@ def launch(self, session, event): "move": {}, "add": {} } - + self.log.debug("event_data:: {}".format(event["data"])) entities_info = event["data"]["entities"] found_actions = set() for ent_info in entities_info: @@ -741,8 +751,9 @@ def process_removed(self): avalon_ent["data"]["tasks"] ) - if removed_name in self.task_changes_by_avalon_id[mongo_id]: - self.task_changes_by_avalon_id[mongo_id].remove( + if removed_name in self.task_changes_by_avalon_id[mongo_id].\ + keys(): + self.task_changes_by_avalon_id[mongo_id].pop( removed_name ) @@ -1068,11 +1079,13 @@ def create_entity_in_avalon(self, ftrack_ent, parent_avalon): ) # Tasks - tasks = [] + tasks = {} for child in ftrack_ent["children"]: if child.entity_type.lower() != "task": continue - tasks.append(child["name"]) + self.log.debug("child:: {}".format(child)) + task_type = self._get_task_type(child['entityId']) + tasks[child["name"]] = {"type": task_type} # Visual Parent vis_par = None @@ -1423,8 +1436,8 @@ def process_renamed(self): avalon_ent["data"]["tasks"] ) - if old_name in self.task_changes_by_avalon_id[mongo_id]: - self.task_changes_by_avalon_id[mongo_id].remove(old_name) + if old_name in self.task_changes_by_avalon_id[mongo_id].keys(): + self.task_changes_by_avalon_id[mongo_id].pop(old_name) else: parent_ftrack_ent = self.ftrack_ents_by_id.get(parent_id) if not parent_ftrack_ent: @@ -1442,17 +1455,21 @@ def process_renamed(self): continue child_names.append(child["name"]) - tasks = [task for task in ( - self.task_changes_by_avalon_id[mongo_id] - )] - for task in tasks: - if task not in child_names: - self.task_changes_by_avalon_id[mongo_id].remove( - task + tasks = copy.deepcopy( + self.task_changes_by_avalon_id[mongo_id].keys() + ) + + for task_name in tasks: + if task_name not in child_names: + self.task_changes_by_avalon_id[mongo_id].pop( + task_name ) - if new_name not in self.task_changes_by_avalon_id[mongo_id]: - self.task_changes_by_avalon_id[mongo_id].append(new_name) + task_type = self._get_task_type(ent_info['entityId']) + if new_name not in self.task_changes_by_avalon_id[mongo_id].keys(): + self.task_changes_by_avalon_id[mongo_id][new_name] = { + "type": task_type + } # not_found are not processed since all not found are # not found because they are not synchronizable @@ -1688,8 +1705,12 @@ def process_added(self): self.regex_failed.append(ent_info["entityId"]) continue - if new_name not in self.task_changes_by_avalon_id[mongo_id]: - self.task_changes_by_avalon_id[mongo_id].append(new_name) + task_type = self._get_task_type(ent_info['entityId']) + if new_name not in \ + self.task_changes_by_avalon_id[mongo_id].keys(): + self.task_changes_by_avalon_id[mongo_id][new_name] = { + "type": task_type + } def _mongo_id_configuration( self, @@ -2293,7 +2314,9 @@ def update_entities(self): mongo_changes_bulk = [] for mongo_id, changes in self.updates.items(): filter = {"_id": mongo_id} - change_data = avalon_sync.from_dict_to_set(changes) + avalon_ent = self.avalon_ents_by_id[mongo_id] + is_project = avalon_ent["type"] == "project" 
+            change_data = avalon_sync.from_dict_to_set(changes, is_project)
             mongo_changes_bulk.append(UpdateOne(filter, change_data))
 
         if not mongo_changes_bulk:
@@ -2477,6 +2500,25 @@ def report(self):
         )
         return True
 
+    def _get_task_type(self, entityId):
+        """
+            Returns the task type (e.g. 'Props', 'Art') of the Task 'entityId'.
+        Args:
+            entityId (string): entityId of Task
+
+        Returns:
+            (string) - None if Task not found
+        """
+        task_type = None
+        entity = self.process_session.query(
+            self.entities_query_by_id.format(
+                self.cur_project["id"], entityId
+            )
+        ).first()
+        if entity:
+            task_type = entity["type"]["name"]
+        return task_type
+
 
 def register(session, plugins_presets):
     '''Register plugin. Called when used as an plugin.'''

From d6e86f852a09be5630d2933498fbaf676ad42a10 Mon Sep 17 00:00:00 2001
From: Petr Kalis
Date: Wed, 7 Oct 2020 20:39:29 +0200
Subject: [PATCH 3/3] Fix - change of task type wasn't propagated

Rewrite of the approach, simplified: if any task change is noticed, the
parents of the changed tasks are captured, all task entities are pulled
from Ftrack and the task dictionaries are recreated from scratch.

---
 .../ftrack/events/event_sync_to_avalon.py     | 473 +++++++++---------
 1 file changed, 249 insertions(+), 224 deletions(-)

diff --git a/pype/modules/ftrack/events/event_sync_to_avalon.py b/pype/modules/ftrack/events/event_sync_to_avalon.py
index b96741626c1..f3ef38b8682 100644
--- a/pype/modules/ftrack/events/event_sync_to_avalon.py
+++ b/pype/modules/ftrack/events/event_sync_to_avalon.py
@@ -40,6 +40,12 @@ class SyncToAvalonEvent(BaseEvent):
         "select id, name, parent_id, link, custom_attributes from TypedContext"
         " where project_id is \"{}\" and id in ({})"
     )
+
+    # useful for getting all tasks of an asset
+    entities_query_by_parent_id = (
+        "select id, name, parent_id, link, custom_attributes from TypedContext"
+        " where project_id is \"{}\" and parent_id in ({})"
+    )
     entities_name_query_by_name = (
         "select id, name from TypedContext"
         " where project_id is \"{}\" and name in ({})"
@@ -313,9 +319,6 @@ def get_found_data(entity):
             if self._avalon_archived_by_id is not None:
                 self._avalon_archived_by_id[mongo_id] = entity
 
-        if mongo_id in self.task_changes_by_avalon_id:
-            self.task_changes_by_avalon_id.pop(mongo_id)
-
     def _bubble_changeability(self, unchangeable_ids):
         unchangeable_queue = queue.Queue()
         for entity_id in unchangeable_ids:
@@ -383,8 +386,6 @@ def reset_variables(self):
         self._avalon_archived_by_id = None
         self._avalon_archived_by_name = None
 
-        self.task_changes_by_avalon_id = {}
-
         self._avalon_custom_attributes = None
         self._ent_types_by_name = None
 
@@ -398,6 +399,10 @@ def reset_variables(self):
         self.ftrack_updated = {}
         self.ftrack_removed = {}
 
+        # set of ftrack ids with modified tasks
+        # handled separately by a full wipe-out and replace from FTrack
+        self.modified_tasks_ftrackids = set()
+
         self.moved_in_avalon = []
         self.renamed_in_avalon = []
         self.hier_cust_attrs_changes = collections.defaultdict(list)
@@ -496,12 +501,24 @@ def get_ent_path(self, ftrack_id):
         return "/".join([ent["name"] for ent in entity["link"]])
 
     def launch(self, session, event):
+        """
+            Main entry point for synchronization.
+            Goes through the event (which can contain multiple changes) and
+            decides if the event is interesting for us (interest_entTypes).
+            It separates changes into add|remove|update.
+            All task changes are handled together by a refresh from Ftrack.
+ Args: + session (object): session to Ftrack + event (dictionary): event content + + Returns: + (boolean or None) + """ # Try to commit and if any error happen then recreate session try: self.process_session.commit() except Exception: self.set_process_session(session) - self.log.debug("start launch") # Reset object values for each launch self.reset_variables() self._cur_event = event @@ -512,7 +529,7 @@ def launch(self, session, event): "move": {}, "add": {} } - self.log.debug("event_data:: {}".format(event["data"])) + entities_info = event["data"]["entities"] found_actions = set() for ent_info in entities_info: @@ -537,26 +554,45 @@ def launch(self, session, event): continue ftrack_id = ftrack_id[0] + # task modified, collect parent id of task, handle separately + if entityType.lower() == 'task': + self.modified_tasks_ftrackids.add(ent_info["parentId"]) + if action == "move": ent_keys = ent_info["keys"] - # Seprate update info from move action + # Separate update info from move action if len(ent_keys) > 1: _ent_info = ent_info.copy() for ent_key in ent_keys: if ent_key == "parent_id": + # task parents modified, collect both + if entityType.lower() == 'task': + self.modified_tasks_ftrackids.add( + ent_info["changes"]["new"]) + self.modified_tasks_ftrackids.add( + ent_info["changes"]["old"]) _ent_info["changes"].pop(ent_key, None) _ent_info["keys"].remove(ent_key) else: ent_info["changes"].pop(ent_key, None) ent_info["keys"].remove(ent_key) + if entityType.lower() != 'task': + entities_by_action["update"][ftrack_id] = _ent_info + else: + if entityType.lower() == 'task': + self.modified_tasks_ftrackids.add( + ent_info["changes"]["parent_id"]["new"]) + self.modified_tasks_ftrackids.add( + ent_info["changes"]["parent_id"]["old"] + ) - entities_by_action["update"][ftrack_id] = _ent_info - - found_actions.add(action) - entities_by_action[action][ftrack_id] = ent_info + # regular change process handles all other than Tasks + if entityType.lower() != 'task': + found_actions.add(action) + entities_by_action[action][ftrack_id] = ent_info found_actions = list(found_actions) - if not found_actions: + if not found_actions and not self.modified_tasks_ftrackids: return True # Check if auto sync was turned on/off @@ -632,26 +668,17 @@ def launch(self, session, event): ft_project["full_name"], debug_msg )) # Get ftrack entities - find all ftrack ids first - ftrack_ids = [] - for ftrack_id in updated: - ftrack_ids.append(ftrack_id) + ftrack_ids = set(updated.keys()) - for action, ftrack_ids in entities_by_action.items(): + for action, _ftrack_ids in entities_by_action.items(): # skip updated (already prepared) and removed (not exist in ftrack) - if action == "remove": - continue - - for ftrack_id in ftrack_ids: - if ftrack_id not in ftrack_ids: - ftrack_ids.append(ftrack_id) + if action not in ("remove", "update"): + ftrack_ids.union(set(_ftrack_ids)) - if ftrack_ids: - joined_ids = ", ".join(["\"{}\"".format(id) for id in ftrack_ids]) - ftrack_entities = self.process_session.query( - self.entities_query_by_id.format(ft_project["id"], joined_ids) - ).all() - for entity in ftrack_entities: - self.ftrack_ents_by_id[entity["id"]] = entity + # collect entity records data which might not be in event + for entity in self._get_entities_for_ftrack_ids(ft_project["id"], + ftrack_ids): + self.ftrack_ents_by_id[entity["id"]] = entity # Filter updates where name is changing for ftrack_id, ent_info in updated.items(): @@ -698,9 +725,11 @@ def launch(self, session, event): time_6 = time.time() # 6.) 
Process changes in hierarchy or hier custom attribues self.process_hier_cleanup() + time_7 = time.time() + self.process_task_updates() if self.updates: self.update_entities() - time_7 = time.time() + time_8 = time.time() time_removed = time_2 - time_1 time_renamed = time_3 - time_2 @@ -708,11 +737,15 @@ def launch(self, session, event): time_moved = time_5 - time_4 time_updated = time_6 - time_5 time_cleanup = time_7 - time_6 - time_total = time_7 - time_1 - self.log.debug("Process time: {} <{}, {}, {}, {}, {}, {}>".format( - time_total, time_removed, time_renamed, time_added, time_moved, - time_updated, time_cleanup - )) + time_task_updates = time_8 - time_7 + time_total = time_8 - time_1 + self.log.debug( + "Process time: {:.2f} <{:.2f}, {:.2f}, {:.2f}, ".format( + time_total, time_removed, time_renamed, time_added) + + "{:.2f}, {:.2f}, {:.2f}, {:.2f}>".format( + time_moved, time_updated, time_cleanup, time_task_updates + ) + ) except Exception: msg = "An error has happened during synchronization" @@ -724,6 +757,9 @@ def launch(self, session, event): return True def process_removed(self): + """ + Handles removed entities (not removed tasks - handle separately). + """ if not self.ftrack_removed: return ent_infos = self.ftrack_removed @@ -734,31 +770,12 @@ def process_removed(self): recreate_ents = [] removed_names = [] for ftrack_id, removed in ent_infos.items(): - entity_type = removed["entity_type"] - parent_id = removed["parentId"] - removed_name = removed["changes"]["name"]["old"] if entity_type == "Task": - avalon_ent = self.avalon_ents_by_ftrack_id.get(parent_id) - if not avalon_ent: - self.log.debug(( - "Parent entity of task was not found in avalon <{}>" - ).format(self.get_ent_path(parent_id))) - continue - - mongo_id = avalon_ent["_id"] - if mongo_id not in self.task_changes_by_avalon_id: - self.task_changes_by_avalon_id[mongo_id] = ( - avalon_ent["data"]["tasks"] - ) - - if removed_name in self.task_changes_by_avalon_id[mongo_id].\ - keys(): - self.task_changes_by_avalon_id[mongo_id].pop( - removed_name - ) - continue + entity_type = removed["entity_type"] + removed_name = removed["changes"]["name"]["old"] + avalon_ent = self.avalon_ents_by_ftrack_id.get(ftrack_id) if not avalon_ent: continue @@ -1084,7 +1101,8 @@ def create_entity_in_avalon(self, ftrack_ent, parent_avalon): if child.entity_type.lower() != "task": continue self.log.debug("child:: {}".format(child)) - task_type = self._get_task_type(child['entityId']) + task_type = self._get_task_type(self.cur_project["id"], + child['entityId']) tasks[child["name"]] = {"type": task_type} # Visual Parent @@ -1280,22 +1298,15 @@ def process_renamed(self): "Processing renamed entities: {}".format(str(ent_infos)) ) - renamed_tasks = {} - not_found = {} changeable_queue = queue.Queue() for ftrack_id, ent_info in ent_infos.items(): entity_type = ent_info["entity_type"] - new_name = ent_info["changes"]["name"]["new"] - old_name = ent_info["changes"]["name"]["old"] if entity_type == "Task": - parent_id = ent_info["parentId"] - renamed_tasks[parent_id] = { - "new": new_name, - "old": old_name, - "ent_info": ent_info - } continue + new_name = ent_info["changes"]["name"]["new"] + old_name = ent_info["changes"]["name"]["old"] + ent_path = self.get_ent_path(ftrack_id) avalon_ent = self.avalon_ents_by_ftrack_id.get(ftrack_id) if not avalon_ent: @@ -1413,64 +1424,6 @@ def process_renamed(self): if old_names: self.check_names_synchronizable(old_names) - for parent_id, task_change in renamed_tasks.items(): - avalon_ent = 
self.avalon_ents_by_ftrack_id.get(parent_id) - ent_info = task_change["ent_info"] - if not avalon_ent: - not_found[ent_info["entityId"]] = ent_info - continue - - new_name = task_change["new"] - old_name = task_change["old"] - passed_regex = avalon_sync.check_regex( - new_name, "task", schema_patterns=self.regex_schemas - ) - if not passed_regex: - ftrack_id = ent_info["enityId"] - self.regex_failed.append(ftrack_id) - continue - - mongo_id = avalon_ent["_id"] - if mongo_id not in self.task_changes_by_avalon_id: - self.task_changes_by_avalon_id[mongo_id] = ( - avalon_ent["data"]["tasks"] - ) - - if old_name in self.task_changes_by_avalon_id[mongo_id].keys(): - self.task_changes_by_avalon_id[mongo_id].pop(old_name) - else: - parent_ftrack_ent = self.ftrack_ents_by_id.get(parent_id) - if not parent_ftrack_ent: - parent_ftrack_ent = self.process_session.query( - self.entities_query_by_id.format( - self.cur_project["id"], parent_id - ) - ).first() - - if parent_ftrack_ent: - self.ftrack_ents_by_id[parent_id] = parent_ftrack_ent - child_names = [] - for child in parent_ftrack_ent["children"]: - if child.entity_type.lower() != "task": - continue - child_names.append(child["name"]) - - tasks = copy.deepcopy( - self.task_changes_by_avalon_id[mongo_id].keys() - ) - - for task_name in tasks: - if task_name not in child_names: - self.task_changes_by_avalon_id[mongo_id].pop( - task_name - ) - - task_type = self._get_task_type(ent_info['entityId']) - if new_name not in self.task_changes_by_avalon_id[mongo_id].keys(): - self.task_changes_by_avalon_id[mongo_id][new_name] = { - "type": task_type - } - # not_found are not processed since all not found are # not found because they are not synchronizable @@ -1488,7 +1441,6 @@ def process_added(self): # Skip if already exit in avalon db or tasks entities # - happen when was created by any sync event/action pop_out_ents = [] - new_tasks_by_parent = collections.defaultdict(list) for ftrack_id, ent_info in ent_infos.items(): if self.avalon_ents_by_ftrack_id.get(ftrack_id): pop_out_ents.append(ftrack_id) @@ -1501,9 +1453,6 @@ def process_added(self): entity_type = ent_info["entity_type"] if entity_type == "Task": - parent_id = ent_info["parentId"] - new_tasks_by_parent[parent_id].append(ent_info) - pop_out_ents.append(ftrack_id) continue name = ( @@ -1680,86 +1629,11 @@ def process_added(self): self.create_entity_in_avalon(entity, parent_avalon) - for parent_id, ent_infos in new_tasks_by_parent.items(): - avalon_ent = self.avalon_ents_by_ftrack_id.get(parent_id) - if not avalon_ent: - # TODO logging - self.log.debug(( - "Skipping synchronization of task" - " because parent was not found in Avalon DB <{}>" - ).format(self.get_ent_path(parent_id))) - continue - - mongo_id = avalon_ent["_id"] - if mongo_id not in self.task_changes_by_avalon_id: - self.task_changes_by_avalon_id[mongo_id] = ( - avalon_ent["data"]["tasks"] - ) - - for ent_info in ent_infos: - new_name = ent_info["changes"]["name"]["new"] - passed_regex = avalon_sync.check_regex( - new_name, "task", schema_patterns=self.regex_schemas - ) - if not passed_regex: - self.regex_failed.append(ent_info["entityId"]) - continue - - task_type = self._get_task_type(ent_info['entityId']) - if new_name not in \ - self.task_changes_by_avalon_id[mongo_id].keys(): - self.task_changes_by_avalon_id[mongo_id][new_name] = { - "type": task_type - } - - def _mongo_id_configuration( - self, - ent_info, - cust_attrs, - hier_attrs, - temp_dict - ): - # Use hierarchical mongo id attribute if possible. 
-        if "_hierarchical" not in temp_dict:
-            hier_mongo_id_configuration_id = None
-            for attr in hier_attrs:
-                if attr["key"] == CUST_ATTR_ID_KEY:
-                    hier_mongo_id_configuration_id = attr["id"]
-                    break
-            temp_dict["_hierarchical"] = hier_mongo_id_configuration_id
-
-        hier_mongo_id_configuration_id = temp_dict.get("_hierarchical")
-        if hier_mongo_id_configuration_id is not None:
-            return hier_mongo_id_configuration_id
-
-        # Legacy part for cases that MongoID attribute is per entity type.
-        entity_type = ent_info["entity_type"]
-        mongo_id_configuration_id = temp_dict.get(entity_type)
-        if mongo_id_configuration_id is not None:
-            return mongo_id_configuration_id
-
-        for attr in cust_attrs:
-            key = attr["key"]
-            if key != CUST_ATTR_ID_KEY:
-                continue
-
-            if attr["entity_type"] != ent_info["entityType"]:
-                continue
-
-            if (
-                ent_info["entityType"] == "task" and
-                attr["object_type_id"] != ent_info["objectTypeId"]
-            ):
-                continue
-
-            mongo_id_configuration_id = attr["id"]
-            break
-
-        temp_dict[entity_type] = mongo_id_configuration_id
-
-        return mongo_id_configuration_id
-
     def process_moved(self):
+        """
+        Handles entities moved to a different place in the hierarchy.
+        (Not tasks - they are handled separately.)
+        """
         if not self.ftrack_moved:
             return
 
@@ -1893,7 +1767,9 @@ def process_moved(self):
             )
 
     def process_updated(self):
-        # Only custom attributes changes should get here
+        """
+        Only custom attribute changes should get here
+        """
         if not self.ftrack_updated:
             return
 
@@ -1991,8 +1867,7 @@ def process_hier_cleanup(self):
         if (
             not self.moved_in_avalon and
             not self.renamed_in_avalon and
-            not self.hier_cust_attrs_changes and
-            not self.task_changes_by_avalon_id
+            not self.hier_cust_attrs_changes
         ):
             return
 
@@ -2021,14 +1896,6 @@ def process_hier_cleanup(self):
             if not all_keys and key not in hier_cust_attrs_keys:
                 hier_cust_attrs_keys.append(key)
 
-        # Tasks preparation ****
-        for mongo_id, tasks in self.task_changes_by_avalon_id.items():
-            avalon_ent = self.avalon_ents_by_id[mongo_id]
-            if "data" not in self.updates[mongo_id]:
-                self.updates[mongo_id]["data"] = {}
-
-            self.updates[mongo_id]["data"]["tasks"] = tasks
-
         # Parents preparation ***
         mongo_to_ftrack_parents = {}
         missing_ftrack_ents = {}
@@ -2310,7 +2177,69 @@ def process_hier_cleanup(self):
 
         self.update_entities()
 
+    def process_task_updates(self):
+        """
+        Pulls task information for the selected ftrack ids and replaces
+        the tasks currently stored in Avalon.
+        Solves changes of task type (and possibly Status in the future)
+        without storing the ftrack id of each task in the DB (which
+        currently doesn't bring much advantage and could be troublesome
+        for hosts or plugins, for example Nuke, to collect and store).
+        Returns:
+            None
+        """
+        self.log.debug(
+            "Processing task changes for parents: {}".format(
+                self.modified_tasks_ftrackids
+            )
+        )
+        if not self.modified_tasks_ftrackids:
+            return
+        entities = self._get_entities_for_ftrack_ids(
+            self.cur_project["id"],
+            self.modified_tasks_ftrackids)
+
+        ftrack_mongo_mapping_found = {}
+        not_found_ids = []
+        tasks_per_ftrack_id = {}
+
+        # prepare all tasks per parentId, e.g.
Avalon asset record + for entity in entities: + ftrack_id = entity["parent_id"] + if ftrack_id not in tasks_per_ftrack_id: + tasks_per_ftrack_id[ftrack_id] = {} + + passed_regex = avalon_sync.check_regex( + entity["name"], "task", + schema_patterns=self.regex_schemas + ) + if not passed_regex: + entity_id = entity["id"] + self.regex_failed.append(entity_id) + continue + + task = {"type": entity["type"]["name"]} + tasks_per_ftrack_id[ftrack_id][entity["name"]] = task + + # find avalon entity by parentId + # should be there as create was run first + for ftrack_id in tasks_per_ftrack_id.keys(): + avalon_entity = self.avalon_ents_by_ftrack_id.get(ftrack_id) + if not avalon_entity: + not_found_ids.append(ftrack_id) + continue + ftrack_mongo_mapping_found[ftrack_id] = avalon_entity["_id"] + + self._update_avalon_tasks(ftrack_mongo_mapping_found, + tasks_per_ftrack_id) + def update_entities(self): + """ + Update Avalon entities by mongo bulk changes. + Expects self.updates which are transfered to $set part of update + command. + Resets self.updates afterwards. + """ mongo_changes_bulk = [] for mongo_id, changes in self.updates.items(): filter = {"_id": mongo_id} @@ -2500,10 +2429,82 @@ def report(self): ) return True - def _get_task_type(self, entityId): + def _update_avalon_tasks(self, ftrack_mongo_mapping_found, + tasks_per_ftrack_id): """ - Returns task type ('Props', 'Art') from Task 'entityId' + Prepare new "tasks" content for existing records in Avalon. Args: + ftrack_mongo_mapping_found (dictionary): ftrack parentId to + Avalon _id mapping + tasks_per_ftrack_id (dictionary): task dictionaries per ftrack + parentId + + Returns: + None + """ + mongo_changes_bulk = [] + for ftrack_id, mongo_id in ftrack_mongo_mapping_found.items(): + filter = {"_id": mongo_id} + change_data = {"$set": {}} + change_data["$set"]["data.tasks"] = tasks_per_ftrack_id[ftrack_id] + mongo_changes_bulk.append(UpdateOne(filter, change_data)) + if not mongo_changes_bulk: + return + + self.dbcon.bulk_write(mongo_changes_bulk) + + def _mongo_id_configuration( + self, + ent_info, + cust_attrs, + hier_attrs, + temp_dict + ): + # Use hierarchical mongo id attribute if possible. + if "_hierarchical" not in temp_dict: + hier_mongo_id_configuration_id = None + for attr in hier_attrs: + if attr["key"] == CUST_ATTR_ID_KEY: + hier_mongo_id_configuration_id = attr["id"] + break + temp_dict["_hierarchical"] = hier_mongo_id_configuration_id + + hier_mongo_id_configuration_id = temp_dict.get("_hierarchical") + if hier_mongo_id_configuration_id is not None: + return hier_mongo_id_configuration_id + + # Legacy part for cases that MongoID attribute is per entity type. + entity_type = ent_info["entity_type"] + mongo_id_configuration_id = temp_dict.get(entity_type) + if mongo_id_configuration_id is not None: + return mongo_id_configuration_id + + for attr in cust_attrs: + key = attr["key"] + if key != CUST_ATTR_ID_KEY: + continue + + if attr["entity_type"] != ent_info["entityType"]: + continue + + if ( + ent_info["entityType"] == "task" and + attr["object_type_id"] != ent_info["objectTypeId"] + ): + continue + + mongo_id_configuration_id = attr["id"] + break + + temp_dict[entity_type] = mongo_id_configuration_id + + return mongo_id_configuration_id + + def _get_task_type(self, project_id, entityId): + """ + Returns task type ('Props', 'Art') from Task 'entityId'. 
+ Args: + project_id (string): entityId (string): entityId of Task Returns: @@ -2512,13 +2513,37 @@ def _get_task_type(self, entityId): task_type = None entity = self.process_session.query( self.entities_query_by_id.format( - self.cur_project["id"], entityId + project_id, entityId ) ).first() if entity: task_type = entity["type"]["name"] return task_type + def _get_entities_for_ftrack_ids(self, ft_project_id, ftrack_ids): + """ + Query Ftrack API and return all entities for particular + 'ft_project' and their parent_id in 'ftrack_ids'. + It is much faster to run this once for multiple ids than run it + for each separately. + Used mainly for collecting task information + Args: + ft_project_id (string): + ftrack_ids (list): of strings + + Returns: + (list) of Ftrack entities + """ + ftrack_entities = [] + if ftrack_ids: + joined_ids = ", ".join(["\"{}\"".format(id) for id in ftrack_ids]) + ftrack_entities = self.process_session.query( + self.entities_query_by_parent_id.format(ft_project_id, + joined_ids) + ).all() + + return ftrack_entities + def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.'''
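Note on the core change in this series: from_dict_to_set(data, is_project) flattens a nested change dictionary into dotted keys for a single MongoDB "$set" update and writes the "tasks" dictionary as one key, so renamed or deleted tasks are replaced instead of merged. The sketch below is a minimal, hypothetical illustration of that idea; the helper name flatten_to_set and the sample change data are assumptions, not code from the patch, and the real implementation additionally uses a sentinel object so that an empty task dictionary is still written.

# Illustrative sketch only (not part of the patch). It mirrors the idea of
# from_dict_to_set(): flatten nested changes into dotted keys for a MongoDB
# "$set" update and replace the "tasks" dictionary wholesale.
import queue


def flatten_to_set(data, is_project):
    # Hypothetical helper; key names mirror the patch ("config" for the
    # project document, "data" for assets), everything else is illustrative.
    parent_key = "config" if is_project else "data"
    task_changes = None
    task_changes_key = None
    if parent_key in data and "tasks" in data[parent_key]:
        task_changes = data[parent_key].pop("tasks")
        task_changes_key = "{}.tasks".format(parent_key)
        if not data[parent_key]:
            data.pop(parent_key)

    result = {"$set": {}}
    dict_queue = queue.Queue()
    dict_queue.put((None, data))
    while not dict_queue.empty():
        _key, _data = dict_queue.get()
        for key, value in _data.items():
            new_key = key if _key is None else "{}.{}".format(_key, key)
            if not isinstance(value, dict) or not value:
                result["$set"][new_key] = value
                continue
            dict_queue.put((new_key, value))

    if task_changes_key is not None:
        # Tasks are written under one key, so renamed or deleted tasks
        # disappear from the stored document instead of being merged.
        result["$set"][task_changes_key] = task_changes
    return result


if __name__ == "__main__":
    changes = {
        "name": "sh010",
        "data": {
            "frameStart": 1001,
            "tasks": {"compositing": {"type": "Compositing"}}
        }
    }
    print(flatten_to_set(changes, is_project=False))
    # {'$set': {'name': 'sh010', 'data.frameStart': 1001,
    #           'data.tasks': {'compositing': {'type': 'Compositing'}}}}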