diff --git a/chancy/executors/asyncex.py b/chancy/executors/asyncex.py index ec4c33b..fbb0b2a 100644 --- a/chancy/executors/asyncex.py +++ b/chancy/executors/asyncex.py @@ -39,7 +39,13 @@ async def push(self, job: QueuedJob): job = await self.on_job_starting(job) task = asyncio.create_task(self._job_wrapper(job)) self.jobs[task] = job - task.add_done_callback(self.jobs.pop) + task.add_done_callback(self._on_task_done) + + def _on_task_done(self, task): + if self.free_slots == 0: + # Executor was exactly full + self.worker.queue_wake_events[self.queue.name].set() + self.jobs.pop(task) def __len__(self): return len(self.jobs) diff --git a/chancy/worker.py b/chancy/worker.py index 5677d8f..88779c0 100644 --- a/chancy/worker.py +++ b/chancy/worker.py @@ -357,10 +357,6 @@ async def _maintain_queue(self, queue: Queue): try: self._executors[queue.name] = executor - concurrency = queue.concurrency - if queue.concurrency is None: - concurrency = executor.get_default_concurrency() - await self.hub.emit( "worker.queue.started", { @@ -428,8 +424,8 @@ async def _maintain_queue(self, queue: Queue): # Otherwise, we can fetch jobs from the queue until # the executor is full. else: - maximum_jobs_to_poll = concurrency - len(executor) - if maximum_jobs_to_poll <= 0: + free_slots = executor.free_slots + if free_slots <= 0: await self.hub.emit( "worker.queue.full", { @@ -442,7 +438,9 @@ async def _maintain_queue(self, queue: Queue): async with self.chancy.pool.connection() as conn: jobs = await self.fetch_jobs( - queue, conn, up_to=maximum_jobs_to_poll + queue, + conn, + up_to=free_slots, ) for job in jobs: @@ -452,11 +450,6 @@ async def _maintain_queue(self, queue: Queue): ) await executor.push(job) - # If we pulled exactly the maximum number of jobs, - # there's likely more jobs available, so we set the - # event to skip the next sleep. - if len(jobs) == maximum_jobs_to_poll: - self.queue_wake_events[queue.name].set() finally: self._executors.pop(queue.name, None)