Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Load app definition files from Golem releases CDN (#5114)
Browse files Browse the repository at this point in the history
Adds logic for downloading app definition JSON files from CDN in Task API AppManager. The definition files are distinguished based on their file name (which contains the hash of the file's contents). Updating apps is done on AppManager init and then periodically (once a day using DailyJobsService). For every new definition downloaded an RPC event is fired (evt.apps.new_definition).
  • Loading branch information
kmazurek authored Mar 5, 2020
1 parent 1ebd28f commit 7835ab4
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 15 deletions.
118 changes: 118 additions & 0 deletions golem/apps/downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import abc
import datetime
import logging
from pathlib import Path
import typing
import xml.etree.ElementTree as xml

from dataclasses import dataclass
import dateutil.parser as date_parser
import requests

from golem.apps import save_app_to_json_file, AppDefinition
from golem.core.variables import APP_DEFINITIONS_CDN_URL

logger = logging.getLogger(__name__)


class FromXml(abc.ABC):
""" Base class for objects which can be parsed from XML. This is used to
provide basic support for handling XML objects with namespaces. """
def __init__(self, ns_map: typing.Dict[str, str]):
self._namespace_map = ns_map

def _get_element(
self,
element: xml.Element,
name: str):
key, _ = list(self._namespace_map.items())[0]
return element.find(f'{key}:{name}', self._namespace_map)

def _get_elements(
self,
element: xml.Element,
name: str
) -> typing.List[xml.Element]:
key, _ = list(self._namespace_map.items())[0]
return element.findall(f'{key}:{name}', self._namespace_map)


@dataclass
class Contents(FromXml):
""" Represents a single `Contents` entry in a bucket listing. Such an entry
corresponds to an object stored within that bucket. """
etag: str
key: str
last_modified: datetime.datetime
size: int # size in bytes

def __init__(self, root: xml.Element, ns_map: typing.Dict[str, str]):
super().__init__(ns_map)
self.key = self._get_element(root, 'Key').text
self.etag = self._get_element(root, 'ETag').text
self.size = int(self._get_element(root, 'Size').text)
self.last_modified = date_parser.isoparse(
self._get_element(root, 'LastModified').text)


@dataclass
class ListBucketResult(FromXml):
""" Contains metadata about objects stored in an S3 bucket. """
contents: typing.List[Contents]

def __init__(self, root: xml.Element):
namespace_map = {'ns': _get_namespace(root)}
super().__init__(namespace_map)

self.contents = [
Contents(e, self._namespace_map)
for e in self._get_elements(root, 'Contents')]


def _get_namespace(element: xml.Element):
""" Hacky way of extracting the namespace from an XML element.
This assumes the document uses Clark's notation for tags
(i.e. {uri}local_part or local_part for empty namespace). """
tag = element.tag
return tag[tag.find("{")+1:tag.rfind("}")]


def get_bucket_listing() -> ListBucketResult:
response = requests.get(APP_DEFINITIONS_CDN_URL)
response.raise_for_status()
root: xml.Element = xml.fromstring(response.content)
return ListBucketResult(root)


def download_definition(
key: str,
destination: Path) -> AppDefinition:
logger.debug(
'download_definition. key=%s, destination=%s', key, destination)
response = requests.get(f'{APP_DEFINITIONS_CDN_URL}{key}')
response.raise_for_status()
definition = AppDefinition.from_json(response.text)
save_app_to_json_file(definition, destination)
return definition


def download_definitions(app_dir: Path) -> typing.List[AppDefinition]:
""" Download app definitions from Golem Factory CDN. Only downloads
definitions which are not already present locally.
:param: app_dir: path to directory containing local app definitions.
:return: list of newly downloaded app definitions. """
new_definitions = []
bucket_listing = get_bucket_listing()
logger.debug(
'download_definitions. app_dir=%s, bucket_listing=%r',
app_dir,
bucket_listing
)

for metadata in bucket_listing.contents:
definition_path = app_dir / metadata.key
if not (definition_path).exists():
new_definitions.append(
download_definition(metadata.key, definition_path))

return new_definitions
53 changes: 44 additions & 9 deletions golem/apps/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,39 @@
from typing import Dict, List, Tuple
from pathlib import Path

from golem.apps import AppId, AppDefinition, load_apps_from_dir
from golem.apps.default import save_built_in_app_definitions
from dataclasses import asdict
from requests.exceptions import RequestException

from golem.apps import (
AppId,
AppDefinition,
app_json_file_name,
load_apps_from_dir
)
from golem.apps.downloader import download_definitions
from golem.model import AppConfiguration
from golem.report import EventPublisher
from golem.rpc.mapping.rpceventnames import App

logger = logging.getLogger(__name__)


class AppManager:
""" Manager class for applications using Task API. """

def __init__(self, app_dir: Path, save_apps=True) -> None:
def __init__(self, app_dir: Path, download_apps: bool = True) -> None:
self.app_dir: Path = app_dir
self.app_dir.mkdir(exist_ok=True)
self._apps: Dict[AppId, AppDefinition] = {}
self._state = AppStates()
self._app_file_names: Dict[AppId, Path] = dict()

# Save build in apps, then load apps from path
new_apps: List[AppId] = []
if save_apps:
new_apps = save_built_in_app_definitions(app_dir)
# Download default apps then load all apps from path
if download_apps:
self.update_apps(register_apps=False)
for app_def_path, app_def in load_apps_from_dir(app_dir):
self.register_app(app_def)
self._app_file_names[app_def.id] = app_def_path
for app_id in new_apps:
self.set_enabled(app_id, True)

def registered(self, app_id) -> bool:
return app_id in self._apps
Expand Down Expand Up @@ -81,6 +90,32 @@ def delete(self, app_id: AppId) -> bool:
self._app_file_names[app_id].unlink()
return True

def update_apps(self, register_apps: bool = True):
""" Download new app definitions if available. For each definition
downloaded publish an RPC event to notify clients.
:param register_apps: if True, new definitions will be
registered in the manager. """
try:
new_apps = download_definitions(self.app_dir)
except RequestException as e:
logger.error('Failed to download new app definitions. %s', e)
return

for app in new_apps:
logger.info(
'New application definition downloaded. '
'app_name=%s, app_version=%s, app_id=%r',
app.name,
app.version,
app.id
)
if register_apps:
self.register_app(app)
app_file_path = self.app_dir / app_json_file_name(app)
self._app_file_names[app.id] = app_file_path

EventPublisher.publish(App.evt_new_definiton, asdict(app))


class AppStates:

Expand Down
15 changes: 10 additions & 5 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def __init__( # noqa pylint: disable=too-many-arguments,too-many-locals
TaskArchiverService(self.task_archiver),
MessageHistoryService(),
DoWorkService(self),
DailyJobsService(),
DailyJobsService(self),
]

clean_resources_older_than = \
Expand Down Expand Up @@ -1736,13 +1736,14 @@ def _run(self):


class DailyJobsService(LoopingCallService):
def __init__(self):
def __init__(self, client: Client):
super().__init__(
interval_seconds=datetime.timedelta(days=1).total_seconds(),
interval_seconds=int(datetime.timedelta(days=1).total_seconds()),
)
self._client = client

def _run(self) -> None:
jobs = (
jobs = [
nodeskeeper.sweep,
msg_queue.sweep,
broadcast.sweep,
Expand All @@ -1753,7 +1754,11 @@ def _run(self) -> None:
datetime.datetime.utcnow(),
datetime.datetime.now() - datetime.datetime.utcnow(),
),
)
]

if self._client.task_server:
jobs.append(self._client.task_server.app_manager.update_apps)

logger.info('Running daily jobs')
for job in jobs:
try:
Expand Down
2 changes: 2 additions & 0 deletions golem/core/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
# How long should peer be banned when failing on resources (seconds)
ACL_BLOCK_TIMEOUT_RESOURCE = 2 * 3600 # s

APP_DEFINITIONS_CDN_URL = 'https://golem-app-definitions.cdn.golem.network/'


###############
# PROTOCOL ID #
Expand Down
4 changes: 4 additions & 0 deletions golem/rpc/mapping/rpceventnames.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class UI:
evt_lock_config = 'evt.ui.widget.config.lock'


class App:
evt_new_definiton = 'evt.apps.new_definition'


NAMESPACES = [
Golem,
Environment,
Expand Down
59 changes: 59 additions & 0 deletions tests/golem/apps/test_app_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from mock import Mock, patch

import requests

import golem.apps.downloader as downloader
from golem.testutils import TempDirFixture

ROOT_PATH = 'golem.apps.downloader'

APP_KEY = 'test-app_0.1.0_asdf1234.json'
BUCKET_LISTING_XML = f'''<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>golem-app-definitions</Name>
<Contents>
<Key>{APP_KEY}</Key>
<LastModified>2020-02-28T08:49:34.000Z</LastModified>
<ETag>&quot;1c5dbeaaf0589820b799448664d24864&quot;</ETag>
<Size>357</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
</ListBucketResult>
'''


class TestAppDownloader(TempDirFixture):

@patch(f'{ROOT_PATH}.get_bucket_listing')
@patch(f'{ROOT_PATH}.download_definition')
def test_download_definitions(self, download_mock, bucket_listing_mock):
apps_path = self.new_path / 'apps'
apps_path.mkdir(exist_ok=True)
existing_app_path = apps_path / APP_KEY
existing_app_path.touch()
new_app_key = 'downloaded_app.json'
metadata = [
Mock(spec=downloader.Contents, key=APP_KEY),
Mock(spec=downloader.Contents, key=new_app_key),
]
bucket_listing_mock.return_value = Mock(
spec=downloader.ListBucketResult, contents=metadata)

new_definitions = downloader.download_definitions(apps_path)

self.assertEqual(len(new_definitions), 1)
download_mock.assert_called_once_with(
new_app_key, apps_path / new_app_key)
self.assertEqual(download_mock.call_count, 1)

@patch('requests.get')
def test_get_bucket_listing(self, mock_get):
response = Mock(spec=requests.Response)
response.status_code = 200
response.content = BUCKET_LISTING_XML
mock_get.return_value = response

result = downloader.get_bucket_listing()

self.assertEqual(len(result.contents), 1)
self.assertEqual(result.contents[0].key, APP_KEY)
22 changes: 21 additions & 1 deletion tests/golem/apps/test_app_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mock import Mock
from mock import Mock, patch

from golem.apps.manager import AppManager
from golem.apps import (
Expand All @@ -8,6 +8,7 @@
)
from golem.testutils import TempDirFixture, DatabaseFixture

ROOT_PATH = 'golem.apps.manager'
APP_DEF = AppDefinition(
name='test_app',
requestor_env='test_env',
Expand All @@ -29,6 +30,25 @@ def setUp(self):
self.app_manager = AppManager(app_path, False)


class TestUpdateApps(AppManagerTestBase):

@patch(f'{ROOT_PATH}.download_definitions')
@patch(f'{ROOT_PATH}.EventPublisher')
def test_update(self, publisher_mock, download_mock):
download_mock.return_value = [APP_DEF]
self.app_manager.update_apps()
self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)])
self.assertEqual(self.app_manager.app(APP_ID), APP_DEF)
self.assertFalse(self.app_manager.enabled(APP_ID))
self.assertEqual(publisher_mock.publish.call_count, 1)

# Definition already exists locally
download_mock.return_value = []
self.app_manager.update_apps()
self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)])
self.assertEqual(publisher_mock.publish.call_count, 1)


class TestRegisterApp(AppManagerTestBase):

def test_register_app(self):
Expand Down

0 comments on commit 7835ab4

Please sign in to comment.