Skip to content

Commit

Permalink
use new unique jobs implementation
Browse files Browse the repository at this point in the history
This moves the library to use the new unique jobs implementation from
riverqueue/river#590 and migrates the sqlalchemy
driver to use a unified insertion path, allowing bulk inserts to use
unique jobs.
  • Loading branch information
bgentry committed Jan 29, 2025
1 parent 580a39c commit 31a2cd1
Show file tree
Hide file tree
Showing 20 changed files with 530 additions and 1,605 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$TEST_DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$TEST_DATABASE_URL"

- name: Test
run: rye test
Expand Down Expand Up @@ -109,7 +109,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$DATABASE_URL"

- name: Run examples
run: rye run python3 -m examples.all
Expand Down
14 changes: 2 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class JobArgs(Protocol):
pass
```

* `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.
- `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
- `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

They may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

Expand Down Expand Up @@ -95,16 +95,6 @@ insert_res.job
insert_res.unique_skipped_as_duplicated
```

### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```python
client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
```

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.

## Inserting jobs in bulk

Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
asyncpg==0.29.0
Expand Down
1 change: 1 addition & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
sqlalchemy==2.0.30
Expand Down
Loading

0 comments on commit 31a2cd1

Please sign in to comment.