Conversation
8ec549d to 392128f Compare
Created NewTaskComputer class for computing tasks created with the new Task API. Support for old-style tasks is kept as well. TaskComputerAdapter was introduced to dispatch tasks between the new and old task computers.
392128f to 63ce003 Compare
@Krigpl @mfranciszkiewicz Unit tests for the NewTaskComputer are yet to come, but you can already start the review.
golem/task/taskserver.py
Outdated
@@ -419,7 +419,7 @@ def resource_failure(self, task_id: str, reason: str) -> None:
         self.task_computer.task_interrupted()
         self.send_task_failed(
-            self.task_computer.assigned_subtask['subtask_id'],
+            self.task_computer.assigned_subtask_id,
This is suspicious. It either means that assigned_subtask_id doesn't get cleared after task_interrupted, or it is cleared and it's empty here.
Hmm... This is actually a race condition. In most cases assigned_subtask_id won't be cleared yet. 😛
Good catch!
I still think it's better to get this fixed, e.g. by grabbing the subtask_id before calling task_interrupted and using that value here.
Yea, for sure.
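For reference, a minimal sketch of the fix suggested above, assuming the resource_failure() body shown in the diff; any send_task_failed() arguments not visible in the snippet are left out here.

# Sketch: capture the subtask id before task_interrupted() clears it,
# then use the saved value when reporting the failure.
subtask_id = self.task_computer.assigned_subtask_id
self.task_computer.task_interrupted()
self.send_task_failed(
    subtask_id,
    # ... remaining send_task_failed() arguments unchanged ...
)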
task_id = self.assigned_task_id
subtask_id = self.assigned_subtask_id
computation = self._new_computer.compute()
self._handle_computation_results(task_id, subtask_id, computation)
That returns a deferred but it's not yielded anywhere - expected? If so, please add a comment.
That's on purpose. The computation deferred resolves once the computation is complete, not when it's started. And this method is supposed to just start the computation. I remember adding a comment about it but must've deleted it by accident.
It still needs an error handler, right? Twisted complains when there are un-yielded deferreds with unhandled errors.
_handle_computation_results() has a catch-all block, so it shouldn't raise any errors. I can put the _send_results() call inside this block as well.
Wait, there is a comment, but somehow it ended up three lines higher.
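To illustrate the point about the un-yielded deferred, here is a hedged sketch of how a catch-all around the computation keeps Twisted from reporting an unhandled error. The result name and the logging call are assumptions for illustration, not the PR's actual code.

import logging
from twisted.internet import defer

logger = logging.getLogger(__name__)

@defer.inlineCallbacks
def _handle_computation_results(self, task_id, subtask_id, computation):
    try:
        result = yield computation  # resolves when the computation finishes
        self._send_results(task_id, subtask_id, result)  # kept inside the try
    except Exception:  # catch-all: no error escapes the fire-and-forget deferred
        logger.exception(
            'Computation failed: task_id=%r subtask_id=%r', task_id, subtask_id)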
return self._assigned_task.env_id

@defer.inlineCallbacks
def change_config(
So what's the decision with change_config? I believe it doesn't necessarily belong here but it's too awkward to remove just now?
There's no final decision yet. @mfranciszkiewicz is researching this topic. This is most probably a temporary solution. And it's compatible with existing code.
Let's keep the method in this PR
28c07cf to 7e42786 Compare
LGTM! Some small nitpicky comments, still approving.
Codecov Report
@@            Coverage Diff             @@
##           develop    #4510      +/-   ##
===========================================
+ Coverage    90.29%   90.31%   +0.02%
===========================================
  Files          225      225
  Lines        19715    19932     +217
===========================================
+ Hits         17802    18002     +200
- Misses        1913     1930      +17
166049c to 54cec12 Compare
def subtask_deadline(self):
    return int(time.time()) + 3600

def get_task_header(self, **kwargs):
I guess you could just use factories from GM, here and for ctd, and remove these helpers.
Will take a look at these factories.
Well, after taking a look I don't see much value in using these factories, because they generate random IDs and I want to use particular ones so that I can easily make assertions without passing them around.
You can set the values of the fields you're interested in, like TaskHeaderFactory(task_id=<your id>), so basically very similar logic to this one here.
Not insisting, but I feel it's somewhat redundant to have these helpers.
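As a hedged illustration of the suggestion: the factory import path and the accepted keyword arguments below are assumptions based on the discussion, not verified against the PR.

import time
from golem_messages.factories.tasks import TaskHeaderFactory

# Pin the fields the asserts care about, so the test can reuse the same
# constants instead of passing randomly generated ids around.
task_header = TaskHeaderFactory(
    task_id='test-task-id',
    deadline=int(time.time()) + 3600,
)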
    deadline=kwargs.get('subtask_deadline') or self.subtask_deadline
)

def patch_async(self, name, *args, **kwargs):
What is async supposed to mean here?
It's supposed to mean that this patch method is suitable for asynchronous functions, unlike the ordinary @patch decorator.
afaict it's only used with non-async functions here, so I got confused; that's why I asked.
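For context, a sketch of what a helper like patch_async might do, based only on the explanation above (a guess at the intent, not the PR's implementation): patch a name for the duration of the test and make the mock return an already-fired Deferred, which is what would distinguish it from a plain @patch.

from unittest import mock
from twisted.internet import defer

def patch_async(self, name, *args, **kwargs):
    # Start the patch and make sure it is undone when the test ends.
    patcher = mock.patch(name, *args, **kwargs)
    patched = patcher.start()
    self.addCleanup(patcher.stop)
    # Return an already-fired Deferred so callers that yield the result
    # under @inlineCallbacks keep working.
    patched.return_value = defer.succeed(None)
    return patched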
583788b to 0ac8c24 Compare
35e3c7e to 11e053d Compare
One small thing
sync_wait(self._new_computer.prepare())

# Should this node behave as provider and compute tasks?
self.compute_tasks = task_server.config_desc.accept_tasks \
compute_tasks should become a @property because afair task_server.config_desc.in_shutdown is changed dynamically during runtime.
This is a part of the config, and the config needs to be explicitly changed by calling change_config. That has been the case with the old TaskComputer.
If some code modifies the in_shutdown setting without calling update_config then it should probably be fixed, but that's not a part of this PR.
change_config is called on the client after updating this setting, so should be good :)
https://github.com/golemfactory/golem/blob/develop/golem/node.py#L377
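For comparison, a hedged sketch of the @property alternative discussed above (not what the PR does, which keeps compute_tasks as a plain attribute refreshed via change_config); the stored _task_server attribute name is an assumption.

@property
def compute_tasks(self) -> bool:
    # Re-read the config on every access so runtime changes to
    # accept_tasks or in_shutdown are picked up immediately.
    config = self._task_server.config_desc
    return config.accept_tasks and not config.in_shutdown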
* Refactored _get_task_api_service() and _get_task_dir() to use _assigned_task instead of method parameters.
* Added a support_direct_computation property to TaskComputerAdapter (needed by dummy task computation).
* Fixed DummyTask.computation_failed(), which was passing None as a non-None parameter to DummyTask.computation_finished().
* Added a unit test for NewTaskComputer._get_task_api_service().
* Moved the _runtime assignment after the _app_client.compute() call in NewTaskComputer.compute() (otherwise it would always be None).
11e053d to cadc186 Compare