Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Load app definition files from Golem releases CDN #5114

Merged
merged 13 commits into from
Mar 5, 2020
118 changes: 118 additions & 0 deletions golem/apps/downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import abc
import datetime
import logging
from pathlib import Path
import typing
import xml.etree.ElementTree as xml

from dataclasses import dataclass
import dateutil.parser as date_parser
import requests

from golem.apps import save_app_to_json_file, AppDefinition
from golem.core.variables import APP_DEFINITIONS_CDN_URL

logger = logging.getLogger(__name__)


class FromXml(abc.ABC):
""" Base class for objects which can be parsed from XML. This is used to
provide basic support for handling XML objects with namespaces. """
def __init__(self, ns_map: typing.Dict[str, str]):
self._namespace_map = ns_map

def _get_element(
self,
element: xml.Element,
name: str):
key, _ = list(self._namespace_map.items())[0]
return element.find(f'{key}:{name}', self._namespace_map)

def _get_elements(
self,
element: xml.Element,
name: str
) -> typing.List[xml.Element]:
key, _ = list(self._namespace_map.items())[0]
return element.findall(f'{key}:{name}', self._namespace_map)


@dataclass
class Contents(FromXml):
""" Represents a single `Contents` entry in a bucket listing. Such an entry
corresponds to an object stored within that bucket. """
etag: str
key: str
last_modified: datetime.datetime
size: int # size in bytes

def __init__(self, root: xml.Element, ns_map: typing.Dict[str, str]):
super().__init__(ns_map)
self.key = self._get_element(root, 'Key').text
self.etag = self._get_element(root, 'ETag').text
self.size = int(self._get_element(root, 'Size').text)
self.last_modified = date_parser.isoparse(
self._get_element(root, 'LastModified').text)


@dataclass
class ListBucketResult(FromXml):
""" Contains metadata about objects stored in an S3 bucket. """
contents: typing.List[Contents]

def __init__(self, root: xml.Element):
namespace_map = {'ns': _get_namespace(root)}
super().__init__(namespace_map)

self.contents = [
Contents(e, self._namespace_map)
for e in self._get_elements(root, 'Contents')]


def _get_namespace(element: xml.Element):
""" Hacky way of extracting the namespace from an XML element.
This assumes the document uses Clark's notation for tags
(i.e. {uri}local_part or local_part for empty namespace). """
tag = element.tag
return tag[tag.find("{")+1:tag.rfind("}")]


def get_bucket_listing() -> ListBucketResult:
response = requests.get(APP_DEFINITIONS_CDN_URL)
response.raise_for_status()
root: xml.Element = xml.fromstring(response.content)
return ListBucketResult(root)


def download_definition(
key: str,
destination: Path) -> AppDefinition:
logger.debug(
'download_definition. key=%s, destination=%s', key, destination)
response = requests.get(f'{APP_DEFINITIONS_CDN_URL}{key}')
response.raise_for_status()
definition = AppDefinition.from_json(response.text)
save_app_to_json_file(definition, destination)
return definition


def download_definitions(app_dir: Path) -> typing.List[AppDefinition]:
""" Download app definitions from Golem Factory CDN. Only downloads
definitions which are not already present locally.
:param: app_dir: path to directory containing local app definitions.
:return: list of newly downloaded app definitions. """
new_definitions = []
bucket_listing = get_bucket_listing()
logger.debug(
'download_definitions. app_dir=%s, bucket_listing=%r',
app_dir,
bucket_listing
)

for metadata in bucket_listing.contents:
definition_path = app_dir / metadata.key
if not (definition_path).exists():
new_definitions.append(
download_definition(metadata.key, definition_path))

return new_definitions
53 changes: 44 additions & 9 deletions golem/apps/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,39 @@
from typing import Dict, List, Tuple
from pathlib import Path

from golem.apps import AppId, AppDefinition, load_apps_from_dir
from golem.apps.default import save_built_in_app_definitions
from dataclasses import asdict
from requests.exceptions import RequestException

from golem.apps import (
AppId,
AppDefinition,
app_json_file_name,
load_apps_from_dir
)
from golem.apps.downloader import download_definitions
from golem.model import AppConfiguration
from golem.report import EventPublisher
from golem.rpc.mapping.rpceventnames import App

logger = logging.getLogger(__name__)


class AppManager:
""" Manager class for applications using Task API. """

def __init__(self, app_dir: Path, save_apps=True) -> None:
def __init__(self, app_dir: Path, download_apps: bool = True) -> None:
self.app_dir: Path = app_dir
self.app_dir.mkdir(exist_ok=True)
self._apps: Dict[AppId, AppDefinition] = {}
self._state = AppStates()
self._app_file_names: Dict[AppId, Path] = dict()

# Save build in apps, then load apps from path
new_apps: List[AppId] = []
if save_apps:
new_apps = save_built_in_app_definitions(app_dir)
# Download default apps then load all apps from path
if download_apps:
self.update_apps(register_apps=False)
for app_def_path, app_def in load_apps_from_dir(app_dir):
self.register_app(app_def)
self._app_file_names[app_def.id] = app_def_path
for app_id in new_apps:
self.set_enabled(app_id, True)

def registered(self, app_id) -> bool:
return app_id in self._apps
Expand Down Expand Up @@ -81,6 +90,32 @@ def delete(self, app_id: AppId) -> bool:
self._app_file_names[app_id].unlink()
return True

def update_apps(self, register_apps: bool = True):
""" Download new app definitions if available. For each definition
downloaded publish an RPC event to notify clients.
:param register_apps: if True, new definitions will be
registered in the manager. """
try:
new_apps = download_definitions(self.app_dir)
except RequestException as e:
logger.error('Failed to download new app definitions. %s', e)
return

