Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Update to Task API v 0.16.0
Browse files Browse the repository at this point in the history
* Import environment IDs from Task API instead of declaring them in
  Golem code.
* Read the environment ID and prerequisites from CreateTask reply. Save
  the obtained values in RequestedTask model.

Signed-off-by: Adam Wierzbicki <awierzbicki@golem.network>
  • Loading branch information
Wiezzel committed Sep 10, 2019
1 parent 1347ae6 commit 1d86bdc
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 16 deletions.
2 changes: 1 addition & 1 deletion golem/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True):


class Database:
SCHEMA_VERSION = 35
SCHEMA_VERSION = 36

def __init__(self, # noqa pylint: disable=too-many-arguments
db: peewee.Database,
Expand Down
17 changes: 17 additions & 0 deletions golem/database/schemas/036_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# pylint: disable=no-member
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import peewee as pw

SCHEMA_VERSION = 36


def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
'requestedtask',
env_id=pw.CharField(max_length=255, null=True))


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields('requestedtask', 'env_id')
2 changes: 1 addition & 1 deletion golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dataclasses import dataclass, field, asdict
from docker.errors import APIError
from golem_task_api.envs import DOCKER_CPU_ENV_ID
from twisted.internet.defer import Deferred, inlineCallbacks, succeed
from twisted.internet.threads import deferToThread
from urllib3.contrib.pyopenssl import WrappedSocket
Expand Down Expand Up @@ -47,7 +48,6 @@
mem = CONSTRAINT_KEYS['mem']
cpu = CONSTRAINT_KEYS['cpu']

