Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Add alternative sites for Site Sync #2206

Merged
merged 32 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
96c2cc0
OP-1920 - create command for background running of Site Sync server
kalisp Oct 25, 2021
19b5d47
OP-1920 - skip upload/download for same files
kalisp Oct 25, 2021
421ac77
OP-1920 - override get_local_site_id from env var
kalisp Oct 25, 2021
02b78dc
OP-1920 - added syncserver command
kalisp Oct 25, 2021
a96b0b8
OP-1920 - fixes of names, tray not triggering
kalisp Oct 25, 2021
faf987f
Merge branch 'develop' of github.com:pypeclub/OpenPype into feature/O…
kalisp Oct 29, 2021
be36900
OP-1920 - added always_accessible_on to Settings
kalisp Oct 29, 2021
c69c153
OP-1920 - implemented always_accessible_on
kalisp Oct 29, 2021
ba87f9b
OP-1920 - added documentation
kalisp Oct 29, 2021
6f3944d
Hound
kalisp Oct 29, 2021
de8cfef
OP-1920 - renamed reset site method
kalisp Oct 29, 2021
1772e7b
OP-1905 - implemented exit on key interrupt
kalisp Oct 29, 2021
a283413
OP-1937 - merge feature/OP-1920_Add-command-line-running-of-Site-Sync…
kalisp Nov 3, 2021
8a37b90
OP-1937 - added alternative_sites to System Setting for a site
kalisp Nov 3, 2021
26b9bba
OP-1937 - added alternative_sites integrate_new
kalisp Nov 4, 2021
691faaf
OP-1937 - added alternative_sites to upload/download
kalisp Nov 4, 2021
f19b391
OP-1937 - fixes to stabilize sftp provider if wrong settings
kalisp Nov 4, 2021
3a76b4d
Merge remote-tracking branch 'origin/develop' into feature/OP-1937_Ad…
kalisp Nov 4, 2021
ab36fdf
OP-1937 - added documentation
kalisp Nov 4, 2021
db02c03
OP-1937 - fix broken import in Python2
kalisp Nov 4, 2021
56c716c
Merge remote-tracking branch 'origin/bugfix/workfile_use_avalon_widge…
kalisp Nov 4, 2021
1e1b9b0
OP-1937 - added file id to DB
kalisp Nov 4, 2021
8ba0e60
OP-1937 - fix - integrate_new
kalisp Nov 4, 2021
828c601
OP-1937 - fix - failure when Site Sync is not enabled
kalisp Nov 4, 2021
199717a
OP-1937 - fix - wrong icon used
kalisp Nov 4, 2021
8d00883
Hound
kalisp Nov 4, 2021
f3309cb
Merge remote-tracking branch 'origin/develop' into feature/OP-1937_Ad…
kalisp Nov 4, 2021
24c1aed
OP-1920 - renamed env var SITE_SYNC_LOCAL_ID to OPENPYPE_LOCAL_ID
kalisp Nov 4, 2021
c00c4db
Merge branch 'feature/OP-1920_Add-command-line-running-of-Site-Sync-s…
kalisp Nov 4, 2021
d2d486d
Merged develop
kalisp Nov 4, 2021
efc75d7
Merged develop
kalisp Nov 4, 2021
87191d8
OP-1937 - fix alternate sites for default studio site
kalisp Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions openpype/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,28 @@ def run(script):
def runtests(folder, mark, pyargs):
"""Run all automatic tests after proper initialization via start.py"""
PypeCommands().run_tests(folder, mark, pyargs)


