Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Added legacy fields to task-api comp.task rpc command #5018

Merged
merged 2 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
TYPE_CHECKING,
)

from ethereum.utils import denoms
from golem_messages import datastructures as msg_datastructures
from golem_task_api.envs import DOCKER_GPU_ENV_ID
from pydispatch import dispatcher
from twisted.internet.defer import (
inlineCallbacks,
Expand All @@ -33,6 +35,7 @@
import golem
from golem import model
from golem.appconfig import TASKARCHIVE_MAINTENANCE_INTERVAL, AppConfig
from golem.apps.default import APPS
from golem.clientconfigdescriptor import ConfigApprover, ClientConfigDescriptor
from golem.core import variables
from golem.core.common import (
Expand Down Expand Up @@ -76,8 +79,8 @@
from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager
from golem.rpc import utils as rpc_utils
from golem.rpc.mapping.rpceventnames import Task, Network, Environment, UI
from golem.task import taskpreset
from golem.task import taskstate
from golem.task import taskpreset, taskstate
from golem.task.helpers import calculate_subtask_payment
from golem.task.taskarchiver import TaskArchiver
from golem.task.taskserver import TaskServer
from golem.task.tasktester import TaskTester
Expand Down Expand Up @@ -897,14 +900,63 @@ def get_task(self, task_id: str) -> Optional[dict]:
if not task:
return None
subtask_ids = rtm.get_requested_task_subtask_ids(task_id)
# time_started
if task.start_time is None:
time_started = get_timestamp_utc()
if task.status == taskstate.TaskStatus.errorCreating:
time_started -= 1
mfranciszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
else:
time_started = task.start_time.timestamp()
# proress and time_remaining
finished_subtasks = rtm.count_finished_subtasks(task.task_id)
progress = finished_subtasks / task.max_subtasks
time_remaining = None
if progress > 0.0 and not task.status.is_completed():
elapsed = task.elapsed_seconds
time_remaining = (elapsed / progress) - elapsed
# type
app_name = 'Unknown'
if task.app_id in APPS:
app = APPS[task.app_id]
app_name = app.name
# last_updated
if task.end_time is None:
last_updated = get_timestamp_utc()
else:
last_updated = task.end_time.timestamp()
# compute_on
compute_on = 'cpu'
if task.env_id == DOCKER_GPU_ENV_ID:
compute_on = 'gpu'
# estimated_cost and estimated_fee
subtask_price = calculate_subtask_payment(
task.max_price_per_hour,
task.subtask_timeout
)
estimated_cost = subtask_price * task.max_subtasks
estimated_fee = self.transaction_system.eth_for_batch_payment(
task.max_subtasks
)
task_dict = {
'id': task.task_id,
'time_remaining': time_remaining,
'subtasks_count': task.max_subtasks,
'status': task.status.value,
'progress': progress,
'time_started': time_started,
'last_updated': last_updated,
'name': task.name,
'bid': float(task.max_price_per_hour) / denoms.ether,
'compute_on': compute_on,
'concent_enabled': task.concent_enabled,
'subtask_timeout': str(timedelta(seconds=task.subtask_timeout)),
'timeout': str(timedelta(seconds=task.task_timeout)),
'type': app_name,
'options': {
'output_path': task.output_directory
},
'estimated_cost': estimated_cost,
'estimated_fee': estimated_fee,
}
else:
# OLD taskmanager
Expand Down
2 changes: 1 addition & 1 deletion golem/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True):


class Database:
SCHEMA_VERSION = 45
SCHEMA_VERSION = 46

def __init__(self, # noqa pylint: disable=too-many-arguments
db: peewee.Database,
Expand Down
15 changes: 15 additions & 0 deletions golem/database/schemas/046_rtm_task_end_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# pylint: disable=no-member
# pylint: disable=unused-argument
import peewee as pw

SCHEMA_VERSION = 46


def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
'requestedtask',
end_time=pw.UTCDateTimeField(null=True))


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields('requestedtask', 'end_time')
2 changes: 1 addition & 1 deletion golem/ethereum/paymentprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def add( # pylint: disable=too-many-arguments
value: int,
) -> model.TaskPayment:
log.info(
"Adding payment for %s to %s (%.18f GNTB)",
"Adding payment. subtask_id=%s, receiver=%s, value=(%.18f GNTB)",
subtask_id,
eth_addr,
value / denoms.ether,
Expand Down
1 change: 1 addition & 0 deletions golem/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ class RequestedTask(BaseModel):
task_timeout = IntegerField(null=False) # seconds
subtask_timeout = IntegerField(null=False) # seconds
start_time = UTCDateTimeField(null=True)
end_time = UTCDateTimeField(null=True)

max_price_per_hour = HexIntegerField(null=False)
max_subtasks = IntegerField(null=False)
Expand Down
14 changes: 14 additions & 0 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ async def _task_creation_ctx(
try:
task = RequestedTask.get(task_id=task_id)
task.status = TaskStatus.errorCreating
task.end_time = default_now()
task.save()
if not app_id:
app_id = task.app_id
Expand Down Expand Up @@ -334,6 +335,7 @@ def error_creating(self, task_id: TaskId):
raise RuntimeError(f"Task {task_id} has already been started")

task.status = TaskStatus.errorCreating
task.end_time = default_now()
task.save()
self._notice_task_updated(task, op=TaskOp.ABORTED)

Expand Down Expand Up @@ -529,6 +531,7 @@ async def _verify(
if not await self.has_pending_subtasks(task_id):
if not self._get_pending_subtasks(task_id):
task.status = TaskStatus.finished
task.end_time = default_now()
task.save()

self._move_task_results(
Expand All @@ -554,6 +557,7 @@ async def abort_task(self, task_id: TaskId) -> None:
f"Task not active, can not abort. task_id={task_id}")

task.status = TaskStatus.aborted
task.end_time = default_now()
task.save()

for subtask in self._get_pending_subtasks(task_id):
Expand Down Expand Up @@ -605,6 +609,15 @@ def get_requested_task_ids() -> List[TaskId]:
tasks = RequestedTask.select(RequestedTask.task_id).execute()
return [task.task_id for task in tasks]

@staticmethod
def count_finished_subtasks(task_id: TaskId) -> float:
return RequestedSubtask.select(
fn.Count(RequestedSubtask.subtask_id)
).where(
RequestedSubtask.task_id == task_id,
RequestedSubtask.status == SubtaskStatus.finished,
).scalar()

@staticmethod
def get_requested_task_subtask_ids(task_id: TaskId) -> List[SubtaskId]:
subtasks = RequestedSubtask.select(RequestedSubtask.subtask_id) \
Expand Down Expand Up @@ -850,6 +863,7 @@ def _time_out_task(self, task_id: TaskId) -> None:
logger.info("Task timed out. task_id=%r", task_id)

task.status = TaskStatus.timeout
task.end_time = default_now()
task.save()

for subtask in self._get_pending_subtasks(task_id):
Expand Down