diff --git a/avalon/api.py b/avalon/api.py index 9a9c33c08..ff2793b0c 100644 --- a/avalon/api.py +++ b/avalon/api.py @@ -10,7 +10,10 @@ """ from . import schema - +from . mongodb import ( + AvalonMongoDB, + session_data_from_environment +) from .pipeline import ( install, uninstall, @@ -71,6 +74,9 @@ __all__ = [ + "AvalonMongoDB", + "session_data_from_environment", + "install", "uninstall", @@ -117,6 +123,7 @@ "deregister_plugin_path", "HOST_WORKFILE_EXTENSIONS", + "should_start_last_workfile", "format_template_with_optional_keys", "last_workfile_with_version", "last_workfile", diff --git a/avalon/io.py b/avalon/io.py index 8a9f77dd8..54da3aba5 100644 --- a/avalon/io.py +++ b/avalon/io.py @@ -2,19 +2,17 @@ import os import sys -import time import errno import shutil import logging import tempfile -import functools import contextlib from . import schema, Session from .vendor import requests +from .api import AvalonMongoDB, session_data_from_environment # Third-party dependencies -import pymongo from bson.objectid import ObjectId, InvalidId __all__ = [ @@ -37,78 +35,39 @@ ] self = sys.modules[__name__] + +self._is_installed = False +self._connection_object = AvalonMongoDB(Session) self._mongo_client = None +self._database = None + self._sentry_client = None self._sentry_logging_handler = None -self._database = None -self._is_installed = False log = logging.getLogger(__name__) PY2 = sys.version_info[0] == 2 -def extract_port_from_url(url): - if PY2: - from urlparse import urlparse - else: - from urllib.parse import urlparse - parsed_url = urlparse(url) - if parsed_url.scheme is None: - _url = "mongodb://{}".format(url) - parsed_url = urlparse(_url) - return parsed_url.port - - def install(): """Establish a persistent connection to the database""" if self._is_installed: return logging.basicConfig() - Session.update(_from_environment()) - - timeout = int(Session["AVALON_TIMEOUT"]) - mongo_url = Session["AVALON_MONGO"] - kwargs = { - "host": mongo_url, - "serverSelectionTimeoutMS": timeout - } - port = extract_port_from_url(mongo_url) - if port is not None: - kwargs["port"] = int(port) + self._connection_object.Session.update(_from_environment()) + self._connection_object.install() - self._mongo_client = pymongo.MongoClient(**kwargs) - - for retry in range(3): - try: - t1 = time.time() - self._mongo_client.server_info() - - except Exception: - log.error("Retrying..") - time.sleep(1) - timeout *= 1.5 - - else: - break - - else: - raise IOError( - "ERROR: Couldn't connect to %s in " - "less than %.3f ms" % (Session["AVALON_MONGO"], timeout)) - - log.info("Connected to %s, delay %.3f s" % ( - Session["AVALON_MONGO"], time.time() - t1)) + self._mongo_client = self._connection_object.mongo_client + self._database = self._connection_object.database _install_sentry() - self._database = self._mongo_client[Session["AVALON_DB"]] self._is_installed = True def _install_sentry(): - if "AVALON_SENTRY" not in Session: + if not Session.get("AVALON_SENTRY"): return try: @@ -133,93 +92,9 @@ def _install_sentry(): def _from_environment(): - session = { - item[0]: os.getenv(item[0], item[1]) - for item in ( - # Root directory of projects on disk - ("AVALON_PROJECTS", None), - - # Name of current Project - ("AVALON_PROJECT", None), - - # Name of current Asset - ("AVALON_ASSET", None), - - # Name of current silo - ("AVALON_SILO", None), - - # Name of current task - ("AVALON_TASK", None), - - # Name of current app - ("AVALON_APP", None), - - # Path to working directory - ("AVALON_WORKDIR", None), - - # Optional path to scenes directory (see Work Files API) - ("AVALON_SCENEDIR", None), - - # Optional hierarchy for the current Asset. This can be referenced - # as `{hierarchy}` in your file templates. - # This will be (re-)computed when you switch the context to another - # asset. It is computed by checking asset['data']['parents'] and - # joining those together with `os.path.sep`. - # E.g.: ['ep101', 'scn0010'] -> 'ep101/scn0010'. - ("AVALON_HIERARCHY", None), - - # Name of current Config - # TODO(marcus): Establish a suitable default config - ("AVALON_CONFIG", "no_config"), - - # Name of Avalon in graphical user interfaces - # Use this to customise the visual appearance of Avalon - # to better integrate with your surrounding pipeline - ("AVALON_LABEL", "Avalon"), - - # Used during any connections to the outside world - ("AVALON_TIMEOUT", "1000"), - - # Address to Asset Database - ("AVALON_MONGO", "mongodb://localhost:27017"), - - # Name of database used in MongoDB - ("AVALON_DB", "avalon"), - - # Address to Sentry - ("AVALON_SENTRY", None), - - # Address to Deadline Web Service - # E.g. http://192.167.0.1:8082 - ("AVALON_DEADLINE", None), - - # Enable features not necessarily stable, at the user's own risk - ("AVALON_EARLY_ADOPTER", None), - - # Address of central asset repository, contains - # the following interface: - # /upload - # /download - # /manager (optional) - ("AVALON_LOCATION", "http://127.0.0.1"), - - # Boolean of whether to upload published material - # to central asset repository - ("AVALON_UPLOAD", None), - - # Generic username and password - ("AVALON_USERNAME", "avalon"), - ("AVALON_PASSWORD", "secret"), - - # Unique identifier for instances in working files - ("AVALON_INSTANCE_ID", "avalon.instance"), - ("AVALON_CONTAINER_ID", "avalon.container"), - - # Enable debugging - ("AVALON_DEBUG", None), - - ) if os.getenv(item[0], item[1]) is not None - } + session = session_data_from_environment( + global_keys=True, context_keys=True + ) session["schema"] = "avalon-core:session-2.0" try: @@ -234,7 +109,7 @@ def _from_environment(): def uninstall(): """Close any connection to the database""" try: - self._mongo_client.close() + self._connection_object.uninstall() except AttributeError: pass @@ -243,38 +118,9 @@ def uninstall(): self._is_installed = False -def requires_install(f): - @functools.wraps(f) - def decorated(*args, **kwargs): - if not self._is_installed: - raise IOError("'io.%s()' requires install()" % f.__name__) - return f(*args, **kwargs) - - return decorated - - -def auto_reconnect(f): - """Handling auto reconnect in 3 retry times""" - retry_times = 3 - - @functools.wraps(f) - def decorated(*args, **kwargs): - for retry in range(1, retry_times + 1): - try: - return f(*args, **kwargs) - except pymongo.errors.AutoReconnect: - log.error("Reconnecting..") - if retry < retry_times: - time.sleep(0.1) - else: - raise - return decorated - - -@requires_install def active_project(): """Return the name of the active project""" - return Session["AVALON_PROJECT"] + return self._connection_object.active_project() def activate_project(project): @@ -282,7 +128,6 @@ def activate_project(project): print("io.activate_project is deprecated") -@requires_install def projects(): """List available projects @@ -290,24 +135,7 @@ def projects(): list of project documents """ - @auto_reconnect - def find_project(project): - return self._database[project].find_one({"type": "project"}) - - @auto_reconnect - def collections(): - return self._database.collection_names() - collection_names = collections() - - for project in collection_names: - if project in ("system.indexes",): - continue - - # Each collection will have exactly one project document - document = find_project(project) - - if document is not None: - yield document + return self._connection_object.projects() def locate(path): @@ -356,87 +184,65 @@ def locate(path): return parent -@auto_reconnect -def insert_one(item): +def insert_one(item, *args, **kwargs): assert isinstance(item, dict), "item must be of type " schema.validate(item) - return self._database[Session["AVALON_PROJECT"]].insert_one(item) + return self._connection_object.insert_one(item, *args, **kwargs) -@auto_reconnect -def insert_many(items, ordered=True): +def insert_many(items, *args, **kwargs): # check if all items are valid assert isinstance(items, list), "`items` must be of type " for item in items: assert isinstance(item, dict), "`item` must be of type " schema.validate(item) - return self._database[Session["AVALON_PROJECT"]].insert_many( - items, - ordered=ordered) + return self._connection_object.insert_many(items, *args, **kwargs) -@auto_reconnect -def find(filter, projection=None, sort=None): - return self._database[Session["AVALON_PROJECT"]].find( - filter=filter, - projection=projection, - sort=sort - ) +def find(*args, **kwargs): + return self._connection_object.find(*args, **kwargs) -@auto_reconnect -def find_one(filter, projection=None, sort=None): +def find_one(filter, *args, **kwargs): assert isinstance(filter, dict), "filter must be " - return self._database[Session["AVALON_PROJECT"]].find_one( - filter=filter, - projection=projection, - sort=sort - ) + return self._connection_object.find_one(filter, *args, **kwargs) -@auto_reconnect def save(*args, **kwargs): """Deprecated, please use `replace_one`""" - return self._database[Session["AVALON_PROJECT"]].save( - *args, **kwargs) + return self._connection_object.save(*args, **kwargs) + + +def replace_one(filter, replacement, *args, **kwargs): + return self._connection_object.replace_one( + filter, replacement, *args, **kwargs + ) -@auto_reconnect -def replace_one(filter, replacement): - return self._database[Session["AVALON_PROJECT"]].replace_one( - filter, replacement) +def update_one(*args, **kwargs): + return self._connection_object.update_one(*args, **kwargs) -@auto_reconnect -def update_many(filter, update): - return self._database[Session["AVALON_PROJECT"]].update_many( - filter, update) +def update_many(filter, update, *args, **kwargs): + return self._connection_object.update_many(filter, update, *args, **kwargs) -@auto_reconnect def distinct(*args, **kwargs): - return self._database[Session["AVALON_PROJECT"]].distinct( - *args, **kwargs) + return self._connection_object.distinct(*args, **kwargs) -@auto_reconnect def aggregate(*args, **kwargs): - return self._database[Session["AVALON_PROJECT"]].aggregate( - *args, **kwargs) + return self._connection_object.aggregate(*args, **kwargs) -@auto_reconnect def drop(*args, **kwargs): - return self._database[Session["AVALON_PROJECT"]].drop( - *args, **kwargs) + return self._connection_object.drop(*args, **kwargs) -@auto_reconnect def delete_many(*args, **kwargs): - return self._database[Session["AVALON_PROJECT"]].delete_many( - *args, **kwargs) + return self._connection_object.delete_many(*args, **kwargs) def parenthood(document): diff --git a/avalon/mongodb.py b/avalon/mongodb.py new file mode 100644 index 000000000..4ada65015 --- /dev/null +++ b/avalon/mongodb.py @@ -0,0 +1,430 @@ +import os +import sys +import time +import functools +import logging +import pymongo +import ctypes +from uuid import uuid4 + +from avalon import schema + + +def extract_port_from_url(url): + if sys.version_info[0] == 2: + from urlparse import urlparse + else: + from urllib.parse import urlparse + parsed_url = urlparse(url) + if parsed_url.scheme is None: + _url = "mongodb://{}".format(url) + parsed_url = urlparse(_url) + return parsed_url.port + + +def requires_install(func, obj=None): + @functools.wraps(func) + def decorated(*args, **kwargs): + if obj is not None: + _obj = obj + else: + _obj = args[0] + + if not _obj.is_installed(): + if _obj.auto_install: + _obj.install() + else: + raise IOError( + "'{}.{}()' requires to run install() first".format( + _obj.__class__.__name__, func.__name__ + ) + ) + return func(*args, **kwargs) + return decorated + + +def auto_reconnect(func, obj=None): + """Handling auto reconnect in 3 retry times""" + retry_times = 3 + reconnect_msg = "Reconnecting..." + + @functools.wraps(func) + def decorated(*args, **kwargs): + if obj is not None: + _obj = obj + else: + _obj = args[0] + for retry in range(1, retry_times + 1): + try: + return func(*args, **kwargs) + except pymongo.errors.AutoReconnect: + if hasattr(_obj, "log"): + _obj.log.warning(reconnect_msg) + else: + print(reconnect_msg) + + if retry >= retry_times: + raise + time.sleep(0.1) + return decorated + + +SESSION_CONTEXT_KEYS = ( + # Root directory of projects on disk + "AVALON_PROJECTS", + # Name of current Project + "AVALON_PROJECT", + # Name of current Asset + "AVALON_ASSET", + # Name of current silo + "AVALON_SILO", + # Name of current task + "AVALON_TASK", + # Name of current app + "AVALON_APP", + # Path to working directory + "AVALON_WORKDIR", + # Optional path to scenes directory (see Work Files API) + "AVALON_SCENEDIR", + # Optional hierarchy for the current Asset. This can be referenced + # as `{hierarchy}` in your file templates. + # This will be (re-)computed when you switch the context to another + # asset. It is computed by checking asset['data']['parents'] and + # joining those together with `os.path.sep`. + # E.g.: ['ep101', 'scn0010'] -> 'ep101/scn0010'. + "AVALON_HIERARCHY" +) + + +def session_data_from_environment(global_keys=True, context_keys=False): + session_data = {} + if context_keys: + for key in SESSION_CONTEXT_KEYS: + value = os.environ.get(key) + session_data[key] = value or "" + + if not global_keys: + return session_data + + for key, default_value in ( + # Name of current Config + # TODO(marcus): Establish a suitable default config + ("AVALON_CONFIG", "no_config"), + + # Name of Avalon in graphical user interfaces + # Use this to customise the visual appearance of Avalon + # to better integrate with your surrounding pipeline + ("AVALON_LABEL", "Avalon"), + + # Used during any connections to the outside world + ("AVALON_TIMEOUT", "1000"), + + # Address to Asset Database + ("AVALON_MONGO", "mongodb://localhost:27017"), + + # Name of database used in MongoDB + ("AVALON_DB", "avalon"), + + # Address to Sentry + ("AVALON_SENTRY", None), + + # Address to Deadline Web Service + # E.g. http://192.167.0.1:8082 + ("AVALON_DEADLINE", None), + + # Enable features not necessarily stable, at the user's own risk + ("AVALON_EARLY_ADOPTER", None), + + # Address of central asset repository, contains + # the following interface: + # /upload + # /download + # /manager (optional) + ("AVALON_LOCATION", "http://127.0.0.1"), + + # Boolean of whether to upload published material + # to central asset repository + ("AVALON_UPLOAD", None), + + # Generic username and password + ("AVALON_USERNAME", "avalon"), + ("AVALON_PASSWORD", "secret"), + + # Unique identifier for instances in working files + ("AVALON_INSTANCE_ID", "avalon.instance"), + ("AVALON_CONTAINER_ID", "avalon.container"), + + # Enable debugging + ("AVALON_DEBUG", None) + ): + value = os.environ.get(key) or default_value + if value is not None: + session_data[key] = value + + return session_data + + +class AvalonMongoConnection: + _mongo_client = None + _is_installed = False + _databases = {} + log = logging.getLogger("AvalonMongoConnection") + + @classmethod + def register_database(cls, dbcon): + if dbcon.id in cls._databases: + return + + cls._databases[dbcon.id] = { + "object": dbcon, + "installed": False + } + + @classmethod + def database(cls): + return cls._mongo_client[os.environ["AVALON_DB"]] + + @classmethod + def mongo_client(cls): + return cls._mongo_client + + @classmethod + def install(cls, dbcon): + if not cls._is_installed or cls._mongo_client is None: + cls._mongo_client = cls.create_connection() + cls._is_installed = True + + cls.register_database(dbcon) + cls._databases[dbcon.id]["installed"] = True + + cls.check_db_existence() + + @classmethod + def is_installed(cls, dbcon): + info = cls._databases.get(dbcon.id) + if not info: + return False + return cls._databases[dbcon.id]["installed"] + + @classmethod + def _uninstall(cls): + try: + cls._mongo_client.close() + except AttributeError: + pass + cls._is_installed = False + cls._mongo_client = None + + @classmethod + def uninstall(cls, dbcon, force=False): + if force: + for key in cls._databases: + cls._databases[key]["object"].uninstall() + cls._uninstall() + return + + cls._databases[dbcon.id]["installed"] = False + + cls.check_db_existence() + + any_is_installed = False + for key in cls._databases: + if cls._databases[key]["installed"]: + any_is_installed = True + break + + if not any_is_installed: + cls._uninstall() + + @classmethod + def check_db_existence(cls): + items_to_pop = set() + for db_id, info in cls._databases.items(): + obj = info["object"] + # TODO check if should check for 1 or more + cls.log.info(ctypes.c_long.from_address(id(obj)).value) + if ctypes.c_long.from_address(id(obj)).value == 1: + items_to_pop.add(db_id) + + for db_id in items_to_pop: + cls._databases.pop(db_id, None) + + @classmethod + def create_connection(cls): + timeout = int(os.environ["AVALON_TIMEOUT"]) + mongo_url = os.environ["AVALON_MONGO"] + kwargs = { + "host": mongo_url, + "serverSelectionTimeoutMS": timeout + } + + port = extract_port_from_url(mongo_url) + if port is not None: + kwargs["port"] = int(port) + + mongo_client = pymongo.MongoClient(**kwargs) + + for _retry in range(3): + try: + t1 = time.time() + mongo_client.server_info() + + except Exception: + cls.log.warning("Retrying...") + time.sleep(1) + timeout *= 1.5 + + else: + break + + else: + raise IOError(( + "ERROR: Couldn't connect to {} in less than {:.3f}ms" + ).format(mongo_url, timeout)) + + cls.log.info("Connected to {}, delay {:.3f}s".format( + mongo_url, time.time() - t1 + )) + return mongo_client + + +class AvalonMongoDB: + def __init__(self, session=None, auto_install=True): + self._id = uuid4() + self._database = None + self.auto_install = auto_install + + if session is None: + session = session_data_from_environment(context_keys=False) + + self.Session = session + + self.log = logging.getLogger(self.__class__.__name__) + + def __getattr__(self, attr_name): + attr = None + if self.is_installed() and self.auto_install: + self.install() + + if self.is_installed(): + attr = getattr( + self._database[self.active_project()], + attr_name, + None + ) + + if attr is None: + # Reraise attribute error + return self.__getattribute__(attr_name) + + # Decorate function + if callable(attr): + attr = auto_reconnect(attr) + return attr + + @property + def mongo_client(self): + AvalonMongoConnection.mongo_client() + + @property + def id(self): + return self._id + + @property + def database(self): + if not self.is_installed() and self.auto_install: + self.install() + + if self.is_installed(): + return self._database + + raise IOError( + "'{}.database' requires to run install() first".format( + self.__class__.__name__ + ) + ) + + def is_installed(self): + return AvalonMongoConnection.is_installed(self) + + def install(self): + """Establish a persistent connection to the database""" + if self.is_installed(): + return + + AvalonMongoConnection.install(self) + + self._database = AvalonMongoConnection.database() + + def uninstall(self): + """Close any connection to the database""" + try: + self._mongo_client.close() + except AttributeError: + pass + + AvalonMongoConnection.uninstall(self) + self._database = None + + @requires_install + def active_project(self): + """Return the name of the active project""" + return self.Session["AVALON_PROJECT"] + + @requires_install + @auto_reconnect + def projects(self): + """List available projects + + Returns: + list of project documents + + """ + for project_name in self._database.collection_names(): + if project_name in ("system.indexes",): + continue + + # Each collection will have exactly one project document + document = self._database[project_name].find_one({ + "type": "project" + }) + if document is not None: + yield document + + @auto_reconnect + def insert_one(self, item, *args, **kwargs): + assert isinstance(item, dict), "item must be of type " + schema.validate(item) + return self._database[self.active_project()].insert_one( + item, *args, **kwargs + ) + + @auto_reconnect + def insert_many(self, items, *args, **kwargs): + # check if all items are valid + assert isinstance(items, list), "`items` must be of type " + for item in items: + assert isinstance(item, dict), "`item` must be of type " + schema.validate(item) + + return self._database[self.active_project()].insert_many( + items, *args, **kwargs + ) + + def parenthood(self, document): + assert document is not None, "This is a bug" + + parents = list() + + while document.get("parent") is not None: + document = self.find_one({"_id": document["parent"]}) + if document is None: + break + + if document.get("type") == "master_version": + _document = self.find_one({"_id": document["version_id"]}) + document["data"] = _document["data"] + + parents.append(document) + + return parents diff --git a/avalon/tools/libraryloader/app.py b/avalon/tools/libraryloader/app.py index 2b6517a12..07c616b6f 100644 --- a/avalon/tools/libraryloader/app.py +++ b/avalon/tools/libraryloader/app.py @@ -1,8 +1,7 @@ -import os import sys import time -from .io_nonsingleton import DbConnector +from ...api import AvalonMongoDB from ...vendor.Qt import QtWidgets, QtCore from ... import style from .. import lib as tools_lib @@ -50,7 +49,7 @@ def __init__( container = QtWidgets.QWidget() - self.dbcon = DbConnector() + self.dbcon = AvalonMongoDB() self.dbcon.install() self.dbcon.activate_project(None) diff --git a/avalon/tools/libraryloader/io_nonsingleton.py b/avalon/tools/libraryloader/io_nonsingleton.py deleted file mode 100644 index f6d2c73ce..000000000 --- a/avalon/tools/libraryloader/io_nonsingleton.py +++ /dev/null @@ -1,452 +0,0 @@ -""" -Wrapper around interactions with the database - -Copy of io module in avalon-core. - - In this case not working as singleton with api.Session! -""" - -import os -import time -import errno -import shutil -import logging -import tempfile -import functools -import contextlib - -from ... import schema -from ...vendor import requests -from avalon.io import extract_port_from_url - -# Third-party dependencies -import pymongo - - -def auto_reconnect(func): - """Handling auto reconnect in 3 retry times""" - @functools.wraps(func) - def decorated(*args, **kwargs): - object = args[0] - for retry in range(3): - try: - return func(*args, **kwargs) - except pymongo.errors.AutoReconnect: - object.log.error("Reconnecting..") - time.sleep(0.1) - else: - raise - - return decorated - - -class DbConnector(object): - - log = logging.getLogger(__name__) - - def __init__(self): - self.Session = {} - self._mongo_client = None - self._sentry_client = None - self._sentry_logging_handler = None - self._database = None - self._is_installed = False - - def install(self): - """Establish a persistent connection to the database""" - if self._is_installed: - return - - logging.basicConfig() - self.Session.update(self._from_environment()) - - timeout = int(self.Session["AVALON_TIMEOUT"]) - mongo_url = self.Session["AVALON_MONGO"] - kwargs = { - "host": mongo_url, - "serverSelectionTimeoutMS": timeout - } - - port = extract_port_from_url(mongo_url) - if port is not None: - kwargs["port"] = int(port) - - self._mongo_client = pymongo.MongoClient(**kwargs) - - for retry in range(3): - try: - t1 = time.time() - self._mongo_client.server_info() - - except Exception: - self.log.error("Retrying..") - time.sleep(1) - timeout *= 1.5 - - else: - break - - else: - raise IOError( - "ERROR: Couldn't connect to %s in " - "less than %.3f ms" % (self.Session["AVALON_MONGO"], timeout)) - - self.log.info("Connected to %s, delay %.3f s" % ( - self.Session["AVALON_MONGO"], time.time() - t1)) - - self._install_sentry() - - self._database = self._mongo_client[self.Session["AVALON_DB"]] - self._is_installed = True - - def _install_sentry(self): - if "AVALON_SENTRY" not in self.Session: - return - - try: - from raven import Client - from raven.handlers.logging import SentryHandler - from raven.conf import setup_logging - except ImportError: - # Note: There was a Sentry address in this Session - return self.log.warning("Sentry disabled, raven not installed") - - client = Client(self.Session["AVALON_SENTRY"]) - - # Transmit log messages to Sentry - handler = SentryHandler(client) - handler.setLevel(logging.WARNING) - - setup_logging(handler) - - self._sentry_client = client - self._sentry_logging_handler = handler - self.log.info( - "Connected to Sentry @ %s" % self.Session["AVALON_SENTRY"] - ) - - def _from_environment(self): - Session = { - item[0]: os.getenv(item[0], item[1]) - for item in ( - # Root directory of projects on disk - ("AVALON_PROJECTS", None), - - # Name of current Project - ("AVALON_PROJECT", ""), - - # Name of current Asset - ("AVALON_ASSET", ""), - - # Name of current silo - ("AVALON_SILO", ""), - - # Name of current task - ("AVALON_TASK", None), - - # Name of current app - ("AVALON_APP", None), - - # Path to working directory - ("AVALON_WORKDIR", None), - - # Name of current Config - # TODO(marcus): Establish a suitable default config - ("AVALON_CONFIG", "no_config"), - - # Name of Avalon in graphical user interfaces - # Use this to customise the visual appearance of Avalon - # to better integrate with your surrounding pipeline - ("AVALON_LABEL", "Avalon"), - - # Used during any connections to the outside world - ("AVALON_TIMEOUT", "1000"), - - # Address to Asset Database - ("AVALON_MONGO", "mongodb://localhost:27017"), - - # Name of database used in MongoDB - ("AVALON_DB", "avalon"), - - # Address to Sentry - ("AVALON_SENTRY", None), - - # Address to Deadline Web Service - # E.g. http://192.167.0.1:8082 - ("AVALON_DEADLINE", None), - - # Enable features not necessarily stable. The user's own risk - ("AVALON_EARLY_ADOPTER", None), - - # Address of central asset repository, contains - # the following interface: - # /upload - # /download - # /manager (optional) - ("AVALON_LOCATION", "http://127.0.0.1"), - - # Boolean of whether to upload published material - # to central asset repository - ("AVALON_UPLOAD", None), - - # Generic username and password - ("AVALON_USERNAME", "avalon"), - ("AVALON_PASSWORD", "secret"), - - # Unique identifier for instances in working files - ("AVALON_INSTANCE_ID", "avalon.instance"), - ("AVALON_CONTAINER_ID", "avalon.container"), - - # Enable debugging - ("AVALON_DEBUG", None), - - ) if os.getenv(item[0], item[1]) is not None - } - - Session["schema"] = "avalon-core:session-2.0" - try: - schema.validate(Session) - except schema.ValidationError as e: - # TODO(marcus): Make this mandatory - self.log.warning(e) - - return Session - - def uninstall(self): - """Close any connection to the database""" - try: - self._mongo_client.close() - except AttributeError: - pass - - self._mongo_client = None - self._database = None - self._is_installed = False - - def active_project(self): - """Return the name of the active project""" - return self.Session["AVALON_PROJECT"] - - def activate_project(self, project_name): - self.Session["AVALON_PROJECT"] = project_name - - def projects(self): - """List available projects - - Returns: - list of project documents - - """ - - collection_names = self.collections() - for project in collection_names: - if project in ("system.indexes",): - continue - - # Each collection will have exactly one project document - document = self.find_project(project) - - if document is not None: - yield document - - def locate(self, path): - """Traverse a hierarchy from top-to-bottom - - Example: - representation = locate(["hulk", "Bruce", "modelDefault", 1, "ma"]) - - Returns: - representation (ObjectId) - - """ - - components = zip( - ("project", "asset", "subset", "version", "representation"), - path - ) - - parent = None - for type_, name in components: - latest = (type_ == "version") and name in (None, -1) - - try: - if latest: - parent = self.find_one( - filter={ - "type": type_, - "parent": parent - }, - projection={"_id": 1}, - sort=[("name", -1)] - )["_id"] - else: - parent = self.find_one( - filter={ - "type": type_, - "name": name, - "parent": parent - }, - projection={"_id": 1}, - )["_id"] - - except TypeError: - return None - - return parent - - @auto_reconnect - def collections(self): - return self._database.collection_names() - - @auto_reconnect - def find_project(self, project): - return self._database[project].find_one({"type": "project"}) - - @auto_reconnect - def insert_one(self, item): - assert isinstance(item, dict), "item must be of type " - schema.validate(item) - return self._database[self.Session["AVALON_PROJECT"]].insert_one(item) - - @auto_reconnect - def insert_many(self, items, ordered=True): - # check if all items are valid - assert isinstance(items, list), "`items` must be of type " - for item in items: - assert isinstance(item, dict), "`item` must be of type " - schema.validate(item) - - return self._database[self.Session["AVALON_PROJECT"]].insert_many( - items, - ordered=ordered) - - @auto_reconnect - def find(self, filter, projection=None, sort=None): - return self._database[self.Session["AVALON_PROJECT"]].find( - filter=filter, - projection=projection, - sort=sort - ) - - @auto_reconnect - def find_one(self, filter, projection=None, sort=None): - assert isinstance(filter, dict), "filter must be " - - return self._database[self.Session["AVALON_PROJECT"]].find_one( - filter=filter, - projection=projection, - sort=sort - ) - - @auto_reconnect - def save(self, *args, **kwargs): - return self._database[self.Session["AVALON_PROJECT"]].save( - *args, **kwargs) - - @auto_reconnect - def replace_one(self, filter, replacement): - return self._database[self.Session["AVALON_PROJECT"]].replace_one( - filter, replacement) - - @auto_reconnect - def update_many(self, filter, update): - return self._database[self.Session["AVALON_PROJECT"]].update_many( - filter, update) - - @auto_reconnect - def distinct(self, *args, **kwargs): - return self._database[self.Session["AVALON_PROJECT"]].distinct( - *args, **kwargs) - - @auto_reconnect - def aggregate(self, *args, **kwargs): - return self._database[self.Session["AVALON_PROJECT"]].aggregate( - *args, **kwargs) - - @auto_reconnect - def drop(self, *args, **kwargs): - return self._database[self.Session["AVALON_PROJECT"]].drop( - *args, **kwargs) - - @auto_reconnect - def delete_many(self, *args, **kwargs): - return self._database[self.Session["AVALON_PROJECT"]].delete_many( - *args, **kwargs) - - def parenthood(self, document): - assert document is not None, "This is a bug" - - parents = list() - - while document.get("parent") is not None: - document = self.find_one({"_id": document["parent"]}) - - if document is None: - break - - if document.get("type") == "master_version": - _document = self.find_one({"_id": document["version_id"]}) - document["data"] = _document["data"] - - parents.append(document) - - return parents - - @contextlib.contextmanager - def tempdir(self): - tempdir = tempfile.mkdtemp() - try: - yield tempdir - finally: - shutil.rmtree(tempdir) - - def download(self, src, dst): - """Download `src` to `dst` - - Arguments: - src (str): URL to source file - dst (str): Absolute path to destination file - - Yields tuple (progress, error): - progress (int): Between 0-100 - error (Exception): Any exception raised when first making connection - - """ - - try: - response = requests.get( - src, - stream=True, - auth=requests.auth.HTTPBasicAuth( - self.Session["AVALON_USERNAME"], - self.Session["AVALON_PASSWORD"] - ) - ) - except requests.ConnectionError as e: - yield None, e - return - - with self.tempdir() as dirname: - tmp = os.path.join(dirname, os.path.basename(src)) - - with open(tmp, "wb") as f: - total_length = response.headers.get("content-length") - - if total_length is None: # no content length header - f.write(response.content) - else: - downloaded = 0 - total_length = int(total_length) - for data in response.iter_content(chunk_size=4096): - downloaded += len(data) - f.write(data) - - yield int(100.0 * downloaded / total_length), None - - try: - os.makedirs(os.path.dirname(dst)) - except OSError as e: - # An already existing destination directory is fine. - if e.errno != errno.EEXIST: - raise - - shutil.copy(tmp, dst)