@main.command()
@click.option("-d", "--debug",
is_flag=True, help=("Run process in debug mode"))
@click.option("-a", "--active_site", required=True,
help="Name of active stie")
def syncserver(debug, active_site):
"""Run sync site server in background.

Some Site Sync use cases need to expose site to another one.
For example if majority of artists work in studio, they are not using
SS at all, but if you want to expose published assets to 'studio' site
to SFTP for only a couple of artists, some background process must
mark published assets to live on multiple sites (they might be
physically in same location - mounted shared disk).

Process mimics OP Tray with specific 'active_site' name, all
configuration for this "dummy" user comes from Setting or Local
Settings (configured by starting OP Tray with env
var OPENPYPE_LOCAL_ID set to 'active_site'.
"""
if debug:
os.environ['OPENPYPE_DEBUG'] = '3'
PypeCommands().syncserver(active_site)
5 changes: 5 additions & 0 deletions openpype/lib/local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,11 @@ def get_local_site_id():

Identifier is created if does not exists yet.
"""
# override local id from environment
# used for background syncing
if os.environ.get("OPENPYPE_LOCAL_ID"):
return os.environ["OPENPYPE_LOCAL_ID"]

registry = OpenPypeSettingsRegistry()
try:
return registry.get_item("localId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def upload_file(self, source_path, target_path,
if not os.path.isfile(source_path):
raise FileNotFoundError("Source file {} doesn't exist."
.format(source_path))

if overwrite:
thread = threading.Thread(target=self._copy,
args=(source_path, target_path))
Expand Down Expand Up @@ -176,7 +177,10 @@ def get_configurable_items_for_site(self):

def _copy(self, source_path, target_path):
print("copying {}->{}".format(source_path, target_path))
shutil.copy(source_path, target_path)
try:
shutil.copy(source_path, target_path)
except shutil.SameFileError:
print("same files, skipping")

def _mark_progress(self, collection, file, representation, server, site,
source_path, target_path, direction):
Expand Down
16 changes: 9 additions & 7 deletions openpype/modules/default_modules/sync_server/providers/sftp.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import os.path
import time
import sys
import six
import threading
import platform

Expand All @@ -14,6 +12,7 @@
pysftp = None
try:
import pysftp
import paramiko
except (ImportError, SyntaxError):
pass

Expand All @@ -37,7 +36,6 @@ class SFTPHandler(AbstractProvider):

def __init__(self, project_name, site_name, tree=None, presets=None):
self.presets = None
self.active = False
self.project_name = project_name
self.site_name = site_name
self.root = None
Expand All @@ -64,7 +62,6 @@ def __init__(self, project_name, site_name, tree=None, presets=None):
self.sftp_key_pass = provider_presets["sftp_key_pass"]

self._tree = None
self.active = True

@property
def conn(self):
Expand All @@ -80,7 +77,9 @@ def is_active(self):
Returns:
(boolean)
"""
return self.conn is not None
return self.presets.get(self.CODE) and \
self.presets[self.CODE].get("sftp_host") and \
self.conn is not None

