From debafb1d4cf383027a1f6145e77e2f31eba0dfab Mon Sep 17 00:00:00 2001 From: Marek Franciszkiewicz Date: Fri, 22 Nov 2019 17:34:31 +0100 Subject: [PATCH] Fix: execute _resource_downloaded on successful resource download --- golem/task/taskserver.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index e6076510bb..bf93d73d6e 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -502,6 +502,12 @@ def task_given( task_header.subtask_budget, msg.want_to_compute_task.price) self.task_computer.task_given(msg.compute_task_def, cpu_time_limit) + resource_downloaded = functools.partial( + self._resource_downloaded, + msg.subtask_id, + msg.requestor_id, + msg.price) + if task_header.environment_prerequisites: subtask_inputs_dir = self.task_computer.get_subtask_inputs_dir() resources_options = msg.resources_options or dict() @@ -516,10 +522,14 @@ def task_given( ) for resource_id in msg.compute_task_def['resources'] ] - defer.gatherResults(deferred_list, consumeErrors=True)\ - .addCallbacks( - lambda _: self.resource_collected(msg.task_id), - lambda e: self.resource_failure(msg.task_id, e)) + defer.gatherResults( + deferred_list, + consumeErrors=True, + ).addCallback( + lambda _: resource_downloaded() + ).addCallbacks( + lambda _: self.resource_collected(msg.task_id), + lambda e: self.resource_failure(msg.task_id, e)) else: self.request_resource( msg.task_id, @@ -527,15 +537,24 @@ def task_given( msg.compute_task_def['resources'], msg.resources_options, ) + resource_downloaded() + + return True + + def _resource_downloaded( + self, + subtask_id: str, + requestor_id: str, + price: int, + ) -> None: self.requested_tasks.clear() - update_requestor_assigned_sum(msg.requestor_id, msg.price) + update_requestor_assigned_sum(requestor_id, price) dispatcher.send( signal='golem.subtask', event='started', - subtask_id=msg.subtask_id, - price=msg.price, + subtask_id=subtask_id, + price=price, ) - return True def resource_collected(self, task_id: str) -> bool: if self.task_computer.assigned_task_id != task_id: