Skip to content

Commit

Permalink
[REF] queue_job: Use set.discard(...) instead of SafeSet
Browse files Browse the repository at this point in the history
  • Loading branch information
florentx committed Dec 16, 2024
1 parent cf2ff24 commit a22f22c
Showing 1 changed file with 10 additions and 30 deletions.
40 changes: 10 additions & 30 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ def __contains__(self, o):
def add(self, o):
if o is None:
raise ValueError()
if o in self._removed:
self._removed.remove(o)
self._removed.discard(o)
if o in self._known:
return
self._known.add(o)
Expand All @@ -87,8 +86,7 @@ def remove(self, o):
raise ValueError()
if o not in self._known:
return
if o not in self._removed:
self._removed.add(o)
self._removed.add(o)

def pop(self):
while True:
Expand All @@ -104,24 +102,6 @@ def pop(self):
return o


class SafeSet(set):
"""A set that does not raise KeyError when removing non-existent items.
>>> s = SafeSet()
>>> s.remove(1)
>>> len(s)
0
>>> s.remove(1)
"""

def remove(self, o):
# pylint: disable=missing-return,except-pass
try:
super().remove(o)
except KeyError:
pass


@total_ordering
class ChannelJob(object):
"""A channel job is attached to a channel and holds the properties of a
Expand Down Expand Up @@ -408,8 +388,8 @@ def __init__(self, name, parent, capacity=None, sequential=False, throttle=0):
self.parent.children[name] = self
self.children = {}
self._queue = ChannelQueue()
self._running = SafeSet()
self._failed = SafeSet()
self._running = set()
self._failed = set()
self._pause_until = 0 # utc seconds since the epoch
self.capacity = capacity
self.throttle = throttle # seconds
Expand Down Expand Up @@ -463,8 +443,8 @@ def __str__(self):
def remove(self, job):
"""Remove a job from the channel."""
self._queue.remove(job)
self._running.remove(job)
self._failed.remove(job)
self._running.discard(job)
self._failed.discard(job)
if self.parent:
self.parent.remove(job)

Expand All @@ -484,8 +464,8 @@ def set_pending(self, job):
"""
if job not in self._queue:
self._queue.add(job)
self._running.remove(job)
self._failed.remove(job)
self._running.discard(job)
self._failed.discard(job)
if self.parent:
self.parent.remove(job)
_logger.debug("job %s marked pending in channel %s", job.uuid, self)
Expand All @@ -498,7 +478,7 @@ def set_running(self, job):
if job not in self._running:
self._queue.remove(job)
self._running.add(job)
self._failed.remove(job)
self._failed.discard(job)

Check warning on line 481 in queue_job/jobrunner/channels.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/channels.py#L481

Added line #L481 was not covered by tests
if self.parent:
self.parent.set_running(job)
_logger.debug("job %s marked running in channel %s", job.uuid, self)
Expand All @@ -507,7 +487,7 @@ def set_failed(self, job):
"""Mark the job as failed."""
if job not in self._failed:
self._queue.remove(job)
self._running.remove(job)
self._running.discard(job)
self._failed.add(job)
if self.parent:
self.parent.remove(job)
Expand Down

0 comments on commit a22f22c

Please sign in to comment.