@classmethod
def get_system_settings_schema(cls):
Expand Down Expand Up @@ -108,7 +107,7 @@ def get_project_settings_schema(cls):
editable = [
# credentials could be overriden on Project or User level
{
'key': "sftp_server",
'key': "sftp_host",
'label': "SFTP host name",
'type': 'text'
},
Expand Down Expand Up @@ -421,7 +420,10 @@ def _get_conn(self):
if self.sftp_key_pass:
conn_params['private_key_pass'] = self.sftp_key_pass

return pysftp.Connection(**conn_params)
try:
return pysftp.Connection(**conn_params)
except paramiko.ssh_exception.SSHException:
log.warning("Couldn't connect", exc_info=True)

def _mark_progress(self, collection, file, representation, server, site,
source_path, target_path, direction):
Expand Down
9 changes: 9 additions & 0 deletions openpype/modules/default_modules/sync_server/sync_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ async def upload(module, collection, file, representation, provider_name,
remote_site_name,
True
)

module.handle_alternate_site(collection, representation, remote_site_name,
file["_id"], file_id)

return file_id


Expand Down Expand Up @@ -131,6 +135,10 @@ async def download(module, collection, file, representation, provider_name,
local_site,
True
)

module.handle_alternate_site(collection, representation, remote_site_name,
file["_id"], file_id)

return file_id


Expand Down Expand Up @@ -246,6 +254,7 @@ def run(self):

asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
asyncio.ensure_future(self.sync_loop(), loop=self.loop)
log.info("Sync Server Started")
self.loop.run_forever()
except Exception:
log.warning(
Expand Down
123 changes: 99 additions & 24 deletions openpype/modules/default_modules/sync_server/sync_server_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def initialize(self, module_settings):

# some parts of code need to run sequentially, not in async
self.lock = None
self._sync_system_settings = None
# settings for all enabled projects for sync
self._sync_project_settings = None
self.sync_server_thread = None # asyncio requires new thread
Expand Down Expand Up @@ -152,9 +153,9 @@ def add_site(self, collection, representation_id, site_name=None,
if not site_name:
site_name = self.DEFAULT_SITE

self.reset_provider_for_file(collection,
representation_id,
site_name=site_name, force=force)
self.reset_site_on_representation(collection,
representation_id,
site_name=site_name, force=force)

# public facing API
def remove_site(self, collection, representation_id, site_name,
Expand All @@ -176,10 +177,10 @@ def remove_site(self, collection, representation_id, site_name,
if not self.get_sync_project_setting(collection):
raise ValueError("Project not configured")

self.reset_provider_for_file(collection,
representation_id,
site_name=site_name,
remove=True)
self.reset_site_on_representation(collection,
representation_id,
site_name=site_name,
remove=True)
if remove_local_files:
self._remove_local_file(collection, representation_id, site_name)

Expand Down Expand Up @@ -314,8 +315,8 @@ def pause_representation(self, collection, representation_id, site_name):
"""
log.info("Pausing SyncServer for {}".format(representation_id))
self._paused_representations.add(representation_id)
self.reset_provider_for_file(collection, representation_id,
site_name=site_name, pause=True)
self.reset_site_on_representation(collection, representation_id,
site_name=site_name, pause=True)

def unpause_representation(self, collection, representation_id, site_name):
"""
Expand All @@ -334,8 +335,8 @@ def unpause_representation(self, collection, representation_id, site_name):
except KeyError:
pass
# self.paused_representations is not persistent
self.reset_provider_for_file(collection, representation_id,
site_name=site_name, pause=False)
self.reset_site_on_representation(collection, representation_id,
site_name=site_name, pause=False)

def is_representation_paused(self, representation_id,
check_parents=False, project_name=None):
Expand Down Expand Up @@ -769,6 +770,57 @@ def get_enabled_projects(self):
enabled_projects.append(project_name)

return enabled_projects

def handle_alternate_site(self, collection, representation, processed_site,
file_id, synced_file_id):
"""
For special use cases where one site vendors another.

Current use case is sftp site vendoring (exposing) same data as
regular site (studio). Each site is accessible for different
audience. 'studio' for artists in a studio, 'sftp' for externals.

Change of file status on one site actually means same change on
'alternate' site. (eg. artists publish to 'studio', 'sftp' is using
same location >> file is accesible on 'sftp' site right away.

Args:
collection (str): name of project
representation (dict)
processed_site (str): real site_name of published/uploaded file
file_id (ObjectId): DB id of file handled
synced_file_id (str): id of the created file returned
by provider
"""
sites = self.sync_system_settings.get("sites", {})
sites[self.DEFAULT_SITE] = {"provider": "local_drive",
"alternative_sites": []}

alternate_sites = []
for site_name, site_info in sites.items():
conf_alternative_sites = site_info.get("alternative_sites", [])
if processed_site in conf_alternative_sites:
alternate_sites.append(site_name)
continue
if processed_site == site_name and conf_alternative_sites:
alternate_sites.extend(conf_alternative_sites)
continue

alternate_sites = set(alternate_sites)

for alt_site in alternate_sites:
query = {
"_id": representation["_id"]
}
elem = {"name": alt_site,
"created_dt": datetime.now(),
"id": synced_file_id}

self.log.debug("Adding alternate {} to {}".format(
alt_site, representation["_id"]))
self._add_site(collection, query,
[representation], elem,
site_name, file_id=file_id, force=True)
""" End of Public API """

def get_local_file_path(self, collection, site_name, file_path):
Expand Down Expand Up @@ -799,12 +851,19 @@ def _get_enabled_sites_from_settings(self, sync_settings):

def tray_init(self):
"""
Actual initialization of Sync Server.
Actual initialization of Sync Server for Tray.

Called when tray is initialized, it checks if module should be
enabled. If not, no initialization necessary.
"""
# import only in tray, because of Python2 hosts
self.server_init()

from .tray.app import SyncServerWindow
self.widget = SyncServerWindow(self)

def server_init(self):
"""Actual initialization of Sync Server."""
# import only in tray or Python3, because of Python2 hosts
from .sync_server import SyncServerThread

if not self.enabled:
Expand All @@ -816,6 +875,7 @@ def tray_init(self):
return

self.lock = threading.Lock()

self.sync_server_thread = SyncServerThread(self)

def tray_start(self):
Expand All @@ -829,6 +889,9 @@ def tray_start(self):
Returns:
None
"""
self.server_start()

def server_start(self):
if self.sync_project_settings and self.enabled:
self.sync_server_thread.start()
else:
Expand All @@ -841,6 +904,9 @@ def tray_exit(self):

Called from Module Manager
"""
self.server_exit()

def server_exit(self):
if not self.sync_server_thread:
return

Expand All @@ -850,6 +916,7 @@ def tray_exit(self):
log.info("Stopping sync server server")
self.sync_server_thread.is_running = False
self.sync_server_thread.stop()
log.info("Sync server stopped")
except Exception:
log.warning(
"Error has happened during Killing sync server",
Expand Down Expand Up @@ -892,6 +959,14 @@ def connection(self):

return self._connection

@property
def sync_system_settings(self):
if self._sync_system_settings is None:
self._sync_system_settings = get_system_settings()["modules"].\
get("sync_server")

return self._sync_system_settings

@property
def sync_project_settings(self):
if self._sync_project_settings is None:
Expand Down Expand Up @@ -977,9 +1052,7 @@ def get_all_site_configs(self, project_name=None):
(dict): {'studio': {'provider':'local_drive'...},
'MY_LOCAL': {'provider':....}}
"""
sys_sett = get_system_settings()
sync_sett = sys_sett["modules"].get("sync_server")

sync_sett = self.sync_system_settings
project_enabled = True
if project_name:
project_enabled = project_name in self.get_enabled_projects()
Expand Down Expand Up @@ -1037,10 +1110,9 @@ def get_provider_for_site(self, project_name=None, site=None):
if provider:
return provider

sys_sett = get_system_settings()
sync_sett = sys_sett["modules"].get("sync_server")
for site, detail in sync_sett.get("sites", {}).items():
sites[site] = detail.get("provider")
sync_sett = self.sync_system_settings
for conf_site, detail in sync_sett.get("sites", {}).items():
sites[conf_site] = detail.get("provider")

return sites.get(site, 'N/A')

Expand Down Expand Up @@ -1319,9 +1391,9 @@ def _get_site_rec(self, sites, site_name):

return -1, None

def reset_provider_for_file(self, collection, representation_id,
side=None, file_id=None, site_name=None,
remove=False, pause=None, force=False):
def reset_site_on_representation(self, collection, representation_id,
side=None, file_id=None, site_name=None,
remove=False, pause=None, force=False):
"""
Reset information about synchronization for particular 'file_id'
and provider.
Expand Down Expand Up @@ -1407,9 +1479,12 @@ def _reset_site_for_file(self, collection, query,
update = {
"$set": {"files.$[f].sites.$[s]": elem}
}
if not isinstance(file_id, ObjectId):
file_id = ObjectId(file_id)

arr_filter = [
{'s.name': site_name},
{'f._id': ObjectId(file_id)}
{'f._id': file_id}
]

self._update_site(collection, query, update, arr_filter)
Expand Down
Loading