This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Remove resource size from task header #3917

Merged: 9 commits, Mar 1, 2019
1 change: 0 additions & 1 deletion apps/core/task/coretask.py
@@ -135,7 +135,6 @@ def __init__(self,
             deadline=self._deadline,
             subtask_timeout=task_definition.subtask_timeout,
             subtasks_count=total_tasks,
-            resource_size=self.resource_size,
             estimated_memory=task_definition.estimated_memory,
             max_price=task_definition.max_price,
             concent_enabled=task_definition.concent_enabled,
3 changes: 1 addition & 2 deletions golem/resource/resourcehandshake.py
@@ -284,8 +284,7 @@ def _download_handshake_nonce(self, key_id, resource, options):
             success=lambda res, files, _: self._nonce_downloaded(key_id, files),
             error=lambda exc, *_: self._handshake_error(key_id, exc),
             client_options=self.task_server.get_download_options(
-                options,
-                self.NONCE_TASK
+                options
             )
         )
 
4 changes: 2 additions & 2 deletions golem/task/rpc.py
@@ -283,7 +283,7 @@ def _get_mask_for_task(client, task: coretask.CoreTask) -> masking.Mask:
 def _inform_subsystems(client, task, packager_result):
     task_id = task.header.task_id
     package_path, package_sha1 = packager_result
-    task.header.resource_size = os.path.getsize(package_path)
+    resource_size = os.path.getsize(package_path)
 
     if client.config_desc.net_masking_enabled:
         task.header.mask = _get_mask_for_task(
@@ -306,7 +306,7 @@ def _inform_subsystems(client, task, packager_result):
             package_path,
             package_sha1,
             task_id,
-            task.header.resource_size,
+            resource_size,
             client_options=client_options,
         )
     return resource_server_result
3 changes: 1 addition & 2 deletions golem/task/server/helpers.py
@@ -17,8 +17,7 @@ def computed_task_reported(
     task = task_manager.tasks.get(report_computed_task.task_id, None)
     output_dir = task.tmp_dir if hasattr(task, 'tmp_dir') else None
     client_options = task_server.get_download_options(
-        report_computed_task.options,
-        report_computed_task.task_id,
+        report_computed_task.options
     )
 
     fgtr = message.concents.ForceGetTaskResult(
10 changes: 4 additions & 6 deletions golem/task/server/resources.py
@@ -87,7 +87,7 @@ def request_resource(self, task_id, subtask_id, resources):
 
         task_keeper = self.task_manager.comp_task_keeper
         options = task_keeper.get_resources_options(subtask_id)
-        client_options = self.get_download_options(options, task_id)
+        client_options = self.get_download_options(options)
         self.pull_resources(task_id, resources, client_options)
         return True
 
@@ -98,9 +98,8 @@ def pull_resources(self, task_id, resources, client_options=None):
     def get_download_options(
             self,
            received_options: Optional[Union[dict, HyperdriveClientOptions]],
-            task_id: Optional[str] = None):
+            size: Optional[int] = None):
 
-        task_keeper = getattr(self, 'task_keeper')
         resource_manager = self._get_resource_manager()
         options: Optional[HyperdriveClientOptions] = None
 
@@ -125,10 +124,9 @@ def _filter_options(_options):
             options = received_options
 
         options = _filter_options(options)
-        task_header = task_keeper.task_headers.get(task_id)
 
-        if task_header:
-            options.set(size=task_header.resource_size)
+        if size and options:
+            options.set(size=size)
Contributor:
@badb there seems to be something not quite right here... unless I'm misreading.

What I mean is that previously, the size here had always been set to the resource size specified in the task header.

Right now, the only call to this method that specifies the size uses ReportComputedTask.size, which is not the resource size but rather the result size... is that indeed what we need here? Am I wrong?

Contributor Author:
The size option can also already be specified in options (that should be the case for TaskToCompute; check line 233 in taskkeeper).
This option is used to estimate how long it may take to download resources. With ReportComputedTask we want to download results, so their size should be used. I guess the use of resource_size in this place was just a heuristic (we assumed that the result size wouldn't differ significantly from the resource size, which of course doesn't have to be true).

Contributor:
👍

         return options
 
     def get_share_options(self, task_id: str,  # noqa  # pylint: disable=unused-argument
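To make the thread above concrete, here is a minimal sketch of the new get_download_options contract: the caller now passes the transfer size explicitly (e.g. ReportComputedTask.size when downloading results) instead of the method reading resource_size off the task header. The stub class and the sizes below are illustrative assumptions, not the project's code:

from typing import Optional

class StubClientOptions:
    # Stand-in for HyperdriveClientOptions; only set() matters for this sketch.
    def __init__(self) -> None:
        self.options: dict = {}

    def set(self, **kwargs) -> None:
        self.options.update(kwargs)

def get_download_options(received_options: Optional[StubClientOptions],
                         size: Optional[int] = None) -> Optional[StubClientOptions]:
    options = received_options  # option filtering omitted for brevity
    if size and options:
        options.set(size=size)  # the size feeds the download-time estimate
    return options

# Downloading computed results: use the size the provider reported.
result_size = 7 * 1024 * 1024  # e.g. ReportComputedTask.size, in bytes
opts = get_download_options(StubClientOptions(), size=result_size)
assert opts is not None and opts.options['size'] == result_size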
8 changes: 7 additions & 1 deletion golem/task/taskkeeper.py
@@ -63,7 +63,7 @@ def __init__(self, header: dt_tasks.TaskHeader) -> None:
         self.subtasks: dict = {}
         # TODO Add concent communication timeout. Issue #2406
         self.keeping_deadline = comp_task_info_keeping_timeout(
-            self.header.subtask_timeout, self.header.resource_size)
+            self.header.subtask_timeout, 0)
 
     def __repr__(self):
         return "<CompTaskInfo(%r) reqs: %r>" % (
@@ -225,8 +225,14 @@ def receive_subtask(self, task_to_compute: message.tasks.TaskToCompute):
 
         comp_task_info.requests -= 1
         comp_task_info.subtasks[subtask_id] = comp_task_def
+        header = self.get_task_header(task_id)
+        comp_task_info.keeping_deadline = comp_task_info_keeping_timeout(
+            header.subtask_timeout, task_to_compute.size)
 
         self.subtask_to_task[subtask_id] = task_id
+        if task_to_compute.resources_options:
+            task_to_compute.resources_options['options']['size'] = \
+                task_to_compute.size
+        self.resources_options[subtask_id] = task_to_compute.resources_options
         self.dump()
         return True
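As a hedged illustration of the size propagation above — the dict shape mirrors the test changes further down, while the client_id/version values are placeholders rather than the library's real constants:

# receive_subtask() now injects TaskToCompute.size into the stored
# resources options, so later downloads can estimate transfer time.
resources_options = {
    'client_id': 'hyperg',  # placeholder for HyperdriveClient.CLIENT_ID
    'version': 1.0,         # placeholder for HyperdriveClient.VERSION
    'options': {},
}
task_to_compute_size = 1024  # bytes advertised in TaskToCompute.size
resources_options['options']['size'] = task_to_compute_size
assert resources_options['options']['size'] == 1024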
5 changes: 0 additions & 5 deletions golem/task/taskserver.py
@@ -714,11 +714,6 @@ def should_accept_provider(  # noqa pylint: disable=too-many-arguments,too-many-
                         f' < {min_accepted_perf}; {ids}')
             return False
 
-        if task.header.resource_size > (int(max_resource_size) * 1024):
-            logger.info('insufficient provider disk size: '
-                        f'{max_resource_size} KiB; {ids}')
-            return False
-
         if task.header.estimated_memory > (int(max_memory_size) * 1024):
             logger.info('insufficient provider memory size: '
                         f'{max_memory_size} KiB; {ids}')
14 changes: 14 additions & 0 deletions golem/task/tasksession.py
@@ -697,6 +697,11 @@ def _cannot_compute(reason):
             _cannot_compute(reasons.ConcentDisabled)
             return
 
+        if not self._check_resource_size(msg.size):
+            # We don't have enough disk space available
+            _cannot_compute(reasons.ResourcesTooBig)
+            return
+
         number_of_subtasks = self.task_server.task_keeper\
             .task_headers[msg.task_id]\
             .subtasks_count
@@ -744,6 +749,15 @@ def _cannot_compute(reason):
             return
         _cannot_compute(self.err_msg)
 
+    def _check_resource_size(self, resource_size):
+        max_resource_size_kib = self.task_server.config_desc.max_resource_size
+        max_resource_size = int(max_resource_size_kib) * 1024
+        if resource_size > max_resource_size:
+            logger.info('Subtask with too big resources received: '
+                        f'{resource_size}, only {max_resource_size} available')
+            return False
+        return True
+
     def _react_to_waiting_for_results(
             self,
             msg: message.tasks.WaitingForResults,
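A small self-contained sketch of the disk-space check added above, assuming — as the * 1024 in the diff implies — that config_desc.max_resource_size is expressed in KiB while TaskToCompute.size is in bytes:

def check_resource_size(resource_size_bytes: int,
                        max_resource_size_kib: int) -> bool:
    # Return True when the advertised resources fit the configured cap.
    max_resource_size_bytes = int(max_resource_size_kib) * 1024
    return resource_size_bytes <= max_resource_size_bytes

# 512 KiB of resources fit under a 1 MiB (1024 KiB) cap...
assert check_resource_size(512 * 1024, 1024)
# ...but 2 MiB exceed it, so the provider refuses the subtask.
assert not check_resource_size(2 * 1024 * 1024, 1024)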
2 changes: 1 addition & 1 deletion requirements.txt
@@ -33,7 +33,7 @@ eth-keys==0.2.0b3
 eth-tester==0.1.0-beta.24
 eth-utils==1.0.3
 ethereum==1.6.1
-Golem-Messages==3.2.0
+Golem-Messages==3.3.0
 Golem-Smart-Contracts-Interface==1.6.1
 greenlet==0.4.15
 h2==3.0.1
2 changes: 1 addition & 1 deletion requirements_to-freeze.txt
@@ -15,7 +15,7 @@ docker==3.5.0
 enforce==0.3.4
 eth-utils==1.0.3
 ethereum==1.6.1
-Golem-Messages==3.2.0
+Golem-Messages==3.3.0
 Golem-Smart-Contracts-Interface==1.6.1
 html2text==2018.1.9
 humanize==0.5.1
1 change: 0 additions & 1 deletion tests/golem/docker/test_docker_blender_task.py
@@ -106,7 +106,6 @@ def test_build(self):
         assert isinstance(task.header.task_owner, dt_p2p.Node)
         assert task.header.subtask_timeout == 1200
         assert task.header.task_owner.node_name == 'some_node'
-        assert task.header.resource_size > 0
         assert task.header.environment == 'BLENDER'
         assert task.header.estimated_memory == 0
         assert task.docker_images[0].repository == 'golemfactory/blender'
1 change: 0 additions & 1 deletion tests/golem/task/dummy/task.py
@@ -82,7 +82,6 @@ def __init__(self, client_id, params, num_subtasks, public_key):
             deadline=timeout_to_deadline(14400),
             subtask_timeout=1200,
             subtasks_count=num_subtasks,
-            resource_size=params.shared_data_size + params.subtask_data_size,
             estimated_memory=0,
             max_price=MIN_PRICE,
             min_version=golem.__version__,
2 changes: 2 additions & 0 deletions tests/golem/task/test_concent_logic.py
@@ -46,6 +46,8 @@ def setUp(self):
         self.task_session.task_computer.has_assigned_task.return_value = False
         self.task_session.task_server.keys_auth.ecc.raw_pubkey = \
             self.keys.raw_pubkey
+        self.task_session.task_server.config_desc.max_resource_size = \
+            1024 * 1024 * 1024 * 100
         self.task_session.task_server.task_keeper\
             .task_headers[self.msg.task_id]\
             .subtasks_count = 10
21 changes: 12 additions & 9 deletions tests/golem/task/test_taskkeeper.py
@@ -19,8 +19,7 @@
 from golem.environments.environment import Environment, UnsupportReason,\
     SupportStatus
 from golem.environments.environmentsmanager import EnvironmentsManager
-from golem.network.hyperdrive.client import HyperdriveClient, \
-    HyperdriveClientOptions
+from golem.network.hyperdrive.client import HyperdriveClient
 from golem.task import taskkeeper
 from golem.task.taskkeeper import TaskHeaderKeeper, CompTaskKeeper, logger
 from golem.testutils import PEP8MixIn
@@ -432,7 +431,6 @@ def get_dict_task_header(key_id_seed="kkk"):
         "subtasks_count": 1,
         "max_price": 10,
         "min_version": golem.__version__,
-        "resource_size": 0,
         "estimated_memory": 0,
         'mask': Mask().to_bytes(),
         'timestamp': 0,
@@ -464,7 +462,6 @@ def _dump_some_tasks(self, tasks_dir):
         header = get_task_header()
         header.deadline = timeout_to_deadline(1)
         header.subtask_timeout = 3
-        header.resource_size = 1
 
         test_headers.append(header)
         price_bid = int(random.random() * 100)
@@ -480,11 +477,16 @@ def _dump_some_tasks(self, tasks_dir):
             price_bid,
             header.subtask_timeout,
         )
-        ttc = msg_factories.tasks.TaskToComputeFactory(price=price)
+        ttc = msg_factories.tasks.TaskToComputeFactory(
+            price=price,
+            size=1024
+        )
         ttc.compute_task_def = ctd
-        ttc.resources_options = HyperdriveClientOptions(
-            HyperdriveClient.CLIENT_ID,
-            HyperdriveClient.VERSION)
+        ttc.resources_options = {
+            'client_id': HyperdriveClient.CLIENT_ID,
+            'version': HyperdriveClient.VERSION,
+            'options': {}
+        }
         self.assertTrue(ctk.receive_subtask(ttc))
         test_subtasks_ids.append(ctd['subtask_id'])
         del ctk
@@ -687,4 +689,5 @@ def test_resources_options(self):
         assert ctk.get_resources_options("unknown") is None
         subtask_id = random.choice(list(ctk.subtask_to_task.keys()))
         res = ctk.get_resources_options(subtask_id)
-        assert isinstance(res, HyperdriveClientOptions)
+        assert isinstance(res, dict)
+        assert res['client_id'] == HyperdriveClient.CLIENT_ID
17 changes: 3 additions & 14 deletions tests/golem/task/test_taskserver.py
@@ -54,7 +54,6 @@ def get_example_task_header(key_id: str) -> dt_tasks.TaskHeader:
            pub_port=40103,
            pub_addr='1.2.3.4',
        ),
-        resource_size=2 * 1024,
         estimated_memory=3 * 1024,
     )
 
@@ -585,14 +584,6 @@ def _assert_log_msg(logger_mock, msg):
                 f'INFO:{logger.name}:insufficient provider performance: '
                 f'27.18 < {min_accepted_perf}; {ids}')
 
-        with self.assertLogs(logger, level='INFO') as cm:
-            assert not ts.should_accept_provider(
-                node_id, node_name, task_id, 99, 1.72, 1)
-            _assert_log_msg(
-                cm,
-                f'INFO:{logger.name}:insufficient provider disk size:'
-                f' 1.72 KiB; {ids}')
-
         with self.assertLogs(logger, level='INFO') as cm:
             assert not ts.should_accept_provider(
                 node_id, node_name, task_id, 999, 3, 2.7)
@@ -718,7 +709,7 @@ def test_download_options(self, *_):
         options = HyperdriveClientOptions(HyperdriveClient.CLIENT_ID,
                                           HyperdriveClient.VERSION)
 
-        client_options = ts.get_download_options(options, task_id='task_id')
+        client_options = ts.get_download_options(options)
         assert client_options.peers is None
 
         peers = [
@@ -733,11 +724,12 @@ def test_download_options(self, *_):
             HyperdriveClient.VERSION,
             options=dict(peers=peers))
 
-        client_options = ts.get_download_options(options, task_id='task_id')
+        client_options = ts.get_download_options(options, size=1024)
         assert client_options.options.get('peers') == [
             to_hyperg_peer('127.0.0.1', 3282),
             to_hyperg_peer('1.2.3.4', 3282),
         ]
+        assert client_options.options.get('size') == 1024
 
     def test_download_options_errors(self, *_):
         built_options = Mock()
@@ -746,17 +738,14 @@ def test_download_options_errors(self, *_):
 
         assert self.ts.get_download_options(
             received_options=None,
-            task_id='task_id'
         ) is built_options
 
         assert self.ts.get_download_options(
             received_options={'options': {'peers': ['Invalid']}},
-            task_id='task_id'
         ) is built_options
 
         assert self.ts.get_download_options(
             received_options=Mock(filtered=Mock(side_effect=Exception)),
-            task_id='task_id'
         ) is built_options
 
     def test_pause_and_resume(self, *_):