Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2204 from pypeclub/feature/replace_queue_with_deque
Browse files Browse the repository at this point in the history
Ftrack: Replace Queue with deque in event handlers logic
  • Loading branch information
iLLiCiTiT authored Nov 3, 2021
2 parents 74257d5 + 9b0dc76 commit afedaf6
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import collections
import copy
import json
import queue
import time
import datetime
import atexit
Expand Down Expand Up @@ -340,13 +338,13 @@ def get_found_data(entity):
self._avalon_archived_by_id[mongo_id] = entity

def _bubble_changeability(self, unchangeable_ids):
unchangeable_queue = queue.Queue()
unchangeable_queue = collections.deque()
for entity_id in unchangeable_ids:
unchangeable_queue.put((entity_id, False))
unchangeable_queue.append((entity_id, False))

processed_parents_ids = []
while not unchangeable_queue.empty():
entity_id, child_is_archived = unchangeable_queue.get()
while unchangeable_queue:
entity_id, child_is_archived = unchangeable_queue.popleft()
# skip if already processed
if entity_id in processed_parents_ids:
continue
Expand Down Expand Up @@ -388,7 +386,7 @@ def _bubble_changeability(self, unchangeable_ids):
parent_id = entity["data"]["visualParent"]
if parent_id is None:
continue
unchangeable_queue.put((parent_id, child_is_archived))
unchangeable_queue.append((parent_id, child_is_archived))

def reset_variables(self):
"""Reset variables so each event callback has clear env."""
Expand Down Expand Up @@ -1050,7 +1048,7 @@ def check_names_synchronizable(self, names):
key=(lambda entity: len(entity["link"]))
)

children_queue = queue.Queue()
children_queue = collections.deque()
for entity in synchronizable_ents:
parent_avalon_ent = self.avalon_ents_by_ftrack_id[
entity["parent_id"]
Expand All @@ -1060,10 +1058,10 @@ def check_names_synchronizable(self, names):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)

while not children_queue.empty():
entity = children_queue.get()
while children_queue:
entity = children_queue.popleft()
ftrack_id = entity["id"]
name = entity["name"]
ent_by_ftrack_id = self.avalon_ents_by_ftrack_id.get(ftrack_id)
Expand Down Expand Up @@ -1093,7 +1091,7 @@ def check_names_synchronizable(self, names):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)

def create_entity_in_avalon(self, ftrack_ent, parent_avalon):
proj, ents = self.avalon_entities
Expand Down Expand Up @@ -1278,7 +1276,7 @@ def process_renamed(self):
"Processing renamed entities: {}".format(str(ent_infos))
)

changeable_queue = queue.Queue()
changeable_queue = collections.deque()
for ftrack_id, ent_info in ent_infos.items():
entity_type = ent_info["entity_type"]
if entity_type == "Task":
Expand Down Expand Up @@ -1306,7 +1304,7 @@ def process_renamed(self):

mongo_id = avalon_ent["_id"]
if self.changeability_by_mongo_id[mongo_id]:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
else:
ftrack_ent = self.ftrack_ents_by_id[ftrack_id]
ftrack_ent["name"] = avalon_ent["name"]
Expand Down Expand Up @@ -1348,8 +1346,8 @@ def process_renamed(self):

old_names = []
# Process renaming in Avalon DB
while not changeable_queue.empty():
ftrack_id, avalon_ent, new_name = changeable_queue.get()
while changeable_queue:
ftrack_id, avalon_ent, new_name = changeable_queue.popleft()
mongo_id = avalon_ent["_id"]
old_name = avalon_ent["name"]

Expand Down Expand Up @@ -1390,13 +1388,13 @@ def process_renamed(self):
# - it's name may be changed in next iteration
same_name_ftrack_id = same_name_avalon_ent["data"]["ftrackId"]
same_is_unprocessed = False
for item in list(changeable_queue.queue):
for item in changeable_queue:
if same_name_ftrack_id == item[0]:
same_is_unprocessed = True
break

if same_is_unprocessed:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
continue

self.duplicated.append(ftrack_id)
Expand Down Expand Up @@ -2008,12 +2006,12 @@ def process_hier_cleanup(self):
# ftrack_parenting = collections.defaultdict(list)
entities_dict = collections.defaultdict(dict)

