diff --git a/golem/apps/downloader.py b/golem/apps/downloader.py new file mode 100644 index 0000000000..39910366fc --- /dev/null +++ b/golem/apps/downloader.py @@ -0,0 +1,118 @@ +import abc +import datetime +import logging +from pathlib import Path +import typing +import xml.etree.ElementTree as xml + +from dataclasses import dataclass +import dateutil.parser as date_parser +import requests + +from golem.apps import save_app_to_json_file, AppDefinition +from golem.core.variables import APP_DEFINITIONS_CDN_URL + +logger = logging.getLogger(__name__) + + +class FromXml(abc.ABC): + """ Base class for objects which can be parsed from XML. This is used to + provide basic support for handling XML objects with namespaces. """ + def __init__(self, ns_map: typing.Dict[str, str]): + self._namespace_map = ns_map + + def _get_element( + self, + element: xml.Element, + name: str): + key, _ = list(self._namespace_map.items())[0] + return element.find(f'{key}:{name}', self._namespace_map) + + def _get_elements( + self, + element: xml.Element, + name: str + ) -> typing.List[xml.Element]: + key, _ = list(self._namespace_map.items())[0] + return element.findall(f'{key}:{name}', self._namespace_map) + + +@dataclass +class Contents(FromXml): + """ Represents a single `Contents` entry in a bucket listing. Such an entry + corresponds to an object stored within that bucket. """ + etag: str + key: str + last_modified: datetime.datetime + size: int # size in bytes + + def __init__(self, root: xml.Element, ns_map: typing.Dict[str, str]): + super().__init__(ns_map) + self.key = self._get_element(root, 'Key').text + self.etag = self._get_element(root, 'ETag').text + self.size = int(self._get_element(root, 'Size').text) + self.last_modified = date_parser.isoparse( + self._get_element(root, 'LastModified').text) + + +@dataclass +class ListBucketResult(FromXml): + """ Contains metadata about objects stored in an S3 bucket. """ + contents: typing.List[Contents] + + def __init__(self, root: xml.Element): + namespace_map = {'ns': _get_namespace(root)} + super().__init__(namespace_map) + + self.contents = [ + Contents(e, self._namespace_map) + for e in self._get_elements(root, 'Contents')] + + +def _get_namespace(element: xml.Element): + """ Hacky way of extracting the namespace from an XML element. + This assumes the document uses Clark's notation for tags + (i.e. {uri}local_part or local_part for empty namespace). """ + tag = element.tag + return tag[tag.find("{")+1:tag.rfind("}")] + + +def get_bucket_listing() -> ListBucketResult: + response = requests.get(APP_DEFINITIONS_CDN_URL) + response.raise_for_status() + root: xml.Element = xml.fromstring(response.content) + return ListBucketResult(root) + + +def download_definition( + key: str, + destination: Path) -> AppDefinition: + logger.debug( + 'download_definition. key=%s, destination=%s', key, destination) + response = requests.get(f'{APP_DEFINITIONS_CDN_URL}{key}') + response.raise_for_status() + definition = AppDefinition.from_json(response.text) + save_app_to_json_file(definition, destination) + return definition + + +def download_definitions(app_dir: Path) -> typing.List[AppDefinition]: + """ Download app definitions from Golem Factory CDN. Only downloads + definitions which are not already present locally. + :param: app_dir: path to directory containing local app definitions. + :return: list of newly downloaded app definitions. """ + new_definitions = [] + bucket_listing = get_bucket_listing() + logger.debug( + 'download_definitions. app_dir=%s, bucket_listing=%r', + app_dir, + bucket_listing + ) + + for metadata in bucket_listing.contents: + definition_path = app_dir / metadata.key + if not (definition_path).exists(): + new_definitions.append( + download_definition(metadata.key, definition_path)) + + return new_definitions diff --git a/golem/apps/manager.py b/golem/apps/manager.py index 8d0cea6840..2d4197019c 100644 --- a/golem/apps/manager.py +++ b/golem/apps/manager.py @@ -2,9 +2,19 @@ from typing import Dict, List, Tuple from pathlib import Path -from golem.apps import AppId, AppDefinition, load_apps_from_dir -from golem.apps.default import save_built_in_app_definitions +from dataclasses import asdict +from requests.exceptions import RequestException + +from golem.apps import ( + AppId, + AppDefinition, + app_json_file_name, + load_apps_from_dir +) +from golem.apps.downloader import download_definitions from golem.model import AppConfiguration +from golem.report import EventPublisher +from golem.rpc.mapping.rpceventnames import App logger = logging.getLogger(__name__) @@ -12,20 +22,19 @@ class AppManager: """ Manager class for applications using Task API. """ - def __init__(self, app_dir: Path, save_apps=True) -> None: + def __init__(self, app_dir: Path, download_apps: bool = True) -> None: + self.app_dir: Path = app_dir + self.app_dir.mkdir(exist_ok=True) self._apps: Dict[AppId, AppDefinition] = {} self._state = AppStates() self._app_file_names: Dict[AppId, Path] = dict() - # Save build in apps, then load apps from path - new_apps: List[AppId] = [] - if save_apps: - new_apps = save_built_in_app_definitions(app_dir) + # Download default apps then load all apps from path + if download_apps: + self.update_apps(register_apps=False) for app_def_path, app_def in load_apps_from_dir(app_dir): self.register_app(app_def) self._app_file_names[app_def.id] = app_def_path - for app_id in new_apps: - self.set_enabled(app_id, True) def registered(self, app_id) -> bool: return app_id in self._apps @@ -81,6 +90,32 @@ def delete(self, app_id: AppId) -> bool: self._app_file_names[app_id].unlink() return True + def update_apps(self, register_apps: bool = True): + """ Download new app definitions if available. For each definition + downloaded publish an RPC event to notify clients. + :param register_apps: if True, new definitions will be + registered in the manager. """ + try: + new_apps = download_definitions(self.app_dir) + except RequestException as e: + logger.error('Failed to download new app definitions. %s', e) + return + + for app in new_apps: + logger.info( + 'New application definition downloaded. ' + 'app_name=%s, app_version=%s, app_id=%r', + app.name, + app.version, + app.id + ) + if register_apps: + self.register_app(app) + app_file_path = self.app_dir / app_json_file_name(app) + self._app_file_names[app.id] = app_file_path + + EventPublisher.publish(App.evt_new_definiton, asdict(app)) + class AppStates: diff --git a/golem/client.py b/golem/client.py index 44f2eb499d..8938cdb827 100644 --- a/golem/client.py +++ b/golem/client.py @@ -193,7 +193,7 @@ def __init__( # noqa pylint: disable=too-many-arguments,too-many-locals TaskArchiverService(self.task_archiver), MessageHistoryService(), DoWorkService(self), - DailyJobsService(), + DailyJobsService(self), ] clean_resources_older_than = \ @@ -1736,13 +1736,14 @@ def _run(self): class DailyJobsService(LoopingCallService): - def __init__(self): + def __init__(self, client: Client): super().__init__( - interval_seconds=datetime.timedelta(days=1).total_seconds(), + interval_seconds=int(datetime.timedelta(days=1).total_seconds()), ) + self._client = client def _run(self) -> None: - jobs = ( + jobs = [ nodeskeeper.sweep, msg_queue.sweep, broadcast.sweep, @@ -1753,7 +1754,11 @@ def _run(self) -> None: datetime.datetime.utcnow(), datetime.datetime.now() - datetime.datetime.utcnow(), ), - ) + ] + + if self._client.task_server: + jobs.append(self._client.task_server.app_manager.update_apps) + logger.info('Running daily jobs') for job in jobs: try: diff --git a/golem/core/variables.py b/golem/core/variables.py index b891a07850..330f9df7d2 100644 --- a/golem/core/variables.py +++ b/golem/core/variables.py @@ -75,6 +75,8 @@ # How long should peer be banned when failing on resources (seconds) ACL_BLOCK_TIMEOUT_RESOURCE = 2 * 3600 # s +APP_DEFINITIONS_CDN_URL = 'https://golem-app-definitions.cdn.golem.network/' + ############### # PROTOCOL ID # diff --git a/golem/rpc/mapping/rpceventnames.py b/golem/rpc/mapping/rpceventnames.py index da09983b8e..dafda226a2 100644 --- a/golem/rpc/mapping/rpceventnames.py +++ b/golem/rpc/mapping/rpceventnames.py @@ -49,6 +49,10 @@ class UI: evt_lock_config = 'evt.ui.widget.config.lock' +class App: + evt_new_definiton = 'evt.apps.new_definition' + + NAMESPACES = [ Golem, Environment, diff --git a/tests/golem/apps/test_app_downloader.py b/tests/golem/apps/test_app_downloader.py new file mode 100644 index 0000000000..443785b660 --- /dev/null +++ b/tests/golem/apps/test_app_downloader.py @@ -0,0 +1,59 @@ +from mock import Mock, patch + +import requests + +import golem.apps.downloader as downloader +from golem.testutils import TempDirFixture + +ROOT_PATH = 'golem.apps.downloader' + +APP_KEY = 'test-app_0.1.0_asdf1234.json' +BUCKET_LISTING_XML = f''' + + golem-app-definitions + + {APP_KEY} + 2020-02-28T08:49:34.000Z + "1c5dbeaaf0589820b799448664d24864" + 357 + STANDARD + + +''' + + +class TestAppDownloader(TempDirFixture): + + @patch(f'{ROOT_PATH}.get_bucket_listing') + @patch(f'{ROOT_PATH}.download_definition') + def test_download_definitions(self, download_mock, bucket_listing_mock): + apps_path = self.new_path / 'apps' + apps_path.mkdir(exist_ok=True) + existing_app_path = apps_path / APP_KEY + existing_app_path.touch() + new_app_key = 'downloaded_app.json' + metadata = [ + Mock(spec=downloader.Contents, key=APP_KEY), + Mock(spec=downloader.Contents, key=new_app_key), + ] + bucket_listing_mock.return_value = Mock( + spec=downloader.ListBucketResult, contents=metadata) + + new_definitions = downloader.download_definitions(apps_path) + + self.assertEqual(len(new_definitions), 1) + download_mock.assert_called_once_with( + new_app_key, apps_path / new_app_key) + self.assertEqual(download_mock.call_count, 1) + + @patch('requests.get') + def test_get_bucket_listing(self, mock_get): + response = Mock(spec=requests.Response) + response.status_code = 200 + response.content = BUCKET_LISTING_XML + mock_get.return_value = response + + result = downloader.get_bucket_listing() + + self.assertEqual(len(result.contents), 1) + self.assertEqual(result.contents[0].key, APP_KEY) diff --git a/tests/golem/apps/test_app_manager.py b/tests/golem/apps/test_app_manager.py index 79dd61af26..2f688b83c7 100644 --- a/tests/golem/apps/test_app_manager.py +++ b/tests/golem/apps/test_app_manager.py @@ -1,4 +1,4 @@ -from mock import Mock +from mock import Mock, patch from golem.apps.manager import AppManager from golem.apps import ( @@ -8,6 +8,7 @@ ) from golem.testutils import TempDirFixture, DatabaseFixture +ROOT_PATH = 'golem.apps.manager' APP_DEF = AppDefinition( name='test_app', requestor_env='test_env', @@ -29,6 +30,25 @@ def setUp(self): self.app_manager = AppManager(app_path, False) +class TestUpdateApps(AppManagerTestBase): + + @patch(f'{ROOT_PATH}.download_definitions') + @patch(f'{ROOT_PATH}.EventPublisher') + def test_update(self, publisher_mock, download_mock): + download_mock.return_value = [APP_DEF] + self.app_manager.update_apps() + self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)]) + self.assertEqual(self.app_manager.app(APP_ID), APP_DEF) + self.assertFalse(self.app_manager.enabled(APP_ID)) + self.assertEqual(publisher_mock.publish.call_count, 1) + + # Definition already exists locally + download_mock.return_value = [] + self.app_manager.update_apps() + self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)]) + self.assertEqual(publisher_mock.publish.call_count, 1) + + class TestRegisterApp(AppManagerTestBase): def test_register_app(self):