Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1430 from pypeclub/bugfix/sync_server_tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
mkolar authored Apr 29, 2021
2 parents cbf5183 + 1ceed81 commit 1fa3863
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 21 deletions.
11 changes: 9 additions & 2 deletions openpype/modules/sync_server/providers/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from openpype.api import Logger
from openpype.api import get_system_settings
from ..utils import time_function
from ..utils import time_function, ResumableError
import time


Expand Down Expand Up @@ -63,7 +63,14 @@ def __init__(self, project_name, site_name, tree=None, presets=None):
return

self.service = self._get_gd_service()
self.root = self._prepare_root_info()
try:
self.root = self._prepare_root_info()
except errors.HttpError:
log.warning("HttpError in sync loop, "
"trying next loop",
exc_info=True)
raise ResumableError

self._tree = tree
self.active = True

Expand Down
2 changes: 1 addition & 1 deletion openpype/modules/sync_server/providers/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ class and batch limit.
# 7 denotes number of files that could be synced in single loop - learned by
# trial and error
factory.register_provider('gdrive', GDriveHandler, 7)
factory.register_provider('local_drive', LocalDriveHandler, 10)
factory.register_provider('local_drive', LocalDriveHandler, 50)
50 changes: 36 additions & 14 deletions openpype/modules/sync_server/sync_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .providers import lib
from openpype.lib import PypeLogger

from .utils import SyncStatus
from .utils import SyncStatus, ResumableError


log = PypeLogger().get_logger("SyncServer")
Expand Down Expand Up @@ -232,6 +232,7 @@ def __init__(self, module):
self.loop = None
self.is_running = False
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
self.timer = None

def run(self):
self.is_running = True
Expand Down Expand Up @@ -266,8 +267,8 @@ async def sync_loop(self):
Returns:
"""
try:
while self.is_running and not self.module.is_paused():
while self.is_running and not self.module.is_paused():
try:
import time
start_time = None
self.module.set_sync_project_settings() # clean cache
Expand Down Expand Up @@ -384,17 +385,27 @@ async def sync_loop(self):

duration = time.time() - start_time
log.debug("One loop took {:.2f}s".format(duration))
await asyncio.sleep(self.module.get_loop_delay(collection))
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except Exception:
self.stop()
log.warning("Unhandled exception in sync loop, stopping server",
exc_info=True)

delay = self.module.get_loop_delay(collection)
log.debug("Waiting for {} seconds to new loop".format(delay))
self.timer = asyncio.create_task(self.run_timer(delay))
await asyncio.gather(self.timer)

except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, "
"trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except ResumableError:
log.warning("ResumableError in sync loop, "
"trying next loop",
exc_info=True)
except Exception:
self.stop()
log.warning("Unhandled except. in sync loop, stopping server",
exc_info=True)

def stop(self):
"""Sets is_running flag to false, 'check_shutdown' shuts server down"""
Expand All @@ -417,6 +428,17 @@ async def check_shutdown(self):
await asyncio.sleep(0.07)
self.loop.stop()

async def run_timer(self, delay):
"""Wait for 'delay' seconds to start next loop"""
await asyncio.sleep(delay)

def reset_timer(self):
"""Called when waiting for next loop should be skipped"""
log.debug("Resetting timer")
if self.timer:
self.timer.cancel()
self.timer = None

def _working_sites(self, collection):
if self.module.is_project_paused(collection):
log.debug("Both sites same, skipping")
Expand Down
27 changes: 25 additions & 2 deletions openpype/modules/sync_server/sync_server_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,24 @@ def get_remote_site(self, project_name):

return remote_site

def reset_timer(self):
"""
Called when waiting for next loop should be skipped.
In case of user's involvement (reset site), start that right away.
"""
self.sync_server_thread.reset_timer()

def get_enabled_projects(self):
"""Returns list of projects which have SyncServer enabled."""
enabled_projects = []
for project in self.connection.projects():
project_name = project["name"]
project_settings = self.get_sync_project_setting(project_name)
if project_settings:
enabled_projects.append(project_name)

return enabled_projects
""" End of Public API """

def get_local_file_path(self, collection, site_name, file_path):
Expand All @@ -413,7 +431,7 @@ def get_local_file_path(self, collection, site_name, file_path):
return local_file_path

def _get_remote_sites_from_settings(self, sync_settings):
if not self.enabled or not sync_settings['enabled']:
if not self.enabled or not sync_settings.get('enabled'):
return []

remote_sites = [self.DEFAULT_SITE, self.LOCAL_SITE]
Expand All @@ -424,7 +442,7 @@ def _get_remote_sites_from_settings(self, sync_settings):

def _get_enabled_sites_from_settings(self, sync_settings):
sites = [self.DEFAULT_SITE]
if self.enabled and sync_settings['enabled']:
if self.enabled and sync_settings.get('enabled'):
sites.append(self.LOCAL_SITE)

return sites
Expand All @@ -445,6 +463,11 @@ def tray_init(self):
if not self.enabled:
return

enabled_projects = self.get_enabled_projects()
if not enabled_projects:
self.enabled = False
return

self.lock = threading.Lock()

try:
Expand Down
2 changes: 1 addition & 1 deletion openpype/modules/sync_server/tray/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, sync_server, parent=None):
layout.addWidget(footer)

self.setLayout(body_layout)
self.setWindowTitle("Sync Server")
self.setWindowTitle("Sync Queue")

self.projects.project_changed.connect(
lambda: repres.table_view.model().set_project(
Expand Down
14 changes: 13 additions & 1 deletion openpype/modules/sync_server/tray/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ def sort(self, index, order):
Sort is happening on a DB side, model is reset, db queried
again.
It remembers one last sort, adds it as secondary after new sort.
Args:
index (int): column index
order (int): 0|
Expand All @@ -184,7 +186,17 @@ def sort(self, index, order):
else:
order = -1

self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1}
backup_sort = dict(self.sort)

self.sort = {self.SORT_BY_COLUMN[index]: order} # reset
# add last one
for key, val in backup_sort.items():
if key != '_id':
self.sort[key] = val
break
# add default one
self.sort['_id'] = 1

self.query = self.get_query()
# import json
# log.debug(json.dumps(self.query, indent=4).\
Expand Down
3 changes: 3 additions & 0 deletions openpype/modules/sync_server/tray/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def _add_site(self, selected_ids=None, site_name=None):
representation_id))
except ValueError as exp:
self.message_generated.emit("Error {}".format(str(exp)))
self.sync_server.reset_timer()

def _remove_site(self, selected_ids=None, site_name=None):
"""
Expand Down Expand Up @@ -343,6 +344,7 @@ def _remove_site(self, selected_ids=None, site_name=None):

self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()

def _reset_site(self, selected_ids=None, site_name=None):
"""
Expand All @@ -368,6 +370,7 @@ def _reset_site(self, selected_ids=None, site_name=None):

self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()

def _open_in_explorer(self, selected_ids=None, site_name=None):
log.debug("Open in Explorer {}:{}".format(selected_ids, site_name))
Expand Down
5 changes: 5 additions & 0 deletions openpype/modules/sync_server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
log = Logger().get_logger("SyncServer")


class ResumableError(Exception):
"""Error which could be temporary, skip current loop, try next time"""
pass


class SyncStatus:
DO_NOTHING = 0
DO_UPLOAD = 1
Expand Down

0 comments on commit 1fa3863

Please sign in to comment.