children_queue = queue.Queue()
parent_queue = queue.Queue()
children_queue = collections.deque()
parent_queue = collections.deque()

for mongo_id in hier_cust_attrs_ids:
avalon_ent = self.avalon_ents_by_id[mongo_id]
parent_queue.put(avalon_ent)
parent_queue.append(avalon_ent)
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id not in entities_dict:
entities_dict[ftrack_id] = {
Expand All @@ -2040,10 +2038,10 @@ def process_hier_cleanup(self):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)

while not children_queue.empty():
avalon_ent = children_queue.get()
while children_queue:
avalon_ent = children_queue.popleft()
mongo_id = avalon_ent["_id"]
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id in cust_attrs_ftrack_ids:
Expand All @@ -2066,10 +2064,10 @@ def process_hier_cleanup(self):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)

while not parent_queue.empty():
avalon_ent = parent_queue.get()
while parent_queue:
avalon_ent = parent_queue.popleft()
if avalon_ent["type"].lower() == "project":
continue

Expand Down Expand Up @@ -2100,7 +2098,7 @@ def process_hier_cleanup(self):
# if ftrack_id not in ftrack_parenting[parent_ftrack_id]:
# ftrack_parenting[parent_ftrack_id].append(ftrack_id)

parent_queue.put(parent_ent)
parent_queue.append(parent_ent)

# Prepare values to query
configuration_ids = set()
Expand Down Expand Up @@ -2174,11 +2172,13 @@ def process_hier_cleanup(self):
if value is not None:
project_values[key] = value

hier_down_queue = queue.Queue()
hier_down_queue.put((project_values, ftrack_project_id))
hier_down_queue = collections.deque()
hier_down_queue.append(
(project_values, ftrack_project_id)
)

while not hier_down_queue.empty():
hier_values, parent_id = hier_down_queue.get()
while hier_down_queue:
hier_values, parent_id = hier_down_queue.popleft()
for child_id in entities_dict[parent_id]["children"]:
_hier_values = hier_values.copy()
for name in hier_cust_attrs_keys:
Expand All @@ -2187,7 +2187,7 @@ def process_hier_cleanup(self):
_hier_values[name] = value

entities_dict[child_id]["hier_attrs"].update(_hier_values)
hier_down_queue.put((_hier_values, child_id))
hier_down_queue.append((_hier_values, child_id))

ftrack_mongo_mapping = {}
for mongo_id, ftrack_id in mongo_ftrack_mapping.items():
Expand Down Expand Up @@ -2302,11 +2302,12 @@ def update_entities(self):
"""
mongo_changes_bulk = []
for mongo_id, changes in self.updates.items():
filter = {"_id": mongo_id}
avalon_ent = self.avalon_ents_by_id[mongo_id]
is_project = avalon_ent["type"] == "project"
change_data = avalon_sync.from_dict_to_set(changes, is_project)
mongo_changes_bulk.append(UpdateOne(filter, change_data))
mongo_changes_bulk.append(
UpdateOne({"_id": mongo_id}, change_data)
)

if not mongo_changes_bulk:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import uuid
from datetime import datetime
from queue import Queue

from bson.objectid import ObjectId
from openpype_modules.ftrack.lib import BaseAction, statics_icon
Expand Down Expand Up @@ -473,12 +472,12 @@ def launch(self, session, entities, event):
continue
ftrack_ids_to_delete.append(ftrack_id)

children_queue = Queue()
children_queue = collections.deque()
for mongo_id in assets_to_delete:
children_queue.put(mongo_id)
children_queue.append(mongo_id)

while not children_queue.empty():
mongo_id = children_queue.get()
while children_queue:
mongo_id = children_queue.popleft()
if mongo_id in asset_ids_to_archive:
continue

Expand All @@ -494,7 +493,7 @@ def launch(self, session, entities, event):
for child in children:
child_id = child["_id"]
if child_id not in asset_ids_to_archive:
children_queue.put(child_id)
children_queue.append(child_id)

# Prepare names of assets in ftrack and ids of subsets in mongo
asset_names_to_delete = []
Expand Down
Loading

0 comments on commit afedaf6

Please sign in to comment.