DOCKER_CPU_ENV_ID = 'docker_cpu'
DOCKER_CPU_METADATA = EnvMetadata(
id=DOCKER_CPU_ENV_ID,
description='Docker environment using CPU'
Expand Down
2 changes: 1 addition & 1 deletion golem/envs/docker/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, List, Optional

from dataclasses import dataclass, field
from golem_task_api.envs import DOCKER_GPU_ENV_ID

from golem.core.common import update_dict
from golem.envs import (
Expand All @@ -22,7 +23,6 @@
logger = getLogger(__name__)


DOCKER_GPU_ENV_ID = 'docker_gpu'
DOCKER_GPU_METADATA = EnvMetadata(
id=DOCKER_GPU_ENV_ID,
description='Docker environment using GPU'
Expand Down
1 change: 1 addition & 0 deletions golem/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ class RequestedTask(BaseModel):
name = CharField(null=True)
status = StringEnumField(enum_type=taskstate.TaskStatus, null=False)

env_id = CharField(null=True)
prerequisites = JsonField(null=False, default='{}')

task_timeout = IntegerField(null=False) # milliseconds
Expand Down
7 changes: 5 additions & 2 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import timedelta
import logging
from pathlib import Path
Expand Down Expand Up @@ -121,7 +122,6 @@ def create_task(
app_id=golem_params.app_id,
name=golem_params.name,
status=TaskStatus.creating,
# prerequisites='{}',
task_timeout=golem_params.task_timeout,
subtask_timeout=golem_params.subtask_timeout,
start_time=default_now(),
Expand Down Expand Up @@ -169,11 +169,14 @@ async def init_task(self, task_id: TaskId) -> None:

app_client = await self._get_app_client(task.app_id)
logger.debug('init_task(task_id=%r) - creating task', task_id)
await app_client.create_task(
reply = await app_client.create_task(
task.task_id,
task.max_subtasks,
task.app_params,
)
task.env_id = reply.env_id
task.prerequisites = json.loads(reply.prerequisites_json)
task.save()
logger.debug('init_task(task_id=%r) after', task_id)

@staticmethod
Expand Down
5 changes: 3 additions & 2 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dataclasses import dataclass
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader
from golem_task_api import ProviderAppClient, constants as task_api_constants
from golem_task_api.envs import DOCKER_CPU_ENV_ID, DOCKER_GPU_ENV_ID
from pydispatch import dispatcher
from twisted.internet import defer

Expand All @@ -22,8 +23,8 @@
from golem.docker.manager import DockerManager
from golem.docker.task_thread import DockerTaskThread
from golem.envs import EnvId
from golem.envs.docker.cpu import DockerCPUConfig, DOCKER_CPU_ENV_ID
from golem.envs.docker.gpu import DockerGPUConfig, DOCKER_GPU_ENV_ID
from golem.envs.docker.cpu import DockerCPUConfig
from golem.envs.docker.gpu import DockerGPUConfig
from golem.hardware import scale_memory, MemSize
from golem.manager.nodestatesnapshot import ComputingSubtaskStateSnapshot
from golem.resource.dirmanager import DirManager
Expand Down
6 changes: 2 additions & 4 deletions tests/golem/task/test_newtaskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from golem_messages.message import ComputeTaskDef
from golem_task_api import ProviderAppClient, TaskApiService
from golem_task_api.envs import DOCKER_CPU_ENV_ID
from twisted.internet import defer
from twisted.trial.unittest import TestCase as TwistedTestCase

Expand All @@ -14,10 +15,7 @@
from golem.core.deferred import deferred_from_future
from golem.core.statskeeper import IntStatsKeeper
from golem.envs import Runtime
from golem.envs.docker.cpu import (
DOCKER_CPU_ENV_ID,
DockerCPUConfig
)
from golem.envs.docker.cpu import DockerCPUConfig
from golem.task.envmanager import EnvironmentManager
from golem.task.taskcomputer import NewTaskComputer
from golem.testutils import TempDirFixture
Expand Down
43 changes: 39 additions & 4 deletions tests/golem/task/test_requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# ^^ Pytest fixtures in the same file require the same name

import asyncio
import json
from pathlib import Path

from freezegun import freeze_time
Expand Down Expand Up @@ -78,23 +79,34 @@ def test_create_task(self):
async def test_init_task(self, mock_client):
# given
task_id = self._create_task()
env_id = 'test_env'
prerequisites = {'key': 'value'}
mock_client.create_task.return_value = Mock(
env_id=env_id,
prerequisites_json=json.dumps(prerequisites)
)
# when
await self.rtm.init_task(task_id)
row = RequestedTask.get(RequestedTask.task_id == task_id)
# then
assert row.status == TaskStatus.creating
assert row.env_id == env_id
assert row.prerequisites == prerequisites
mock_client.create_task.assert_called_once_with(
row.task_id,
row.max_subtasks,
row.app_params
)
self.app_manager.enabled.assert_called_once_with(row.app_id)

@pytest.mark.usefixtures('mock_client')
@pytest.mark.asyncio
async def test_init_task_wrong_status(self):
async def test_init_task_wrong_status(self, mock_client):
# given
task_id = self._create_task()
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)
# when
await self.rtm.init_task(task_id)
# Start task to change the status
Expand All @@ -103,11 +115,14 @@ async def test_init_task_wrong_status(self):
with pytest.raises(RuntimeError):
await self.rtm.init_task(task_id)

@pytest.mark.usefixtures('mock_client')
@pytest.mark.asyncio
async def test_start_task(self):
async def test_start_task(self, mock_client):
# given
task_id = self._create_task()
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)
await self.rtm.init_task(task_id)
# when
self.rtm.start_task(task_id)
Expand All @@ -127,6 +142,10 @@ def test_task_not_exists(self):
async def test_has_pending_subtasks(self, mock_client):
# given
mock_client.has_pending_subtasks.return_value = True
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)

task_id = self._create_task()
await self.rtm.init_task(task_id)
Expand All @@ -141,6 +160,10 @@ async def test_has_pending_subtasks(self, mock_client):
async def test_get_next_subtask(self, mock_client):
# given
self._add_next_subtask_to_client_mock(mock_client)
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)

task_id = self._create_task()
await self.rtm.init_task(task_id)
Expand All @@ -164,6 +187,10 @@ async def test_verify(self, mock_client):
# given
self._add_next_subtask_to_client_mock(mock_client)
mock_client.verify.return_value = True
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)

task_id = self._create_task()
await self.rtm.init_task(task_id)
Expand Down Expand Up @@ -195,6 +222,10 @@ async def test_verify_failed(self, mock_client):
# given
self._add_next_subtask_to_client_mock(mock_client)
mock_client.verify.return_value = False
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)

task_id = self._create_task()
await self.rtm.init_task(task_id)
Expand Down Expand Up @@ -223,6 +254,10 @@ async def test_verify_failed(self, mock_client):
async def test_abort(self, mock_client):
# given
self._add_next_subtask_to_client_mock(mock_client)
mock_client.create_task.return_value = Mock(
env_id='env_id',
prerequisites_json='null'
)

task_id = self._create_task()
await self.rtm.init_task(task_id)
Expand Down
2 changes: 1 addition & 1 deletion tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from golem_messages.factories.datastructures import p2p as dt_p2p_factory
from golem_messages.message import ComputeTaskDef
from golem_messages.utils import encode_hex as encode_key_id, pubkey_to_address
from golem_task_api.envs import DOCKER_CPU_ENV_ID
from requests import HTTPError

from golem import testutils
Expand All @@ -35,7 +36,6 @@
UnsupportReason,
)
from golem.envs import Environment as NewEnv
from golem.envs.docker.cpu import DOCKER_CPU_ENV_ID
from golem.network.hyperdrive.client import HyperdriveClientOptions, \
HyperdriveClient, to_hyperg_peer
from golem.resource import resourcemanager
Expand Down

0 comments on commit 1d86bdc

Please sign in to comment.