for app in new_apps:
logger.info(
'New application definition downloaded. '
'app_name=%s, app_version=%s, app_id=%r',
app.name,
app.version,
app.id
)
if register_apps:
self.register_app(app)
app_file_path = self.app_dir / app_json_file_name(app)
self._app_file_names[app.id] = app_file_path

EventPublisher.publish(App.evt_new_definiton, asdict(app))


class AppStates:

Expand Down
15 changes: 10 additions & 5 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def __init__( # noqa pylint: disable=too-many-arguments,too-many-locals
TaskArchiverService(self.task_archiver),
MessageHistoryService(),
DoWorkService(self),
DailyJobsService(),
DailyJobsService(self),
]

clean_resources_older_than = \
Expand Down Expand Up @@ -1734,13 +1734,14 @@ def _run(self):


class DailyJobsService(LoopingCallService):
def __init__(self):
def __init__(self, client: Client):
super().__init__(
interval_seconds=datetime.timedelta(days=1).total_seconds(),
interval_seconds=int(datetime.timedelta(days=1).total_seconds()),
)
self._client = client

def _run(self) -> None:
jobs = (
jobs = [
nodeskeeper.sweep,
msg_queue.sweep,
lambda: logger.info(
Expand All @@ -1750,7 +1751,11 @@ def _run(self) -> None:
datetime.datetime.utcnow(),
datetime.datetime.now() - datetime.datetime.utcnow(),
),
)
]

if self._client.task_server:
jobs.append(self._client.task_server.app_manager.update_apps)

logger.info('Running daily jobs')
for job in jobs:
try:
Expand Down
2 changes: 2 additions & 0 deletions golem/core/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
# How long should peer be banned when failing on resources (seconds)
ACL_BLOCK_TIMEOUT_RESOURCE = 2 * 3600 # s

APP_DEFINITIONS_CDN_URL = 'https://golem-app-definitions.cdn.golem.network/'


###############
# PROTOCOL ID #
Expand Down
4 changes: 4 additions & 0 deletions golem/rpc/mapping/rpceventnames.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class UI:
evt_lock_config = 'evt.ui.widget.config.lock'


class App:
evt_new_definiton = 'evt.apps.new_definition'


NAMESPACES = [
Golem,
Environment,
Expand Down
59 changes: 59 additions & 0 deletions tests/golem/apps/test_app_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from mock import Mock, patch

import requests

import golem.apps.downloader as downloader
from golem.testutils import TempDirFixture

ROOT_PATH = 'golem.apps.downloader'

APP_KEY = 'test-app_0.1.0_asdf1234.json'
BUCKET_LISTING_XML = f'''<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>golem-app-definitions</Name>
<Contents>
<Key>{APP_KEY}</Key>
<LastModified>2020-02-28T08:49:34.000Z</LastModified>
<ETag>&quot;1c5dbeaaf0589820b799448664d24864&quot;</ETag>
<Size>357</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
</ListBucketResult>
'''


class TestAppDownloader(TempDirFixture):

@patch(f'{ROOT_PATH}.get_bucket_listing')
@patch(f'{ROOT_PATH}.download_definition')
def test_download_definitions(self, download_mock, bucket_listing_mock):
apps_path = self.new_path / 'apps'
apps_path.mkdir(exist_ok=True)
existing_app_path = apps_path / APP_KEY
existing_app_path.touch()
new_app_key = 'downloaded_app.json'
metadata = [
Mock(spec=downloader.Contents, key=APP_KEY),
Mock(spec=downloader.Contents, key=new_app_key),
]
bucket_listing_mock.return_value = Mock(
spec=downloader.ListBucketResult, contents=metadata)

new_definitions = downloader.download_definitions(apps_path)

self.assertEqual(len(new_definitions), 1)
download_mock.assert_called_once_with(
new_app_key, apps_path / new_app_key)
self.assertEqual(download_mock.call_count, 1)

@patch('requests.get')
def test_get_bucket_listing(self, mock_get):
response = Mock(spec=requests.Response)
response.status_code = 200
response.content = BUCKET_LISTING_XML
mock_get.return_value = response

result = downloader.get_bucket_listing()

self.assertEqual(len(result.contents), 1)
self.assertEqual(result.contents[0].key, APP_KEY)
22 changes: 21 additions & 1 deletion tests/golem/apps/test_app_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mock import Mock
from mock import Mock, patch

from golem.apps.manager import AppManager
from golem.apps import (
Expand All @@ -8,6 +8,7 @@
)
from golem.testutils import TempDirFixture, DatabaseFixture

ROOT_PATH = 'golem.apps.manager'
APP_DEF = AppDefinition(
name='test_app',
requestor_env='test_env',
Expand All @@ -29,6 +30,25 @@ def setUp(self):
self.app_manager = AppManager(app_path, False)


class TestUpdateApps(AppManagerTestBase):

@patch(f'{ROOT_PATH}.download_definitions')
@patch(f'{ROOT_PATH}.EventPublisher')
def test_update(self, publisher_mock, download_mock):
download_mock.return_value = [APP_DEF]
self.app_manager.update_apps()
self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)])
self.assertEqual(self.app_manager.app(APP_ID), APP_DEF)
self.assertFalse(self.app_manager.enabled(APP_ID))
self.assertEqual(publisher_mock.publish.call_count, 1)

# Definition already exists locally
download_mock.return_value = []
self.app_manager.update_apps()
self.assertEqual(self.app_manager.apps(), [(APP_ID, APP_DEF)])
self.assertEqual(publisher_mock.publish.call_count, 1)


class TestRegisterApp(AppManagerTestBase):

def test_register_app(self):
Expand Down