Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Ftrack: Replace Queue with deque in event handlers logic #2204

Merged
merged 3 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import collections
import copy
import json
import queue
import time
import datetime
import atexit
Expand Down Expand Up @@ -340,13 +338,13 @@ def get_found_data(entity):
self._avalon_archived_by_id[mongo_id] = entity

def _bubble_changeability(self, unchangeable_ids):
unchangeable_queue = queue.Queue()
unchangeable_queue = collections.deque()
for entity_id in unchangeable_ids:
unchangeable_queue.put((entity_id, False))
unchangeable_queue.append((entity_id, False))

processed_parents_ids = []
while not unchangeable_queue.empty():
entity_id, child_is_archived = unchangeable_queue.get()
while unchangeable_queue:
entity_id, child_is_archived = unchangeable_queue.popleft()
# skip if already processed
if entity_id in processed_parents_ids:
continue
Expand Down Expand Up @@ -388,7 +386,7 @@ def _bubble_changeability(self, unchangeable_ids):
parent_id = entity["data"]["visualParent"]
if parent_id is None:
continue
unchangeable_queue.put((parent_id, child_is_archived))
unchangeable_queue.append((parent_id, child_is_archived))

def reset_variables(self):
"""Reset variables so each event callback has clear env."""
Expand Down Expand Up @@ -1050,7 +1048,7 @@ def check_names_synchronizable(self, names):
key=(lambda entity: len(entity["link"]))
)

children_queue = queue.Queue()
children_queue = collections.deque()
for entity in synchronizable_ents:
parent_avalon_ent = self.avalon_ents_by_ftrack_id[
entity["parent_id"]
Expand All @@ -1060,10 +1058,10 @@ def check_names_synchronizable(self, names):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)

while not children_queue.empty():
entity = children_queue.get()
while children_queue:
entity = children_queue.popleft()
ftrack_id = entity["id"]
name = entity["name"]
ent_by_ftrack_id = self.avalon_ents_by_ftrack_id.get(ftrack_id)
Expand Down Expand Up @@ -1093,7 +1091,7 @@ def check_names_synchronizable(self, names):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)

def create_entity_in_avalon(self, ftrack_ent, parent_avalon):
proj, ents = self.avalon_entities
Expand Down Expand Up @@ -1278,7 +1276,7 @@ def process_renamed(self):
"Processing renamed entities: {}".format(str(ent_infos))
)

changeable_queue = queue.Queue()
changeable_queue = collections.deque()
for ftrack_id, ent_info in ent_infos.items():
entity_type = ent_info["entity_type"]
if entity_type == "Task":
Expand Down Expand Up @@ -1306,7 +1304,7 @@ def process_renamed(self):

mongo_id = avalon_ent["_id"]
if self.changeability_by_mongo_id[mongo_id]:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
else:
ftrack_ent = self.ftrack_ents_by_id[ftrack_id]
ftrack_ent["name"] = avalon_ent["name"]
Expand Down Expand Up @@ -1348,8 +1346,8 @@ def process_renamed(self):

old_names = []
# Process renaming in Avalon DB
while not changeable_queue.empty():
ftrack_id, avalon_ent, new_name = changeable_queue.get()
while changeable_queue:
ftrack_id, avalon_ent, new_name = changeable_queue.popleft()
mongo_id = avalon_ent["_id"]
old_name = avalon_ent["name"]

Expand Down Expand Up @@ -1390,13 +1388,13 @@ def process_renamed(self):
# - it's name may be changed in next iteration
same_name_ftrack_id = same_name_avalon_ent["data"]["ftrackId"]
same_is_unprocessed = False
for item in list(changeable_queue.queue):
for item in changeable_queue:
if same_name_ftrack_id == item[0]:
same_is_unprocessed = True
break

if same_is_unprocessed:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
continue

self.duplicated.append(ftrack_id)
Expand Down Expand Up @@ -2008,12 +2006,12 @@ def process_hier_cleanup(self):
# ftrack_parenting = collections.defaultdict(list)
entities_dict = collections.defaultdict(dict)

