From ab73798cf327949970edb7f989fee34b50aba7ac Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 30 Aug 2024 19:15:07 -0400 Subject: [PATCH] Fast path for unique insertion Updates the Python client to be compatible with the fast unique insertion added to the main River library in [1], which uses a unique index instead of an advisory lock + fetch as long as uniqueness is constrained to the default set of unique job states. Also, more incidentally than by design, upgrades to sqlc v1.27.0. [1] https://github.com/riverqueue/river/pull/451 --- .github/workflows/ci.yaml | 2 +- CHANGELOG.md | 4 + docs/development.md | 2 +- src/riverqueue/client.py | 141 ++++++++------ src/riverqueue/driver/driver_protocol.py | 10 + .../driver/riversqlalchemy/dbsqlc/models.py | 3 +- .../driver/riversqlalchemy/dbsqlc/pg_misc.py | 2 +- .../riversqlalchemy/dbsqlc/river_job.py | 180 +++++++++++++++++- .../riversqlalchemy/dbsqlc/river_job.sql | 41 +++- .../riversqlalchemy/sql_alchemy_driver.py | 29 ++- src/riverqueue/job.py | 7 + tests/client_test.py | 25 +-- .../riversqlalchemy/sqlalchemy_driver_test.py | 150 ++++++++++++++- 13 files changed, 503 insertions(+), 93 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 89560a2..3135280 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -142,7 +142,7 @@ jobs: env: BIN_PATH: /home/runner/bin - SQLC_VERSION: 1.26.0 + SQLC_VERSION: 1.27.0 steps: - name: Checkout diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d6d419..4f11b92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Now compatible with "fast path" unique job insertion that uses a unique index instead of an advisory lock and fetch, [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #XXX](https://github.com/riverqueue/riverqueue-python/pull/XXX).
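For context, a minimal sketch of how callers land on each path (`SimpleArgs` is a hypothetical JobArgs implementation; `InsertOpts` and `UniqueOpts` are the client's real types). Unique opts constrained to the default set of job states now insert in a single round trip against the unique index, while a custom `by_state` still takes the advisory lock and fetch slow path:

    client.insert(SimpleArgs(), insert_opts=InsertOpts(
        unique_opts=UniqueOpts(by_args=True, by_period=900),  # fast path: default states
    ))

    client.insert(SimpleArgs(), insert_opts=InsertOpts(
        unique_opts=UniqueOpts(by_state=["available", "running"]),  # slow path: custom states
    ))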
+ ## [0.6.3] - 2024-07-08 ### Fixed diff --git a/docs/development.md b/docs/development.md index a8bf526..e673bff 100644 --- a/docs/development.md +++ b/docs/development.md @@ -51,7 +51,7 @@ $ rye lint ## Run type check (Mypy) ```shell -$ make typecheck +$ make type-check ``` ## Format code diff --git a/src/riverqueue/client.py b/src/riverqueue/client.py index f6fc040..0f0b773 100644 --- a/src/riverqueue/client.py +++ b/src/riverqueue/client.py @@ -1,13 +1,12 @@ from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta +from hashlib import sha256 import re from typing import ( - Awaitable, Optional, Protocol, Tuple, List, - Callable, cast, runtime_checkable, ) @@ -206,12 +205,7 @@ def to_json(self) -> str: insert_opts = InsertOpts() insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - async def insert(): - return InsertResult(await exec.job_insert(insert_params)) - - return await self.__check_unique_job( - exec, insert_params, unique_opts, insert - ) + return await self.__insert_job_with_unique(exec, insert_params, unique_opts) async def insert_tx( self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -253,10 +247,7 @@ async def insert_tx( insert_opts = InsertOpts() insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - async def insert(): - return InsertResult(await exec.job_insert(insert_params)) - - return await self.__check_unique_job(exec, insert_params, unique_opts, insert) + return await self.__insert_job_with_unique(exec, insert_params, unique_opts) async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: """ @@ -327,33 +318,50 @@ async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> in exec = self.driver.unwrap_executor(tx) return await exec.job_insert_many(_make_driver_insert_params_many(args)) - async def __check_unique_job( + async def __insert_job_with_unique( self, exec: AsyncExecutorProtocol, insert_params: JobInsertParams, unique_opts: Optional[UniqueOpts], - insert_func: Callable[[], Awaitable[InsertResult]], ) -> InsertResult: """ - Checks for an existing unique job and runs `insert_func` if one is - present. + Inserts a job, accounting for unique jobs whose insertion may be skipped + if an equivalent job is already present. 
""" - get_params, lock_key = _build_unique_get_params_and_lock_key( - self.advisory_lock_prefix, insert_params, unique_opts + get_params, unique_key = _build_unique_get_params_and_unique_key( + insert_params, unique_opts ) - if not get_params: - return await insert_func() + if not get_params or not unique_opts: + return InsertResult(await exec.job_insert(insert_params)) + + # fast path + if ( + not unique_opts.by_state + or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT + ): + job, unique_skipped_as_duplicate = await exec.job_insert_unique( + insert_params, sha256(unique_key.encode("utf-8")).digest() + ) + return InsertResult( + job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate + ) async with exec.transaction(): - await exec.advisory_lock(lock_key) + lock_key = "unique_key" + lock_key += "kind=#{insert_params.kind}" + lock_key += unique_key + + await exec.advisory_lock( + _hash_lock_key(self.advisory_lock_prefix, lock_key) + ) existing_job = await exec.job_get_by_kind_and_unique_properties(get_params) if existing_job: return InsertResult(existing_job, unique_skipped_as_duplicated=True) - return await insert_func() + return InsertResult(await exec.job_insert(insert_params)) class Client: @@ -451,10 +459,7 @@ def to_json(self) -> str: insert_opts = InsertOpts() insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - def insert(): - return InsertResult(exec.job_insert(insert_params)) - - return self.__check_unique_job(exec, insert_params, unique_opts, insert) + return self.__insert_job_with_unique(exec, insert_params, unique_opts) def insert_tx( self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -496,10 +501,7 @@ def insert_tx( insert_opts = InsertOpts() insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - def insert(): - return InsertResult(exec.job_insert(insert_params)) - - return self.__check_unique_job(exec, insert_params, unique_opts, insert) + return self.__insert_job_with_unique(exec, insert_params, unique_opts) def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: """ @@ -570,58 +572,72 @@ def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: exec = self.driver.unwrap_executor(tx) return exec.job_insert_many(_make_driver_insert_params_many(args)) - def __check_unique_job( + def __insert_job_with_unique( self, exec: ExecutorProtocol, insert_params: JobInsertParams, unique_opts: Optional[UniqueOpts], - insert_func: Callable[[], InsertResult], ) -> InsertResult: """ - Checks for an existing unique job and runs `insert_func` if one is - present. + Inserts a job, accounting for unique jobs whose insertion may be skipped + if an equivalent job is already present. 
""" - get_params, lock_key = _build_unique_get_params_and_lock_key( - self.advisory_lock_prefix, insert_params, unique_opts + get_params, unique_key = _build_unique_get_params_and_unique_key( + insert_params, unique_opts ) - if not get_params: - return insert_func() + if not get_params or not unique_opts: + return InsertResult(exec.job_insert(insert_params)) + + # fast path + if ( + not unique_opts.by_state + or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT + ): + job, unique_skipped_as_duplicate = exec.job_insert_unique( + insert_params, sha256(unique_key.encode("utf-8")).digest() + ) + return InsertResult( + job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate + ) with exec.transaction(): - exec.advisory_lock(lock_key) + lock_key = "unique_key" + lock_key += "kind=#{insert_params.kind}" + lock_key += unique_key + + exec.advisory_lock(_hash_lock_key(self.advisory_lock_prefix, lock_key)) existing_job = exec.job_get_by_kind_and_unique_properties(get_params) if existing_job: return InsertResult(existing_job, unique_skipped_as_duplicated=True) - return insert_func() + return InsertResult(exec.job_insert(insert_params)) -def _build_unique_get_params_and_lock_key( - advisory_lock_prefix: Optional[int], +def _build_unique_get_params_and_unique_key( insert_params: JobInsertParams, unique_opts: Optional[UniqueOpts], -) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], int]: +) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], str]: """ Builds driver get params and an advisory lock key from insert params and unique options for use during a unique job insertion. """ if unique_opts is None: - return (None, 0) + return (None, "") any_unique_opts = False get_params = JobGetByKindAndUniquePropertiesParam(kind=insert_params.kind) - lock_str = f"unique_keykind={insert_params.kind}" + unique_key = "" if unique_opts.by_args: any_unique_opts = True get_params.by_args = True get_params.args = insert_params.args - lock_str += f"&args={insert_params.args}" + unique_key += f"&args={insert_params.args}" if unique_opts.by_period: lower_period_bound = _truncate_time( @@ -634,33 +650,27 @@ def _build_unique_get_params_and_lock_key( lower_period_bound, lower_period_bound + timedelta(seconds=unique_opts.by_period), ] - lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}" + unique_key += f"&period={lower_period_bound.strftime('%FT%TZ')}" if unique_opts.by_queue: any_unique_opts = True get_params.by_queue = True get_params.queue = insert_params.queue - lock_str += f"&queue={insert_params.queue}" + unique_key += f"&queue={insert_params.queue}" if unique_opts.by_state: any_unique_opts = True get_params.by_state = True get_params.state = cast(list[str], unique_opts.by_state) - lock_str += f"&state={','.join(unique_opts.by_state)}" + unique_key += f"&state={','.join(unique_opts.by_state)}" else: get_params.state = UNIQUE_STATES_DEFAULT - lock_str += f"&state={','.join(UNIQUE_STATES_DEFAULT)}" + unique_key += f"&state={','.join(UNIQUE_STATES_DEFAULT)}" if not any_unique_opts: - return (None, 0) + return (None, "") - if advisory_lock_prefix is None: - lock_key = fnv1_hash(lock_str.encode("utf-8"), 64) - else: - prefix = advisory_lock_prefix - lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32) - - return (get_params, _uint64_to_int64(lock_key)) + return (get_params, unique_key) def _check_advisory_lock_prefix_bounds( @@ -678,6 +688,21 @@ def _check_advisory_lock_prefix_bounds( return advisory_lock_prefix +def _hash_lock_key(advisory_lock_prefix: Optional[int], 
lock_key: str) -> int: + """ + Generates an FNV-1 hash from the given lock key string suitable for use with + a PG advisory lock while checking for the existence of a unique job. + """ + + if advisory_lock_prefix is None: + lock_key_hash = fnv1_hash(lock_key.encode("utf-8"), 64) + else: + prefix = advisory_lock_prefix + lock_key_hash = (prefix << 32) | fnv1_hash(lock_key.encode("utf-8"), 32) + + return _uint64_to_int64(lock_key_hash) + + def _make_driver_insert_params( args: JobArgs, insert_opts: InsertOpts, diff --git a/src/riverqueue/driver/driver_protocol.py b/src/riverqueue/driver/driver_protocol.py index 0cc9c86..0d5c50a 100644 --- a/src/riverqueue/driver/driver_protocol.py +++ b/src/riverqueue/driver/driver_protocol.py @@ -64,6 +64,11 @@ async def job_insert(self, insert_params: JobInsertParams) -> Job: async def job_insert_many(self, all_params) -> int: pass + async def job_insert_unique( + self, insert_params: JobInsertParams, unique_key: bytes + ) -> tuple[Job, bool]: + pass + async def job_get_by_kind_and_unique_properties( self, get_params: JobGetByKindAndUniquePropertiesParam ) -> Optional[Job]: @@ -137,6 +142,11 @@ def job_insert(self, insert_params: JobInsertParams) -> Job: def job_insert_many(self, all_params) -> int: pass + def job_insert_unique( + self, insert_params: JobInsertParams, unique_key: bytes + ) -> tuple[Job, bool]: + pass + def job_get_by_kind_and_unique_properties( self, get_params: JobGetByKindAndUniquePropertiesParam ) -> Optional[Job]: diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py index a611a41..843c8e4 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py @@ -1,6 +1,6 @@ # Code generated by sqlc. DO NOT EDIT. # versions: -# sqlc v1.26.0 +# sqlc v1.27.0 import dataclasses import datetime import enum @@ -36,3 +36,4 @@ class RiverJob: state: RiverJobState scheduled_at: datetime.datetime tags: List[str] + unique_key: Optional[memoryview] diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py index fc6cc2c..2518389 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py @@ -1,6 +1,6 @@ # Code generated by sqlc. DO NOT EDIT. # versions: -# sqlc v1.26.0 +# sqlc v1.27.0 # source: pg_misc.sql from typing import Any diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py index 5265e66..9c1ebeb 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py @@ -1,6 +1,6 @@ # Code generated by sqlc. DO NOT EDIT. 
# versions: -# sqlc v1.26.0 +# sqlc v1.27.0 # source: river_job.sql import dataclasses import datetime @@ -13,20 +13,20 @@ JOB_GET_ALL = """-- name: job_get_all \\:many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job """ JOB_GET_BY_ID = """-- name: job_get_by_id \\:one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = :p1 """ JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES = """-- name: job_get_by_kind_and_unique_properties \\:one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = :p1 AND CASE WHEN :p2\\:\\:boolean THEN args = :p3 ELSE true END @@ -75,7 +75,7 @@ class JobGetByKindAndUniquePropertiesParams: coalesce(:p9\\:\\:timestamptz, now()), :p10\\:\\:river_job_state, coalesce(:p11\\:\\:varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key """ @@ -149,7 +149,8 @@ class JobInsertFastManyParams: queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( :p1\\:\\:jsonb, coalesce(:p2\\:\\:smallint, 0), @@ -164,8 +165,9 @@ class JobInsertFastManyParams: :p11\\:\\:text, coalesce(:p12\\:\\:timestamptz, now()), :p13\\:\\:river_job_state, - coalesce(:p14\\:\\:varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + coalesce(:p14\\:\\:varchar(255)[], '{}'), + :p15 +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key """ @@ -185,6 +187,80 @@ class JobInsertFullParams: scheduled_at: Optional[datetime.datetime] state: models.RiverJobState tags: List[str] + unique_key: Optional[memoryview] + + +JOB_INSERT_UNIQUE = """-- name: job_insert_unique \\:one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + :p1, + coalesce(:p2\\:\\:timestamptz, now()), + :p3, + :p4, + :p5, + coalesce(:p6\\:\\:jsonb, '{}'), + :p7, + :p8, + coalesce(:p9\\:\\:timestamptz, now()), + :p10, + coalesce(:p11\\:\\:varchar(255)[], '{}'), + :p12 +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. 
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +""" + + +@dataclasses.dataclass() +class JobInsertUniqueParams: + args: Any + created_at: Optional[datetime.datetime] + finalized_at: Optional[datetime.datetime] + kind: str + max_attempts: int + metadata: Any + priority: int + queue: str + scheduled_at: Optional[datetime.datetime] + state: models.RiverJobState + tags: List[str] + unique_key: Optional[memoryview] + + +@dataclasses.dataclass() +class JobInsertUniqueRow: + id: int + args: Any + attempt: int + attempted_at: Optional[datetime.datetime] + attempted_by: Optional[List[str]] + created_at: datetime.datetime + errors: Optional[List[Any]] + finalized_at: Optional[datetime.datetime] + kind: str + max_attempts: int + metadata: Any + priority: int + queue: str + state: models.RiverJobState + scheduled_at: datetime.datetime + tags: List[str] + unique_key: Optional[memoryview] + unique_skipped_as_duplicate: bool class Querier: @@ -211,6 +287,7 @@ def job_get_all(self) -> Iterator[models.RiverJob]: state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: @@ -234,6 +311,7 @@ def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: @@ -268,6 +346,7 @@ def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniqueProper state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.RiverJob]: @@ -303,6 +382,7 @@ def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.RiverJob] state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: @@ -335,6 +415,7 @@ def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob] "p12": arg.scheduled_at, "p13": arg.state, "p14": arg.tags, + "p15": arg.unique_key, }).first() if row is None: return None @@ -355,6 +436,45 @@ def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob] state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], + ) + + def job_insert_unique(self, arg: JobInsertUniqueParams) -> Optional[JobInsertUniqueRow]: + row = self._conn.execute(sqlalchemy.text(JOB_INSERT_UNIQUE), { + "p1": arg.args, + "p2": arg.created_at, + "p3": arg.finalized_at, + "p4": arg.kind, + "p5": arg.max_attempts, + "p6": arg.metadata, + "p7": arg.priority, + "p8": arg.queue, + "p9": arg.scheduled_at, + "p10": arg.state, + "p11": arg.tags, + "p12": arg.unique_key, + }).first() + if row is None: + return None + return JobInsertUniqueRow( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + unique_key=row[16], + unique_skipped_as_duplicate=row[17], ) @@ -382,6 +502,7 @@ async def job_get_all(self) -> AsyncIterator[models.RiverJob]: state=row[13], scheduled_at=row[14], tags=row[15], + 
unique_key=row[16], ) async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: @@ -405,6 +526,7 @@ async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: @@ -439,6 +561,7 @@ async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUnique state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) async def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.RiverJob]: @@ -474,6 +597,7 @@ async def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.Riv state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], ) async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: @@ -506,6 +630,7 @@ async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.Riv "p12": arg.scheduled_at, "p13": arg.state, "p14": arg.tags, + "p15": arg.unique_key, })).first() if row is None: return None @@ -526,4 +651,43 @@ async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.Riv state=row[13], scheduled_at=row[14], tags=row[15], + unique_key=row[16], + ) + + async def job_insert_unique(self, arg: JobInsertUniqueParams) -> Optional[JobInsertUniqueRow]: + row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_UNIQUE), { + "p1": arg.args, + "p2": arg.created_at, + "p3": arg.finalized_at, + "p4": arg.kind, + "p5": arg.max_attempts, + "p6": arg.metadata, + "p7": arg.priority, + "p8": arg.queue, + "p9": arg.scheduled_at, + "p10": arg.state, + "p11": arg.tags, + "p12": arg.unique_key, + })).first() + if row is None: + return None + return JobInsertUniqueRow( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + unique_key=row[16], + unique_skipped_as_duplicate=row[17], ) diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql index 55b1788..ca62c3f 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql @@ -26,6 +26,8 @@ CREATE TABLE river_job( state river_job_state NOT NULL DEFAULT 'available' ::river_job_state, scheduled_at timestamptz NOT NULL DEFAULT NOW(), tags varchar(255)[] NOT NULL DEFAULT '{}' ::varchar(255)[], + unique_key bytea, + CONSTRAINT finalized_or_finalized_at_null CHECK ( (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) @@ -80,6 +82,39 @@ INSERT INTO river_job( coalesce(@tags::varchar(255)[], '{}') ) RETURNING *; +-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + @args, + coalesce(sqlc.narg('created_at')::timestamptz, now()), + @finalized_at, + @kind, + @max_attempts, + coalesce(@metadata::jsonb, '{}'), + @priority, + @queue, + coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), + @state, + coalesce(@tags::varchar(255)[], '{}'), + @unique_key +) +ON CONFLICT (kind, 
unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. + DO UPDATE SET kind = EXCLUDED.kind +RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate; + -- name: JobInsertFastMany :execrows INSERT INTO river_job( args, @@ -120,7 +155,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( @args::jsonb, coalesce(@attempt::smallint, 0), @@ -135,5 +171,6 @@ INSERT INTO river_job( @queue::text, coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), @state::river_job_state, - coalesce(@tags::varchar(255)[], '{}') + coalesce(@tags::varchar(255)[], '{}'), + @unique_key ) RETURNING *; \ No newline at end of file diff --git a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py index 870ab37..5c16eb6 100644 --- a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py +++ b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py @@ -50,6 +50,19 @@ async def job_insert_many(self, all_params: list[JobInsertParams]) -> int: ) return len(all_params) + async def job_insert_unique( + self, insert_params: JobInsertParams, unique_key: bytes + ) -> tuple[Job, bool]: + insert_unique_params = cast(river_job.JobInsertUniqueParams, insert_params) + insert_unique_params.unique_key = memoryview(unique_key) + + res = cast( # drop Optional[] because insert always returns a row + river_job.JobInsertUniqueRow, + await self.job_querier.job_insert_unique(insert_unique_params), + ) + + return job_from_row(res), res.unique_skipped_as_duplicate + async def job_get_by_kind_and_unique_properties( self, get_params: JobGetByKindAndUniquePropertiesParam ) -> Optional[Job]: @@ -115,6 +128,19 @@ def job_insert_many(self, all_params: list[JobInsertParams]) -> int: self.job_querier.job_insert_fast_many(_build_insert_many_params(all_params)) return len(all_params) + def job_insert_unique( + self, insert_params: JobInsertParams, unique_key: bytes + ) -> tuple[Job, bool]: + insert_unique_params = cast(river_job.JobInsertUniqueParams, insert_params) + insert_unique_params.unique_key = memoryview(unique_key) + + res = cast( # drop Optional[] because insert always returns a row + river_job.JobInsertUniqueRow, + self.job_querier.job_insert_unique(insert_unique_params), + ) + + return job_from_row(res), res.unique_skipped_as_duplicate + def job_get_by_kind_and_unique_properties( self, get_params: JobGetByKindAndUniquePropertiesParam ) -> Optional[Job]: @@ -186,7 +212,7 @@ def _build_insert_many_params( return insert_many_params -def job_from_row(row: models.RiverJob) -> Job: +def job_from_row(row: models.RiverJob | river_job.JobInsertUniqueRow) -> Job: """ Converts an internal sqlc generated row to the top level type, issuing a few minor transformations along the way. Timestamps are changed from local @@ -214,4 +240,5 @@ def to_utc(t: datetime) -> datetime: scheduled_at=to_utc(row.scheduled_at), state=cast(JobState, row.state), tags=row.tags, + unique_key=cast(Optional[bytes], row.unique_key), ) diff --git a/src/riverqueue/job.py b/src/riverqueue/job.py index ca58209..d448bc0 100644 --- a/src/riverqueue/job.py +++ b/src/riverqueue/job.py @@ -192,6 +192,13 @@ class Job: help group and categorize jobs. """ + unique_key: Optional[bytes] + """ + A unique key for the job within its kind that's used for unique job + insertions. It's generated by hashing an inserted job's unique opts + configuration. 
+ """ + @dataclass class AttemptError: diff --git a/tests/client_test.py b/tests/client_test.py index 705f980..b8e9723 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -159,16 +159,16 @@ def to_json() -> str: def test_insert_with_unique_opts_by_args(client, mock_exec, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + # fast path + mock_exec.job_insert_unique.return_value = ("job_row", False) insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_unique.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_unique.call_args[0][0] assert call_args.kind == "simple" @@ -180,38 +180,39 @@ def test_insert_with_unique_opts_by_period( insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900)) - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + # fast path + mock_exec.job_insert_unique.return_value = ("job_row", False) insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_unique.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_unique.call_args[0][0] assert call_args.kind == "simple" def test_insert_with_unique_opts_by_queue(client, mock_exec, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True)) - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + # fast path + mock_exec.job_insert_unique.return_value = ("job_row", False) insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_unique.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_unique.call_args[0][0] assert call_args.kind == "simple" def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_state=["available", "running"])) + # slow path mock_exec.job_get_by_kind_and_unique_properties.return_value = None mock_exec.job_insert.return_value = "job_row" diff --git a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py index eefe9c2..90a3680 100644 --- a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py +++ b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py @@ -106,6 +106,7 @@ async def test_insert_job_from_row(self, client, simple_args, test_tx): scheduled_at=datetime.now(), state=JobState.COMPLETED, tags=[], + unique_key=b"unique_key", ) ) @@ -122,6 +123,7 @@ async def test_insert_job_from_row(self, client, simple_args, test_tx): ] assert job.finalized_at.tzinfo == timezone.utc assert job.metadata == dict(foo="metadata") + assert job.unique_key == b"unique_key" # # tests below this line should match what are in the sync client tests below @@ -159,10 +161,14 @@ async def test_insert_with_opts(self, client, simple_args): @pytest.mark.asyncio 
async def test_insert_with_unique_opts_by_args(self, client, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) + insert_res = await client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated @patch("datetime.datetime") @pytest.mark.asyncio @@ -174,27 +180,90 @@ async def test_insert_with_unique_opts_by_period( ) insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900)) + insert_res = await client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated @pytest.mark.asyncio async def test_insert_with_unique_opts_by_queue(self, client, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True)) + insert_res = await client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated @pytest.mark.asyncio async def test_insert_with_unique_opts_by_state(self, client, simple_args): insert_opts = InsertOpts( unique_opts=UniqueOpts(by_state=[JobState.AVAILABLE, JobState.RUNNING]) ) + + insert_res = await client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated + + @patch("datetime.datetime") + @pytest.mark.asyncio + async def test_insert_with_unique_opts_all_fast_path( + self, mock_datetime, client, simple_args + ): + mock_datetime.now.return_value = datetime( + 2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc + ) + + insert_opts = InsertOpts( + unique_opts=UniqueOpts(by_args=True, by_period=900, by_queue=True) + ) + + insert_res = await client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated + + @patch("datetime.datetime") + @pytest.mark.asyncio + async def test_insert_with_unique_opts_all_slow_path( + self, mock_datetime, client, simple_args + ): + mock_datetime.now.return_value = datetime( + 2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc + ) + + insert_opts = InsertOpts( + unique_opts=UniqueOpts( + by_args=True, + by_period=900, + by_queue=True, + by_state=[ + JobState.AVAILABLE, + JobState.RUNNING, + ], # non-default states activate slow path + ) + ) + insert_res = await client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = await client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert 
insert_res2.unique_skipped_as_duplicated @pytest.mark.asyncio async def test_insert_many_with_only_args(self, client, simple_args): @@ -272,10 +341,14 @@ def test_insert_with_unique_opts_by_args(self, client, simple_args): print("self", self) print("client", client) insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) + insert_res = client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated @patch("datetime.datetime") def test_insert_with_unique_opts_by_period( @@ -286,25 +359,86 @@ def test_insert_with_unique_opts_by_period( ) insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900)) + insert_res = client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated def test_insert_with_unique_opts_by_queue(self, client, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True)) + insert_res = client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated def test_insert_with_unique_opts_by_state(self, client, simple_args): insert_opts = InsertOpts( unique_opts=UniqueOpts(by_state=[JobState.AVAILABLE, JobState.RUNNING]) ) + insert_res = client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated + + @patch("datetime.datetime") + def test_insert_with_unique_opts_all_fast_path( + self, mock_datetime, client, simple_args + ): + mock_datetime.now.return_value = datetime( + 2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc + ) + + insert_opts = InsertOpts( + unique_opts=UniqueOpts(by_args=True, by_period=900, by_queue=True) + ) + + insert_res = client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) + assert insert_res2.job == insert_res.job + assert insert_res2.unique_skipped_as_duplicated + + @patch("datetime.datetime") + def test_insert_with_unique_opts_all_slow_path( + self, mock_datetime, client, simple_args + ): + mock_datetime.now.return_value = datetime( + 2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc + ) + + insert_opts = InsertOpts( + unique_opts=UniqueOpts( + by_args=True, + by_period=900, + by_queue=True, + by_state=[ + JobState.AVAILABLE, + JobState.RUNNING, + ], # non-default states activate slow path + ) + ) + + insert_res = client.insert(simple_args, insert_opts=insert_opts) + assert insert_res.job + assert not insert_res.unique_skipped_as_duplicated + insert_res2 = client.insert(simple_args, insert_opts=insert_opts) - assert insert_res.job == insert_res2.job + assert insert_res2.job == insert_res.job 
+ assert insert_res2.unique_skipped_as_duplicated def test_insert_many_with_only_args(self, client, simple_args): num_inserted = client.insert_many([simple_args])
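A closing note on the key derivation these tests exercise: the sketch below is a standalone approximation, not the project's own helpers. The `fnv1_hash` here is a minimal FNV-1 reimplementation and the key string values are made up; the real client additionally converts the advisory lock key to a signed int64 via `_uint64_to_int64`, and when an `advisory_lock_prefix` is configured it instead combines the prefix with a 32-bit hash as `(prefix << 32) | fnv1_hash(lock_key, 32)`.

    from hashlib import sha256

    def fnv1_hash(data: bytes, size: int) -> int:
        # FNV-1: start from the offset basis, then for each byte multiply by
        # the FNV prime and XOR in the byte, truncating to `size` bits.
        prime, h = (
            (1099511628211, 14695981039346656037)
            if size == 64
            else (16777619, 2166136261)
        )
        for byte in data:
            h = ((h * prime) & ((1 << size) - 1)) ^ byte
        return h

    unique_key = "&queue=default"  # made-up "&opt=value" key string

    # Fast path: SHA-256 digest stored to river_job.unique_key (bytea), with
    # duplicates caught by the (kind, unique_key) unique index.
    fast_path_key = sha256(unique_key.encode("utf-8")).digest()

    # Slow path: prefix with "unique_key" and the job kind, then FNV-1 hash
    # into an integer suitable for a PG advisory lock.
    lock_key = "unique_key" + "kind=simple" + unique_key
    advisory_lock_key = fnv1_hash(lock_key.encode("utf-8"), 64)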