Skip to content

Commit

Permalink
Adjust header to allow more efficient writing of task status #991
Browse files (browse the repository at this point in the history)
  • Loading branch information
Bo Peng committed Jul 23, 2018
1 parent abaad89 commit a540ac4
Showing 1 changed file with 24 additions and 46 deletions.
70 changes: 24 additions & 46 deletions src/sos/tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -159,13 +159,15 @@ class TaskFile(object):
7. compressed pickled signatures
'''
TaskHeader = namedtuple('TaskHeader',
'version status tags '
'new_time pending_time submitted_time running_time aborted_time failed_time completed_time last_modified '
'params_size pulse_size stdout_size stderr_size result_size signature_size'
'version status last_modified '
'new_time pending_time submitted_time running_time aborted_time failed_time completed_time '
'params_size pulse_size stdout_size stderr_size result_size signature_size '
'tags'
)

header_fmt = '!2h 128s 8d 6i'
header_size = struct.calcsize(header_fmt)
header_fmt = '!2h 8d 6i 128s'
header_size = 220 # struct.calcsize(header_fmt)
tags_offset = 92 # struct.calcsize(status_fmt + '6i')

def __init__(self, task_id: str):
self.task_file = os.path.join(
Expand All @@ -186,21 +188,22 @@ def save(self, params):
header = self.TaskHeader(
version=1,
status=TaskStatus.new.value,
tags=' '.join(sorted(tags)).ljust(128).encode(),
last_modified=now,
new_time=now,
pending_time=0,
running_time=0,
submitted_time=0,
aborted_time=0,
failed_time=0,
completed_time=0,
last_modified=now,
params_size=len(params_block),
pulse_size=0,
stdout_size=0,
stderr_size=0,
result_size=0,
signature_size=0
signature_size=0,
tags=' '.join(sorted(tags)).ljust(128).encode(),

)
with open(self.task_file, 'wb+') as fh:
self._write_header(fh, header)
Expand All @@ -213,14 +216,14 @@ def update(self, params):
now = time.time()
header = header._replace(
status=TaskStatus.pending.value,
last_modified=now,
new_time=now,
pending_time=0,
submitted_time=0,
running_time=0,
aborted_time=0,
failed_time=0,
completed_time=0,
last_modified=now,
params_size=len(params_block),
pulse_size=0,
stdout_size=0,
Expand All @@ -238,14 +241,14 @@ def _reset(self, fh):
now = time.time()
header = header._replace(
status=TaskStatus.new.value,
last_modified=now,
new_time=now,
pending_time=0,
submitted_time=0,
running_time=0,
aborted_time=0,
failed_time=0,
completed_time=0,
last_modified=now,
pulse_size=0,
stdout_size=0,
stderr_size=0,
Expand Down Expand Up @@ -362,58 +365,33 @@ def _get_status(self):

def _set_status(self, status):
with open(self.task_file, 'r+b') as fh:
header = self._read_header(fh)
fh.seek(2, 0)
now = time.time()
if status == 'new':
header = header._replace(
status=TaskStatus[status].value,
new_time=now, last_modified=now)
elif status == 'pending':
header = header._replace(
status=TaskStatus[status].value,
pending_time=now, last_modified=now)
elif status == 'submitted':
header = header._replace(
status=TaskStatus[status].value,
submitted_time=now, last_modified=now)
elif status == 'running':
header = header._replace(
status=TaskStatus[status].value,
running_time=now, last_modified=now)
elif status == 'failed':
header = header._replace(
status=TaskStatus[status].value,
failed_time=now, last_modified=now)
elif status == 'aborted':
header = header._replace(
status=TaskStatus[status].value,
aborted_time=now, last_modified=now)
elif status == 'completed':
header = header._replace(
status=TaskStatus[status].value,
completed_time=now, last_modified=now)
else:
raise RuntimeError(f'Unrecognized task status: {status}')
self._write_header(fh, header)
sts = TaskStatus[status].value
# update status and last modified
fh.write(struct.pack('!hd', sts, now))
# from the current location, move by status
fh.seek(sts * 4, 1)
fh.write(struct.pack('!d', now))

status = property(_get_status, _set_status)

def _get_tags(self):
with open(self.task_file, 'rb') as fh:
fh.seek(4, 0)
fh.seek(self.tags_offset, 0)
return fh.read(128).decode().strip()

def _set_tags(self, tags: list):
with open(self.task_file, 'r+b') as fh:
fh.seek(4, 0)
fh.seek(self.tags_offset, 0)
fh.write(' '.join(
sorted(tags)).ljust(128).encode())

def add_tags(self, tags: list):
with open(self.task_file, 'r+b') as fh:
fh.seek(4, 0)
fh.seek(self.tags_offset, 0)
existing_tags = fh.read(128).decode().strip()
fh.seek(4, 0)
fh.seek(self.tags_offset, 0)
fh.write(' '.join(
sorted(tags + existing_tags.split())).ljust(128).encode())

Expand Down

0 comments on commit a540ac4

Please sign in to comment.