children_queue = queue.Queue()
parent_queue = queue.Queue()
children_queue = collections.deque()
parent_queue = collections.deque()

for mongo_id in hier_cust_attrs_ids:
avalon_ent = self.avalon_ents_by_id[mongo_id]
parent_queue.put(avalon_ent)
parent_queue.append(avalon_ent)
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id not in entities_dict:
entities_dict[ftrack_id] = {
Expand All @@ -2040,10 +2038,10 @@ def process_hier_cleanup(self):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)

while not children_queue.empty():
avalon_ent = children_queue.get()
while children_queue:
avalon_ent = children_queue.popleft()
mongo_id = avalon_ent["_id"]
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id in cust_attrs_ftrack_ids:
Expand All @@ -2066,10 +2064,10 @@ def process_hier_cleanup(self):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)

while not parent_queue.empty():
avalon_ent = parent_queue.get()
while parent_queue:
avalon_ent = parent_queue.popleft()
if avalon_ent["type"].lower() == "project":
continue

Expand Down Expand Up @@ -2100,7 +2098,7 @@ def process_hier_cleanup(self):
# if ftrack_id not in ftrack_parenting[parent_ftrack_id]:
# ftrack_parenting[parent_ftrack_id].append(ftrack_id)

parent_queue.put(parent_ent)
parent_queue.append(parent_ent)

# Prepare values to query
configuration_ids = set()
Expand Down Expand Up @@ -2174,11 +2172,13 @@ def process_hier_cleanup(self):
if value is not None:
project_values[key] = value

hier_down_queue = queue.Queue()
hier_down_queue.put((project_values, ftrack_project_id))
hier_down_queue = collections.deque()
hier_down_queue.append(
(project_values, ftrack_project_id)
)

while not hier_down_queue.empty():
hier_values, parent_id = hier_down_queue.get()
while hier_down_queue:
hier_values, parent_id = hier_down_queue.popleft()
for child_id in entities_dict[parent_id]["children"]:
_hier_values = hier_values.copy()
for name in hier_cust_attrs_keys:
Expand All @@ -2187,7 +2187,7 @@ def process_hier_cleanup(self):
_hier_values[name] = value

entities_dict[child_id]["hier_attrs"].update(_hier_values)
hier_down_queue.put((_hier_values, child_id))
hier_down_queue.append((_hier_values, child_id))

ftrack_mongo_mapping = {}
for mongo_id, ftrack_id in mongo_ftrack_mapping.items():
Expand Down Expand Up @@ -2302,11 +2302,12 @@ def update_entities(self):
"""
mongo_changes_bulk = []
for mongo_id, changes in self.updates.items():
filter = {"_id": mongo_id}
avalon_ent = self.avalon_ents_by_id[mongo_id]
is_project = avalon_ent["type"] == "project"
change_data = avalon_sync.from_dict_to_set(changes, is_project)
mongo_changes_bulk.append(UpdateOne(filter, change_data))
mongo_changes_bulk.append(
UpdateOne({"_id": mongo_id}, change_data)
)

if not mongo_changes_bulk:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import uuid
from datetime import datetime
from queue import Queue

from bson.objectid import ObjectId
from openpype_modules.ftrack.lib import BaseAction, statics_icon
Expand Down Expand Up @@ -473,12 +472,12 @@ def launch(self, session, entities, event):
continue
ftrack_ids_to_delete.append(ftrack_id)

children_queue = Queue()
children_queue = collections.deque()
for mongo_id in assets_to_delete:
children_queue.put(mongo_id)
children_queue.append(mongo_id)

while not children_queue.empty():
mongo_id = children_queue.get()
while children_queue:
mongo_id = children_queue.popleft()
if mongo_id in asset_ids_to_archive:
continue

Expand All @@ -494,7 +493,7 @@ def launch(self, session, entities, event):
for child in children:
child_id = child["_id"]
if child_id not in asset_ids_to_archive:
children_queue.put(child_id)
children_queue.append(child_id)

# Prepare names of assets in ftrack and ids of subsets in mongo
asset_names_to_delete = []
Expand Down
Loading