Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Bugfix Sync server tweaks #1430

Merged
merged 7 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions openpype/modules/sync_server/providers/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from openpype.api import Logger
from openpype.api import get_system_settings
from ..utils import time_function
from ..utils import time_function, ResumableError
import time


Expand Down Expand Up @@ -63,7 +63,14 @@ def __init__(self, project_name, site_name, tree=None, presets=None):
return

self.service = self._get_gd_service()
self.root = self._prepare_root_info()
try:
self.root = self._prepare_root_info()
except errors.HttpError:
log.warning("HttpError in sync loop, "
"trying next loop",
exc_info=True)
raise ResumableError

self._tree = tree
self.active = True

Expand Down
2 changes: 1 addition & 1 deletion openpype/modules/sync_server/providers/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ class and batch limit.
# 7 denotes number of files that could be synced in single loop - learned by
# trial and error
factory.register_provider('gdrive', GDriveHandler, 7)
factory.register_provider('local_drive', LocalDriveHandler, 10)
factory.register_provider('local_drive', LocalDriveHandler, 50)
50 changes: 36 additions & 14 deletions openpype/modules/sync_server/sync_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .providers import lib
from openpype.lib import PypeLogger

from .utils import SyncStatus
from .utils import SyncStatus, ResumableError


log = PypeLogger().get_logger("SyncServer")
Expand Down Expand Up @@ -232,6 +232,7 @@ def __init__(self, module):
self.loop = None
self.is_running = False
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
self.timer = None

def run(self):
self.is_running = True
Expand Down Expand Up @@ -266,8 +267,8 @@ async def sync_loop(self):
Returns:

"""
try:
while self.is_running and not self.module.is_paused():
while self.is_running and not self.module.is_paused():
try:
import time
start_time = None
self.module.set_sync_project_settings() # clean cache
Expand Down Expand Up @@ -384,17 +385,27 @@ async def sync_loop(self):

duration = time.time() - start_time
log.debug("One loop took {:.2f}s".format(duration))
await asyncio.sleep(self.module.get_loop_delay(collection))
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except Exception:
self.stop()
log.warning("Unhandled exception in sync loop, stopping server",
exc_info=True)

delay = self.module.get_loop_delay(collection)
log.debug("Waiting for {} seconds to new loop".format(delay))
self.timer = asyncio.create_task(self.run_timer(delay))
await asyncio.gather(self.timer)

except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, "
"trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except ResumableError:
log.warning("ResumableError in sync loop, "
"trying next loop",
exc_info=True)
except Exception:
self.stop()
log.warning("Unhandled except. in sync loop, stopping server",
exc_info=True)

def stop(self):
"""Sets is_running flag to false, 'check_shutdown' shuts server down"""
Expand All @@ -417,6 +428,17 @@ async def check_shutdown(self):
await asyncio.sleep(0.07)
self.loop.stop()

async def run_timer(self, delay):
"""Wait for 'delay' seconds to start next loop"""
await asyncio.sleep(delay)

def reset_timer(self):
"""Called when waiting for next loop should be skipped"""
log.debug("Resetting timer")
if self.timer:
self.timer.cancel()
self.timer = None

def _working_sites(self, collection):
if self.module.is_project_paused(collection):
log.debug("Both sites same, skipping")
Expand Down
27 changes: 25 additions & 2 deletions openpype/modules/sync_server/sync_server_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,24 @@ def get_remote_site(self, project_name):

return remote_site

def reset_timer(self):
"""
Called when waiting for next loop should be skipped.

In case of user's involvement (reset site), start that right away.
"""
self.sync_server_thread.reset_timer()

def get_enabled_projects(self):
"""Returns list of projects which have SyncServer enabled."""
enabled_projects = []
for project in self.connection.projects():
project_name = project["name"]
project_settings = self.get_sync_project_setting(project_name)
if project_settings:
enabled_projects.append(project_name)

return enabled_projects
""" End of Public API """

def get_local_file_path(self, collection, site_name, file_path):
Expand All @@ -413,7 +431,7 @@ def get_local_file_path(self, collection, site_name, file_path):
return local_file_path

def _get_remote_sites_from_settings(self, sync_settings):
if not self.enabled or not sync_settings['enabled']:
if not self.enabled or not sync_settings.get('enabled'):
return []

remote_sites = [self.DEFAULT_SITE, self.LOCAL_SITE]
Expand All @@ -424,7 +442,7 @@ def _get_remote_sites_from_settings(self, sync_settings):

def _get_enabled_sites_from_settings(self, sync_settings):
sites = [self.DEFAULT_SITE]
if self.enabled and sync_settings['enabled']:
if self.enabled and sync_settings.get('enabled'):
sites.append(self.LOCAL_SITE)

return sites
Expand All @@ -445,6 +463,11 @@ def tray_init(self):
if not self.enabled:
return

enabled_projects = self.get_enabled_projects()
if not enabled_projects:
self.enabled = False
return

self.lock = threading.Lock()

try:
Expand Down
2 changes: 1 addition & 1 deletion openpype/modules/sync_server/tray/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, sync_server, parent=None):
layout.addWidget(footer)

self.setLayout(body_layout)
self.setWindowTitle("Sync Server")
self.setWindowTitle("Sync Queue")

self.projects.project_changed.connect(
lambda: repres.table_view.model().set_project(
Expand Down
14 changes: 13 additions & 1 deletion openpype/modules/sync_server/tray/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ def sort(self, index, order):
Sort is happening on a DB side, model is reset, db queried
again.

It remembers one last sort, adds it as secondary after new sort.

Args:
index (int): column index
order (int): 0|
Expand All @@ -184,7 +186,17 @@ def sort(self, index, order):
else:
order = -1

self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1}
backup_sort = dict(self.sort)

self.sort = {self.SORT_BY_COLUMN[index]: order} # reset
# add last one
for key, val in backup_sort.items():
if key != '_id':
self.sort[key] = val
break
# add default one
self.sort['_id'] = 1

self.query = self.get_query()
# import json
# log.debug(json.dumps(self.query, indent=4).\
Expand Down
3 changes: 3 additions & 0 deletions openpype/modules/sync_server/tray/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def _add_site(self, selected_ids=None, site_name=None):
representation_id))
except ValueError as exp:
self.message_generated.emit("Error {}".format(str(exp)))
self.sync_server.reset_timer()

def _remove_site(self, selected_ids=None, site_name=None):
"""
Expand Down Expand Up @@ -343,6 +344,7 @@ def _remove_site(self, selected_ids=None, site_name=None):

self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()

def _reset_site(self, selected_ids=None, site_name=None):
"""
Expand All @@ -368,6 +370,7 @@ def _reset_site(self, selected_ids=None, site_name=None):

self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()

def _open_in_explorer(self, selected_ids=None, site_name=None):
log.debug("Open in Explorer {}:{}".format(selected_ids, site_name))
Expand Down
5 changes: 5 additions & 0 deletions openpype/modules/sync_server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
log = Logger().get_logger("SyncServer")


class ResumableError(Exception):
"""Error which could be temporary, skip current loop, try next time"""
pass


class SyncStatus:
DO_NOTHING = 0
DO_UPLOAD = 1
Expand Down