This repository has been archived by the owner on Sep 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 129
/
collect_anatomy_instance_data.py
308 lines (253 loc) · 10.4 KB
/
collect_anatomy_instance_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Requires:
    context     -> anatomyData
    context     -> projectEntity
    context     -> assetEntity
    instance    -> asset
    instance    -> subset
    instance    -> family

Optional:
    context     -> version
    instance    -> version
    instance    -> task
    instance    -> resolutionWidth
    instance    -> resolutionHeight
    instance    -> pixelAspect
    instance    -> fps

Provides:
    instance    -> projectEntity
    instance    -> assetEntity
    instance    -> anatomyData
    instance    -> version
    instance    -> latestVersion
"""
import copy
import json
import collections
from avalon import io
import pyblish.api
class CollectAnatomyInstanceData(pyblish.api.ContextPlugin):
    """Collect instance specific anatomy data.

    The plugin runs once on the context and processes every instance in it,
    including instances that are not active.

    For each instance it tries to fill "assetEntity", stores "latestVersion",
    resolves "version" and stores "anatomyData" (a copy of the context
    anatomy data updated with instance specific values).

    Attributes:
        follow_workfile_version (bool): When enabled, the "version" stored
            on context is used instead of the instance's own "version".
    """

    order = pyblish.api.CollectorOrder + 0.49
    label = "Collect Anatomy Instance data"

    follow_workfile_version = False

    def process(self, context):
        """Run all collection steps on every instance of the context.

        Args:
            context (pyblish.api.Context): Context with instances to fill.
        """
        self.log.info("Collecting anatomy data for all instances.")

        # Order matters: latest versions are looked up through the asset
        # documents and anatomy data read both of them.
        self.fill_missing_asset_docs(context)
        self.fill_latest_versions(context)
        self.fill_anatomy_data(context)

        self.log.info("Anatomy Data collection finished.")

    def fill_missing_asset_docs(self, context):
        """Store matching asset document under "assetEntity" on instances.

        An instance keeps its own "assetEntity" when its name matches the
        instance's "asset". Otherwise the context's "assetEntity" is used
        when its name matches, and remaining asset names are queried from
        the database. Instances whose asset document was not found are left
        untouched and a warning with the missing names is logged.

        Args:
            context (pyblish.api.Context): Context with instances to fill.
        """
        self.log.debug("Qeurying asset documents for instances.")

        context_asset_doc = context.data.get("assetEntity")

        instances_with_missing_asset_doc = collections.defaultdict(list)
        for instance in context:
            instance_asset_doc = instance.data.get("assetEntity")
            _asset_name = instance.data["asset"]

            # There is possibility that assetEntity on instance is already
            # set which can happen in standalone publisher
            if (
                instance_asset_doc
                and instance_asset_doc["name"] == _asset_name
            ):
                continue

            # Check if asset name is the same as what is in context
            # - they may be different, e.g. in NukeStudio
            if context_asset_doc and context_asset_doc["name"] == _asset_name:
                instance.data["assetEntity"] = context_asset_doc
            else:
                instances_with_missing_asset_doc[_asset_name].append(instance)

        if not instances_with_missing_asset_doc:
            self.log.debug("All instances already had right asset document.")
            return

        asset_names = list(instances_with_missing_asset_doc)
        self.log.debug("Querying asset documents with names: {}".format(
            ", ".join(["\"{}\"".format(name) for name in asset_names])
        ))
        asset_docs = io.find({
            "type": "asset",
            "name": {"$in": asset_names}
        })
        asset_docs_by_name = {
            asset_doc["name"]: asset_doc
            for asset_doc in asset_docs
        }

        not_found_asset_names = []
        for asset_name, instances in instances_with_missing_asset_doc.items():
            asset_doc = asset_docs_by_name.get(asset_name)
            if not asset_doc:
                not_found_asset_names.append(asset_name)
                continue

            for _instance in instances:
                _instance.data["assetEntity"] = asset_doc

        if not_found_asset_names:
            # Each name is quoted on its own, the joined list must not be
            # wrapped in another pair of quotes.
            joined_asset_names = ", ".join(
                ["\"{}\"".format(name) for name in not_found_asset_names]
            )
            self.log.warning(
                "Not found asset documents with names {}.".format(
                    joined_asset_names
                )
            )

    def fill_latest_versions(self, context):
        """Try to find latest version for each instance's subset.

        Key "latestVersion" is always set on every instance, either to the
        latest version found, to a value that was already stored there, or
        to `None`.

        Args:
            context (pyblish.Context)

        Returns:
            None
        """
        self.log.debug("Qeurying latest versions for instances.")

        # Instances grouped as {asset id: {subset name: [instances]}}
        hierarchy = {}
        subset_filters = []
        for instance in context:
            # Make sure `"latestVersion"` key is set
            latest_version = instance.data.get("latestVersion")
            instance.data["latestVersion"] = latest_version

            # Skip instances without "assetEntity"
            asset_doc = instance.data.get("assetEntity")
            if not asset_doc:
                continue

            # Store asset ids and subset names for queries
            asset_id = asset_doc["_id"]
            subset_name = instance.data["subset"]

            # Prepare instance hierarchy for faster filling latest versions
            instances_by_subset = hierarchy.setdefault(asset_id, {})
            if subset_name not in instances_by_subset:
                instances_by_subset[subset_name] = []
                # One filter per unique asset/subset pair is enough
                subset_filters.append({
                    "parent": asset_id,
                    "name": subset_name
                })
            instances_by_subset[subset_name].append(instance)

        # Nothing to query ("$or" with an empty list is not a valid query)
        if not subset_filters:
            return

        subset_docs = list(io.find({
            "type": "subset",
            "$or": subset_filters
        }))
        if not subset_docs:
            return

        subset_ids = [
            subset_doc["_id"]
            for subset_doc in subset_docs
        ]

        last_version_by_subset_id = self._query_last_versions(subset_ids)
        for subset_doc in subset_docs:
            subset_id = subset_doc["_id"]
            last_version = last_version_by_subset_id.get(subset_id)
            if last_version is None:
                continue

            asset_id = subset_doc["parent"]
            subset_name = subset_doc["name"]
            for _instance in hierarchy[asset_id][subset_name]:
                _instance.data["latestVersion"] = last_version

    def _query_last_versions(self, subset_ids):
        """Retrieve all latest versions for entered subset_ids.

        Args:
            subset_ids (list): List of subset ids with type `ObjectId`.

        Returns:
            dict: Key is subset id and value is last version name.
        """
        # Skip the database round trip when there is nothing to look for
        if not subset_ids:
            return {}

        _pipeline = [
            # Find all versions of those subsets
            {"$match": {
                "type": "version",
                "parent": {"$in": subset_ids}
            }},
            # Sorting versions all together
            {"$sort": {"name": 1}},
            # Group them by "parent", but only take the last
            {"$group": {
                "_id": "$parent",
                "_version_id": {"$last": "$_id"},
                "name": {"$last": "$name"}
            }}
        ]

        last_version_by_subset_id = {}
        for doc in io.aggregate(_pipeline):
            subset_id = doc["_id"]
            last_version_by_subset_id[subset_id] = doc["name"]

        return last_version_by_subset_id

    def fill_anatomy_data(self, context):
        """Build and store "anatomyData" for each instance.

        Instance data keys "projectEntity", "anatomyData" and "version" are
        set on every instance. Expects that "latestVersion" is already set
        on instances (see `fill_latest_versions`).

        Args:
            context (pyblish.api.Context): Context with "projectEntity" and
                "anatomyData" in its data.
        """
        self.log.debug("Storing anatomy data to instance data.")

        project_doc = context.data["projectEntity"]
        context_asset_doc = context.data.get("assetEntity")

        project_task_types = project_doc["config"]["tasks"]

        for instance in context:
            if self.follow_workfile_version:
                version_number = context.data.get("version")
            else:
                version_number = instance.data.get("version")

            # If version is not specified for instance or context
            if version_number is None:
                # TODO we should be able to change default version by studio
                # preferences (like start with version number `0`)
                version_number = 1
                # use latest version (+1) if already any exist
                latest_version = instance.data["latestVersion"]
                if latest_version is not None:
                    version_number += int(latest_version)

            anatomy_updates = {
                "asset": instance.data["asset"],
                "family": instance.data["family"],
                "subset": instance.data["subset"],
                "version": version_number
            }

            # Hierarchy
            # - overridden only when instance's asset differs from the
            #   context asset (or context has no asset document)
            asset_doc = instance.data.get("assetEntity")
            if (
                asset_doc
                and (
                    not context_asset_doc
                    or asset_doc["_id"] != context_asset_doc["_id"]
                )
            ):
                parents = asset_doc["data"].get("parents") or list()
                parent_name = project_doc["name"]
                if parents:
                    parent_name = parents[-1]
                anatomy_updates["hierarchy"] = "/".join(parents)
                anatomy_updates["parent"] = parent_name

            # Task
            task_name = instance.data.get("task")
            if task_name:
                # Asset document may be missing (it is only warned about in
                # `fill_missing_asset_docs`) or may not have any tasks, in
                # which case task type and short name stay `None`.
                asset_tasks = {}
                if asset_doc:
                    asset_data = asset_doc.get("data") or {}
                    asset_tasks = asset_data.get("tasks") or {}

                task_type = (asset_tasks.get(task_name) or {}).get("type")
                task_code = (
                    project_task_types.get(task_type) or {}
                ).get("short_name")
                anatomy_updates["task"] = {
                    "name": task_name,
                    "type": task_type,
                    "short": task_code
                }

            # Additional data
            resolution_width = instance.data.get("resolutionWidth")
            if resolution_width:
                anatomy_updates["resolution_width"] = resolution_width

            resolution_height = instance.data.get("resolutionHeight")
            if resolution_height:
                anatomy_updates["resolution_height"] = resolution_height

            # Pixel aspect and fps are stored with 2 decimal places
            pixel_aspect = instance.data.get("pixelAspect")
            if pixel_aspect:
                anatomy_updates["pixel_aspect"] = float(
                    "{:0.2f}".format(float(pixel_aspect))
                )

            fps = instance.data.get("fps")
            if fps:
                anatomy_updates["fps"] = float("{:0.2f}".format(float(fps)))

            # Copy context data so instances do not share nested values
            anatomy_data = copy.deepcopy(context.data["anatomyData"])
            anatomy_data.update(anatomy_updates)

            # Store anatomy data
            instance.data["projectEntity"] = project_doc
            instance.data["anatomyData"] = anatomy_data
            instance.data["version"] = version_number

            # Log collected data
            instance_name = instance.data["name"]
            instance_label = instance.data.get("label")
            if instance_label:
                instance_name += "({})".format(instance_label)
            self.log.debug("Anatomy data for instance {}: {}".format(
                instance_name,
                json.dumps(anatomy_data, indent=4)
            ))