Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Use res_id instead task_id in ResourceManager (#3942)
Browse files Browse the repository at this point in the history
* res_id instead of task_id in Resource

* task_id to res_id in ResourceCached

* res_id in resource_manager

* res_id in resource_server

* client and taskcomputer resource related messages should not use task
  • Loading branch information
badb authored Mar 13, 2019
1 parent ee3b978 commit c5dc191
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 202 deletions.
11 changes: 4 additions & 7 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,14 +611,11 @@ def quit(self):
if self.db:
self.db.close()

def task_resource_send(self, task_id):
self.task_server.task_manager.resources_send(task_id)
def resource_collected(self, res_id):
self.task_server.task_computer.resource_collected(res_id)

def task_resource_collected(self, task_id):
self.task_server.task_computer.task_resource_collected(task_id)

def task_resource_failure(self, task_id, reason):
self.task_server.task_computer.task_resource_failure(task_id, reason)
def resource_failure(self, res_id, reason):
self.task_server.task_computer.resource_failure(res_id, reason)

@rpc_utils.expose('comp.tasks.check.abort')
def abort_test_task(self) -> bool:
Expand Down
80 changes: 40 additions & 40 deletions golem/resource/base/resourceserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class TransferStatus(Enum):

class PendingResource(object):

def __init__(self, resource, task_id, client_options, status):
def __init__(self, resource, res_id, client_options, status):
self.resource = resource
self.task_id = task_id
self.res_id = res_id
self.client_options = client_options
self.status = status

Expand Down Expand Up @@ -59,69 +59,69 @@ def get_distributed_resource_root(self):
def sync_network(self):
self._download_resources()

def add_task(self, pkg_path, pkg_sha1, task_id, pkg_size, # noqa pylint:disable=too-many-arguments
def add_resources(self, pkg_path, pkg_sha1, res_id, pkg_size, # noqa pylint:disable=too-many-arguments
client_options=None) -> Deferred:
_result = Deferred()
_result.addErrback(self._add_task_error)
_result.addErrback(self._add_res_error)

def callback(r):
value = r, pkg_path, pkg_sha1, pkg_size
_result.callback(value)

_deferred = self.resource_manager.add_task(
[pkg_path], task_id, client_options=client_options)
_deferred = self.resource_manager.add_resources(
[pkg_path], res_id, client_options=client_options)
_deferred.addCallback(callback)
_deferred.addErrback(_result.errback)

return _result

def create_resource_package(self, files, task_id) -> Deferred:
resource_dir = self.resource_manager.storage.get_dir(task_id)
package_path = os.path.join(resource_dir, task_id)
def create_resource_package(self, files, res_id) -> Deferred:
resource_dir = self.resource_manager.storage.get_dir(res_id)
package_path = os.path.join(resource_dir, res_id)
request = golem_async.AsyncRequest(
self.packager.create,
package_path, files,
)
return golem_async.async_run(request)

@staticmethod
def _add_task_error(error):
logger.error("Resource server: add_task error: %r", error)
def _add_res_error(error):
logger.error("Resource server: add_resources error: %r", error)
return error # continue with the errback chain

def remove_task(self, task_id):
self.resource_manager.remove_task(task_id)
def remove_resources(self, res_id):
self.resource_manager.remove_resources(res_id)

def download_resources(self, resources, task_id, client_options=None):
def download_resources(self, resources, res_id, client_options=None):
with self._lock:
for resource in resources:
self._add_pending_resource(resource, task_id, client_options)
self._add_pending_resource(resource, res_id, client_options)

collected = not self.pending_resources.get(task_id)
collected = not self.pending_resources.get(res_id)

if collected:
self.client.task_resource_collected(task_id)
self.client.resource_collected(res_id)

def _add_pending_resource(self, resource, task_id, client_options):
if task_id not in self.pending_resources:
self.pending_resources[task_id] = []
def _add_pending_resource(self, resource, res_id, client_options):
if res_id not in self.pending_resources:
self.pending_resources[res_id] = []

self.pending_resources[task_id].append(PendingResource(
resource, task_id, client_options, TransferStatus.idle
self.pending_resources[res_id].append(PendingResource(
resource, res_id, client_options, TransferStatus.idle
))

def _remove_pending_resource(self, resource, task_id):
def _remove_pending_resource(self, resource, res_id):
with self._lock:
pending_resources = self.pending_resources.get(task_id, [])
pending_resources = self.pending_resources.get(res_id, [])

for i, pending_resource in enumerate(pending_resources):
if pending_resource.resource == resource:
pending_resources.pop(i)
break

if not pending_resources:
self.pending_resources.pop(task_id, None)
return task_id
self.pending_resources.pop(res_id, None)
return res_id

def _download_resources(self, async_=True):
download_statuses = [TransferStatus.idle, TransferStatus.failed]
Expand All @@ -135,31 +135,31 @@ def _download_resources(self, async_=True):
entry.status = TransferStatus.transferring

self.resource_manager.pull_resource(
entry.resource, entry.task_id,
entry.resource, entry.res_id,
client_options=entry.client_options,
success=self._download_success,
error=self._download_error,
async_=async_
)

def _download_success(self, resource, _, task_id):
def _download_success(self, resource, _, res_id):
if not resource:
self._download_error("Downloaded an empty resource package",
resource, task_id)
resource, res_id)
return

if not self._remove_pending_resource(resource, task_id):
logger.warning("Resources for task %r were re-downloaded", task_id)
if not self._remove_pending_resource(resource, res_id):
logger.warning("Resources for id %r were re-downloaded", res_id)
return

self._extract_task_resources(resource, task_id)
self._extract_resources(resource, res_id)

def _download_error(self, error, resource, task_id):
self._remove_pending_resource(resource, task_id)
self.client.task_resource_failure(task_id, error)
def _download_error(self, error, resource, res_id):
self._remove_pending_resource(resource, res_id)
self.client.resource_failure(res_id, error)

def _extract_task_resources(self, resource, task_id):
resource_dir = self.resource_manager.storage.get_dir(task_id)
def _extract_resources(self, resource, res_id):
resource_dir = self.resource_manager.storage.get_dir(res_id)
ctk = self.client.task_server.task_manager.comp_task_keeper

def extract_packages(package_files):
Expand All @@ -170,12 +170,12 @@ def extract_packages(package_files):
logger.info('Extracting task resource: %r', package_path)
self.packager.extract(package_path, resource_dir)

ctk.add_package_paths(task_id, package_paths)
ctk.add_package_paths(res_id, package_paths)

async_req = golem_async.AsyncRequest(extract_packages, resource[1])
golem_async.async_run(async_req).addCallbacks(
lambda _: self.client.task_resource_collected(task_id),
lambda e: self._download_error(e, resource, task_id)
lambda _: self.client.resource_collected(res_id),
lambda e: self._download_error(e, resource, res_id)
)

def start_accepting(self):
Expand Down
68 changes: 34 additions & 34 deletions golem/resource/hyperdrive/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ class ResourceError(RuntimeError):

class Resource:

__slots__ = ('hash', 'files', 'path', 'task_id')
__slots__ = ('hash', 'files', 'path', 'res_id')

def __init__(self, resource_hash, task_id=None, files=None, path=None):
def __init__(self, resource_hash, res_id=None, files=None, path=None):
self.hash = resource_hash
self.task_id = task_id
self.res_id = res_id
self.files = files
self.path = path

def __eq__(self, other):
return other and \
self.task_id == other.task_id and \
self.res_id == other.res_id and \
self.hash == other.hash and \
self.path == other.path and \
self.files == other.files

def __str__(self):
return 'Resource(hash: {}, task: {})'.format(self.hash, self.task_id)
return 'Resource(hash: {}, id: {})'.format(self.hash, self.res_id)

def __repr__(self):
return str(self)
Expand Down Expand Up @@ -73,18 +73,18 @@ def __init__(self):
self._hash_to_res = dict()
# path to resource
self._path_to_res = dict()
# task to resources
self._task_to_res = dict()
# task to resource common prefix
self._task_to_prefix = dict()
# id to resources
self._id_to_res = dict()
# id to resource common prefix
self._id_to_prefix = dict()

def add_resource(self, resource):
task_id = resource.task_id
res_id = resource.res_id

with self._lock:
resource_list = self._task_to_res.get(task_id)
resource_list = self._id_to_res.get(res_id)
if not resource_list:
self._task_to_res[task_id] = resource_list = list()
self._id_to_res[res_id] = resource_list = list()
resource_list.append(resource)

self._hash_to_res[resource.hash] = resource
Expand All @@ -97,34 +97,34 @@ def get_by_path(self, resource_path, default=None):
return self._path_to_res.get(resource_path, default)

def has_resource(self, resource):
if resource.task_id and resource.task_id not in self._task_to_res:
if resource.res_id and resource.res_id not in self._id_to_res:
return False
if resource.hash and resource.hash not in self._hash_to_res:
return False
return resource.path in self._path_to_res

def get_resources(self, task_id, default=None):
return self._task_to_res.get(task_id, default or [])
def get_resources(self, res_id, default=None):
return self._id_to_res.get(res_id, default or [])

def set_prefix(self, task_id, prefix):
self._task_to_prefix[task_id] = norm_path(prefix)
def set_prefix(self, res_id, prefix):
self._id_to_prefix[res_id] = norm_path(prefix)

def get_prefix(self, task_id, default=''):
return self._task_to_prefix.get(task_id, default)
def get_prefix(self, res_id, default=''):
return self._id_to_prefix.get(res_id, default)

def remove(self, task_id):
resources = self._task_to_res.pop(task_id, [])
def remove(self, res_id):
resources = self._id_to_res.pop(res_id, [])
for r in resources:
self._hash_to_res.pop(r.hash, None)
self._path_to_res.pop(r.path, None)
self._task_to_prefix.pop(task_id, None)
self._id_to_prefix.pop(res_id, None)
return resources

def clear(self):
self._hash_to_res = dict()
self._path_to_res = dict()
self._task_to_res = dict()
self._task_to_prefix = dict()
self._id_to_res = dict()
self._id_to_prefix = dict()


class ResourceStorage(object):
Expand All @@ -134,25 +134,25 @@ def __init__(self, dir_manager, resource_dir_method):
self.resource_dir_method = resource_dir_method
self.cache = ResourceCache()

def get_dir(self, task_id):
return norm_path(self.resource_dir_method(task_id))
def get_dir(self, res_id):
return norm_path(self.resource_dir_method(res_id))

def get_path(self, relative_file_path, task_id):
resource_dir = self.get_dir(task_id)
def get_path(self, relative_file_path, res_id):
resource_dir = self.get_dir(res_id)
return os.path.join(resource_dir, norm_path(relative_file_path))

def get_root(self):
return self.dir_manager.get_node_dir()

def get_resources(self, task_id) -> List[Resource]:
return self.cache.get_resources(task_id)
def get_resources(self, res_id) -> List[Resource]:
return self.cache.get_resources(res_id)

def exists(self, resource):
return self.cache.has_resource(resource) and resource.exists

def relative_path(self, path, task_id):
def relative_path(self, path, res_id):
path = norm_path(path)
common_prefix = self.cache.get_prefix(task_id)
common_prefix = self.cache.get_prefix(res_id)
return relative_path(path, common_prefix)

def copy_dir(self, src_dir):
Expand All @@ -164,10 +164,10 @@ def copy_dir(self, src_dir):
copy_file_tree(src_dir, root_dir)
return True

def copy(self, src_path, dst_relative_path, task_id):
def copy(self, src_path, dst_relative_path, res_id):

dst_relative_path = norm_path(dst_relative_path)
dst_path = self.get_path(dst_relative_path, task_id)
dst_path = self.get_path(dst_relative_path, res_id)
src_path = norm_path(src_path)

if os.path.isfile(dst_path):
Expand Down
Loading

0 comments on commit c5dc191

Please sign in to comment.