Description
When a job with a unique_key is fetched by a worker and is in the running state, pushing another job with the same unique_key resets the original job's state back to pending. This behavior allows multiple workers to pick up and execute the same job instance concurrently.
This race condition leads to unexpected behavior and ultimately causes a KeyError in the process executor when multiple workers try to update the status of the same completed job. The intended purpose of unique_key—to prevent duplicate job execution for tasks already in progress—is not being met in this scenario.
Steps to Reproduce
- Define a long-running job (e.g., one that includes asyncio.sleep(5)).
- Start at least one chancy worker.
- Push the long-running job with a specific unique_key.
- While the first job is being executed by a worker (i.e., its state is running), push the exact same job with the same unique_key again.
- Observe the worker logs to see the same job ID being pulled and executed a second time.
import asyncio
import logging

from chancy import Chancy, Queue, QueuedJob, Worker, job
from chancy.app import setup_default_logger


@job()
async def long_running_job(*, job_context: QueuedJob):
    print(f"Long running job started: {job_context.id}")
    await asyncio.sleep(5)
    print(f"Long running job completed: {job_context.id}")


async def main():
    async with Chancy(
        "postgresql://postgres:postgres@localhost:5432/postgres",
        log=setup_default_logger(logging.DEBUG),
    ) as chancy:
        await chancy.migrate()
        async with Worker(chancy) as worker:
            await chancy.declare(Queue("default"))

            # Push the job, then push it again with the same unique_key while
            # the first instance is still running.
            await chancy.push(long_running_job.job.with_unique_key("unique_long_running_job"))
            await asyncio.sleep(1)
            await chancy.push(long_running_job.job.with_unique_key("unique_long_running_job"))
            await asyncio.sleep(1)

            await worker.wait_for_shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Expected Behavior
When the second chancy.push call is made, it should be ignored because a job with the same unique_key is already in the running state. The job should be executed only once, from start to finish.
Actual Behavior
The running job's state is reset to pending, causing another worker to fetch and execute the same job ID. This leads to concurrent execution and ultimately terminates with a KeyError in the process executor.
Logs & Traceback
[2025-08-15 17:04:30][DEBUG] Announcing worker to the cluster.
[2025-08-15 17:04:30][INFO] Started listening for realtime notifications.
[2025-08-15 17:04:30][DEBUG] Polling for queue changes.
[2025-08-15 17:04:30][DEBUG] Polling for queue changes.
[2025-08-15 17:04:30][INFO] Pulled '03e5bdc6-4fff-9527-91a5-ce8b9cb22196' ('__main__.long_running_job') for queue 'default'
Long running job started: 03e5bdc6-4fff-9527-91a5-ce8b9cb22196
[2025-08-15 17:04:31][INFO] Pulled '03e5bdc6-4fff-9527-91a5-ce8b9cb22196' ('__main__.long_running_job') for queue 'default'
Long running job started: 03e5bdc6-4fff-9527-91a5-ce8b9cb22196
Long running job completed: 03e5bdc6-4fff-9527-91a5-ce8b9cb22196
[2025-08-15 17:04:36][DEBUG] Processing 1 outgoing updates.
Long running job completed: 03e5bdc6-4fff-9527-91a5-ce8b9cb22196
[2025-08-15 17:04:36][DEBUG] Job 03e5bdc6-4fff-9527-91a5-ce8b9cb22196 (__main__.long_running_job({}) failed with an exception
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.13/concurrent/futures/process.py", line 254, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/home/mrsshr/repos/chancy-test/.venv/lib/python3.13/site-packages/chancy/executors/process.py", line 204, in job_wrapper
pids_for_job.pop(job.id)
~~~~~~~~~~~~~~~~^^^^^^^^
File "<string>", line 2, in pop
File "/usr/lib/python3.13/multiprocessing/managers.py", line 847, in _callmethod
raise convert_to_error(kind, result)
KeyError: '03e5bdc6-4fff-9527-91a5-ce8b9cb22196'
"""
The above exception was the direct cause of the following exception:
KeyError: '03e5bdc6-4fff-9527-91a5-ce8b9cb22196'
[2025-08-15 17:04:37][DEBUG] Processing 1 outgoing updates.
[2025-08-15 17:04:40][DEBUG] Polling for queue changes.
-[ RECORD 1 ]+-------------------------------------
id           | 03e5bdc6-4fff-9527-91a5-ce8b9cb22196
queue        | default
func         | __main__.long_running_job
kwargs       | {}
limits       | []
meta         | {}
state        | failed
priority     | 0
attempts     | 1
max_attempts | 1
taken_by     | bba3da6a-5677-477f-b015-280e692137cc
created_at   | 2025-08-15 08:04:30.149132+00
started_at   | 2025-08-15 08:04:31.172894+00
completed_at | 2025-08-15 08:04:36.294421+00
scheduled_at | 2025-08-15 08:04:29.986917+00
unique_key   | unique_long_running_job
errors       | [{"attempt": 0, "traceback": "concurrent.futures.process._RemoteTraceback: \n\"\"\"\nTraceback (most recent call last):\n File \"/usr/lib/python3.13/concurrent/futures/process.py\", line 254, in _process_worker\n r = call_item.fn(*call_item.args, **call_item.kwargs)\n File \"/home/mrsshr/repos/chancy-test/.venv/lib/python3.13/site-packages/chancy/executors/process.py\", line 204, in job_wrapper\n pids_for_job.pop(job.id)\n ~~~~~~~~~~~~~~~~^^^^^^^^\n File \"<string>\", line 2, in pop\n File \"/usr/lib/python3.13/multiprocessing/managers.py\", line 847, in _callmethod\n raise convert_to_error(kind, result)\nKeyError: '03e5bdc6-4fff-9527-91a5-ce8b9cb22196'\n\"\"\"\n\nThe above exception was the direct cause of the following exception:\n\nKeyError: '03e5bdc6-4fff-9527-91a5-ce8b9cb22196'\n"}]
Analysis of Potential Cause
The issue appears to stem from the DO UPDATE clause in the SQL query used for pushing jobs.
ON CONFLICT (unique_key)
WHERE
unique_key IS NOT NULL
AND state NOT IN ('succeeded', 'failed')
DO UPDATE
SET
state = EXCLUDED.state
RETURNING id;

The existing job is still in the running state, so it satisfies the WHERE unique_key IS NOT NULL AND state NOT IN ('succeeded', 'failed') condition. The second push therefore conflicts with that row, and the DO UPDATE executes, resetting its state to EXCLUDED.state (pending), which re-queues it for another worker to pick up.
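To make the failure mode concrete, here is a minimal, self-contained sketch in plain SQL. The jobs table and the partial unique index are illustrative stand-ins, not chancy's actual DDL; something of this shape is assumed to exist for ON CONFLICT (unique_key) WHERE ... to be valid.

-- Illustrative schema (assumed): a jobs table with a partial unique index on
-- unique_key covering jobs that have not yet finished.
CREATE TABLE jobs (
    id         serial PRIMARY KEY,
    state      text NOT NULL,
    unique_key text
);

CREATE UNIQUE INDEX jobs_unique_key_idx
    ON jobs (unique_key)
    WHERE unique_key IS NOT NULL
      AND state NOT IN ('succeeded', 'failed');

-- A worker has already picked the job up.
INSERT INTO jobs (state, unique_key)
VALUES ('running', 'unique_long_running_job');

-- The second push: the new 'pending' row conflicts with the running row, and
-- DO UPDATE resets that row's state to the EXCLUDED ('pending') value.
INSERT INTO jobs (state, unique_key)
VALUES ('pending', 'unique_long_running_job')
ON CONFLICT (unique_key)
WHERE unique_key IS NOT NULL
  AND state NOT IN ('succeeded', 'failed')
DO UPDATE
SET state = EXCLUDED.state
RETURNING id;

-- The previously running job is now 'pending' again and eligible to be fetched.
SELECT id, state, unique_key FROM jobs;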
Possible Solution
To still return the id while preventing the state reset, change the statement as follows:
ON CONFLICT (unique_key)
WHERE
unique_key IS NOT NULL
AND state NOT IN ('succeeded', 'failed')
DO UPDATE
SET
state = {jobs}.state -- No-op: keep the existing row's state
RETURNING id;
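Continuing the illustrative SQL sketch above (same assumed jobs table and index), the no-op update leaves the running row untouched while RETURNING still yields its id:

-- Second push against the sketch schema, using the proposed no-op update.
INSERT INTO jobs (state, unique_key)
VALUES ('pending', 'unique_long_running_job')
ON CONFLICT (unique_key)
WHERE unique_key IS NOT NULL
  AND state NOT IN ('succeeded', 'failed')
DO UPDATE
SET state = jobs.state  -- keep whatever state the existing row already has
RETURNING id;

-- The job stays in 'running', so no other worker re-fetches it.
SELECT id, state, unique_key FROM jobs;

A no-op DO UPDATE is presumably preferable to DO NOTHING here because, with DO NOTHING, RETURNING produces no row on conflict and the caller would not get the existing job's id back.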