From a540ac40d4d4c5b67423d15d69a6d3627695954d Mon Sep 17 00:00:00 2001 From: Bo Peng Date: Mon, 23 Jul 2018 15:25:52 -0500 Subject: [PATCH] Adjust header to allow more efficient writing of task status #991 --- src/sos/tasks.py | 70 +++++++++++++++++------------------------------- 1 file changed, 24 insertions(+), 46 deletions(-) diff --git a/src/sos/tasks.py b/src/sos/tasks.py index d7fba5dc6..b85531825 100644 --- a/src/sos/tasks.py +++ b/src/sos/tasks.py @@ -159,13 +159,15 @@ class TaskFile(object): 7. compressed pickled signatures ''' TaskHeader = namedtuple('TaskHeader', - 'version status tags ' - 'new_time pending_time submitted_time running_time aborted_time failed_time completed_time last_modified ' - 'params_size pulse_size stdout_size stderr_size result_size signature_size' + 'version status last_modified ' + 'new_time pending_time submitted_time running_time aborted_time failed_time completed_time ' + 'params_size pulse_size stdout_size stderr_size result_size signature_size ' + 'tags' ) - header_fmt = '!2h 128s 8d 6i' - header_size = struct.calcsize(header_fmt) + header_fmt = '!2h 8d 6i 128s' + header_size = 220 # struct.calcsize(header_fmt) + tags_offset = 92 # struct.calcsize(status_fmt + '6i') def __init__(self, task_id: str): self.task_file = os.path.join( @@ -186,7 +188,7 @@ def save(self, params): header = self.TaskHeader( version=1, status=TaskStatus.new.value, - tags=' '.join(sorted(tags)).ljust(128).encode(), + last_modified=now, new_time=now, pending_time=0, running_time=0, @@ -194,13 +196,14 @@ def save(self, params): aborted_time=0, failed_time=0, completed_time=0, - last_modified=now, params_size=len(params_block), pulse_size=0, stdout_size=0, stderr_size=0, result_size=0, - signature_size=0 + signature_size=0, + tags=' '.join(sorted(tags)).ljust(128).encode(), + ) with open(self.task_file, 'wb+') as fh: self._write_header(fh, header) @@ -213,6 +216,7 @@ def update(self, params): now = time.time() header = header._replace( status=TaskStatus.pending.value, + last_modified=now, new_time=now, pending_time=0, submitted_time=0, @@ -220,7 +224,6 @@ def update(self, params): aborted_time=0, failed_time=0, completed_time=0, - last_modified=now, params_size=len(params_block), pulse_size=0, stdout_size=0, @@ -238,6 +241,7 @@ def _reset(self, fh): now = time.time() header = header._replace( status=TaskStatus.new.value, + last_modified=now, new_time=now, pending_time=0, submitted_time=0, @@ -245,7 +249,6 @@ def _reset(self, fh): aborted_time=0, failed_time=0, completed_time=0, - last_modified=now, pulse_size=0, stdout_size=0, stderr_size=0, @@ -362,58 +365,33 @@ def _get_status(self): def _set_status(self, status): with open(self.task_file, 'r+b') as fh: - header = self._read_header(fh) + fh.seek(2, 0) now = time.time() - if status == 'new': - header = header._replace( - status=TaskStatus[status].value, - new_time=now, last_modified=now) - elif status == 'pending': - header = header._replace( - status=TaskStatus[status].value, - pending_time=now, last_modified=now) - elif status == 'submitted': - header = header._replace( - status=TaskStatus[status].value, - submitted_time=now, last_modified=now) - elif status == 'running': - header = header._replace( - status=TaskStatus[status].value, - running_time=now, last_modified=now) - elif status == 'failed': - header = header._replace( - status=TaskStatus[status].value, - failed_time=now, last_modified=now) - elif status == 'aborted': - header = header._replace( - status=TaskStatus[status].value, - aborted_time=now, last_modified=now) - elif status == 'completed': - header = header._replace( - status=TaskStatus[status].value, - completed_time=now, last_modified=now) - else: - raise RuntimeError(f'Unrecognized task status: {status}') - self._write_header(fh, header) + sts = TaskStatus[status].value + # update status and last modified + fh.write(struct.pack('!hd', sts, now)) + # from the current location, move by status + fh.seek(sts * 4, 1) + fh.write(struct.pack('!d', now)) status = property(_get_status, _set_status) def _get_tags(self): with open(self.task_file, 'rb') as fh: - fh.seek(4, 0) + fh.seek(self.tags_offset, 0) return fh.read(128).decode().strip() def _set_tags(self, tags: list): with open(self.task_file, 'r+b') as fh: - fh.seek(4, 0) + fh.seek(self.tags_offset, 0) fh.write(' '.join( sorted(tags)).ljust(128).encode()) def add_tags(self, tags: list): with open(self.task_file, 'r+b') as fh: - fh.seek(4, 0) + fh.seek(self.tags_offset, 0) existing_tags = fh.read(128).decode().strip() - fh.seek(4, 0) + fh.seek(self.tags_offset, 0) fh.write(' '.join( sorted(tags + existing_tags.split())).ljust(128).encode())