diff --git a/.env b/.env
new file mode 100644
index 0000000..8e24cbf
--- /dev/null
+++ b/.env
@@ -0,0 +1,7 @@
+DATABASE_HOST="localhost"
+DATABASE_PORT="5432"
+DATABASE_USER="postgres"
+DATABASE_PASS="password"
+DATABASE_NAME="swoop"
+DBMATE_MIGRATIONS_TABLE="swoop.schema_migrations"
+DATABASE_URL="postgres://${DATABASE_USER}:${DATABASE_PASS}@${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_NAME}?sslmode=disable"
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9765c29..0410567 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -49,6 +49,19 @@ repos:
language: system
types: [text]
stages: [commit, push, manual]
+ - id: sqlfluff-fix
+ name: sqlfluff-fix
+ # Set a couple of default flags:
+ # - `--force` to disable confirmation
+ # - `--show-lint-violations` shows issues to not require running `sqlfluff lint`
+ # - `--processes 0` to use maximum parallelism
+ # By default, this hook applies all rules.
+ entry: sqlfluff fix --force --show-lint-violations --processes 0
+ language: python
+ description: "Fixes sql lint errors with `SQLFluff`"
+ types: [sql]
+ require_serial: true
+ exclude: ^db/schema.sql
# Currently disabled due to incomplete typing
#- id: mypy
# name: mypy
diff --git a/.sqlfluff b/.sqlfluff
new file mode 100644
index 0000000..2f4ec4a
--- /dev/null
+++ b/.sqlfluff
@@ -0,0 +1,20 @@
+[sqlfluff]
+dialect = postgres
+exclude_rules = ST07
+
+[sqlfluff:indentation]
+tab_space_size = 2
+indented_joins = True
+
+[sqlfluff:rules:capitalisation.identifiers]
+extended_capitalisation_policy = lower
+
+[sqlfluff:rules:capitalisation.functions]
+extended_capitalisation_policy = lower
+
+[sqlfluff:rules:capitalisation.types]
+extended_capitalisation_policy = lower
+
+[sqlfluff:rules:layout.long_lines]
+ignore_comment_lines = True
+ignore_comment_clauses = True
diff --git a/README.md b/README.md
index 197f0ee..b8a398f 100644
--- a/README.md
+++ b/README.md
@@ -10,24 +10,7 @@ brew install dbmate
## Database Setup / Migrations
-The DB schema and migrations are managed by [Dbmate](https://github.com/amacneil/dbmate#commands).
-
-Existing migrations can be found in: `/db/migrations/`
-
-### Database setup:
-
-Create a `.env` file (specifying a `user`, `password`, `port`):
-```
-touch .env
-
-echo "DATABASE_URL=\"postgres://{user}:{password}@127.0.0.1:{port}/swoop?sslmode=disable\"" >> .env2
-```
-
-Create the database and tables:
-```
-dbmate up
-```
-
+Instructions for database setup, migrations, and testing can be found in the database README: `/db/README.md`
## Environment Setup and Testing
diff --git a/db/Dockerfile b/db/Dockerfile
new file mode 100644
index 0000000..d5dd507
--- /dev/null
+++ b/db/Dockerfile
@@ -0,0 +1,32 @@
+FROM postgres:15
+
+# May not be ideal, I think it might be upgrading the pg version
+# and inflating the image size, but it works for testing.
+
+# install build deps and pg_partman
+RUN set -x && \
+ apt-get update && \
+ apt-get install -y postgresql-15-partman curl make patch perl && \
+ apt-get clean -y && \
+ rm -r /var/lib/apt/lists/*
+
+# install pg_prove
+RUN cpan TAP::Parser::SourceHandler::pgTAP
+
+
+# install dbmate and wrapper
+COPY dbmate /usr/local/bin/dbmate
+RUN set -x && \
+ DBMATE="/usr/local/bin/_dbmate" && \
+    # map the uname arch to dbmate's release asset naming (x86_64 -> amd64, aarch64 -> arm64)
+    ARCH="$(case "$(uname -m)" in (x86_64) echo amd64 ;; (aarch64) echo arm64 ;; (*) uname -m ;; esac)" && \
+ curl -fsSL -o "$DBMATE" https://github.com/amacneil/dbmate/releases/download/v2.2.0/dbmate-linux-$ARCH && \
+ chmod +x "$DBMATE"
+
+# install pgtap
+RUN set -x && \
+ tmp="$(mktemp -d)" && \
+ trap "rm -rf '$tmp'" EXIT && \
+ cd "$tmp" && \
+ curl -fsSL https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -o pgtap.tar.gz && \
+ tar -xzf pgtap.tar.gz --strip-components 1 && \
+ make install
diff --git a/db/README.md b/db/README.md
new file mode 100644
index 0000000..1c0260f
--- /dev/null
+++ b/db/README.md
@@ -0,0 +1,171 @@
+# `swoop-db`
+
+The swoop database schema is managed via `dbmate`. `dbmate` will apply all
+migration files found in the `./migrations` directory. We wrap the [upstream
+dbmate tool](https://github.com/amacneil/dbmate) with a bash script
+(`./dbmate`) to enforce a custom workflow where `./schema.sql` is not a "schema
+only" dump made after applying migrations but is instead a "manually-created
+reference schema" not generated by any `dbmate` commands.
+
+The `dbmate` help is overridden by the wrapper script and includes details
+about what is overridden and how. The main things of note:
+
+* `./schema.sql` is not automatically dumped by `dbmate` commands
+* `create` has an option to load a schema into the database
+* `verify` is a new command that will check differences between `./schema.sql`
+ and the schema generated by applying all of the migration files
+* `test` is a new command (specific to swoop) to run the database tests
+
+The `dbmate` wrapper can be run from the local OS against the postgres docker
+container, but it is typically easier to run it within the container itself.
+See details below.
+
+### What is a "schema"?
+
+Some have suggested that schema is an overloaded term, so it makes sense to
+better define what that means.
+
+In the context of postgres tooling like `pg_dump`, a schema is the structure of
+the database, including any types, functions, or procedures (among,
+potentially, many other things). With this definition it is perhaps easier to
+say what a schema does not include: table rows. Thus, a "schema" in this view
+is everything but the data.
+
+This perspective ignores the fact that some data may actually be part of the
+structure of the database and required for proper database operations. Such
+data is different from data inserted by applications using the database, and
+should therefore be treated as an aspect of the schema we need to track.
+
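+For instance, the rows seeded into `swoop.event_state` are exactly this kind of
+structural data; the table definition and its rows are both tracked in
+`./schema.sql`:
+
+```sql
+-- from ./schema.sql: these rows are part of the schema, not application data
+CREATE TABLE swoop.event_state (
+  name text PRIMARY KEY,
+  description text NOT NULL
+);
+
+INSERT INTO swoop.event_state (name, description) VALUES
+('PENDING', 'Action created and waiting to be executed'),
+('QUEUED', 'Action queued for handler');
+-- (remaining states omitted here for brevity)
+```
+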
+Moreover, some "schema" is actually generated by other commands, such that the
+set of sql commands required to reproduce a given "schema" may be much more
+limited than the result of running those commands. In a sense this is like any
+build process, where a minimal set of source artifacts produces much more
+output than went in. In such a case, the accepted best practice is to track
+only that minimal set of inputs, as the rest can be generated again at build
+time.
+
+A clear example of this is enabling a database extension. The command to do so
+would be something like `CREATE EXTENSION <extension_name>;`. When running a
+schema-only dump with `pg_dump`, all tables, types, and other non-data items
+created in the database by that command would be present in the output. But
+from our perspective as application developers, we don't really care what the
+extension created; we just need to know to run
+`CREATE EXTENSION <extension_name>;`. Therefore we should only track that
+single command in our schema.
+
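+Concretely, that is what this project does for `pg_partman` and `pgtap`: the
+schema tracks only the statements enabling them, not the many objects those
+extensions create. For example:
+
+```sql
+-- tracked in ./schema.sql (and in the base migration)
+CREATE SCHEMA partman;
+CREATE EXTENSION pg_partman SCHEMA partman;
+```
+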
+In these ways the schema we track in `./schema.sql` differs from what one gets
+by running `pg_dump` with a schema-only export, and likewise from what `dbmate`
+would dump into that file. But the content we end up with in `./schema.sql` is
+much more useful for our purposes.
+
+### Why wrap `dbmate`?
+
+The above difference in schema definitions is the reason. For more on the idea
+and intent behind the dbmate wrapper, see [this dbmate
+discussion](https://github.com/amacneil/dbmate/discussions/433). The choice to
+use a wrapper here is simply a pragmatic one; long-term either merging this
+behavior into `dbmate` or creating a dedicated cli tool that uses `dbmate` as a
+library is preferred.
+
+### What does it mean if the schema and migrations are out of sync?
+
+The `./schema.sql` file represents what we want a new instance of the database
+to look like, whereas the migrations are a way to capture what operations need
+to be done to update an existing database from an older schema state to a new
+one. Therefore, when we want to make changes to `./schema.sql`, we need
+corresponding migration(s) to update existing databases from the older state.
+
+Or, if we approach this from the other way around: if we make a migration to
+make changes to existing databases, we also need to update the `./schema.sql`
+in a corresponding manner.
+
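+As a minimal sketch of that pairing (the `PAUSED` state here is purely
+hypothetical and only illustrates the mechanics):
+
+```sql
+-- in a new migration file, e.g. ./migrations/<timestamp>_add_paused_state.sql
+-- migrate:up
+INSERT INTO swoop.event_state (name, description) VALUES
+('PAUSED', 'Action paused and waiting to be resumed');
+
+-- migrate:down
+DELETE FROM swoop.event_state WHERE name = 'PAUSED';
+```
+
+The same `INSERT` would also be added to the `swoop.event_state` seed data in
+`./schema.sql` so that newly-created and migrated databases end up identical.
+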
+In the event that changes are made to `./schema.sql` without a migration also
+making those changes, or if we have a migration and fail to update
+`./schema.sql`, then the schema and the migrations are out of sync. The
+`verify` command added by the `dbmate` wrapper is used to detect this condition
+and will provide a diff to help resolve any inconsistencies.
+
+## Extensions
+
+`swoop-db` makes use of two postgres extensions:
+
+* `pg_partman`: an automated table partition manager
+* `pgtap`: a postgres-native testing framework
+
+## Database testing with docker
+
+`./Dockerfile` defines the build steps for a database test container. The
+container includes the requisite postgres extensions and any other required
+utilities like `dbmate` and `pg_prove`. Because the Dockerfile builds an image
+with all the database dependencies at fixed versions, using docker with that
+image is strongly recommended for all testing to help guarantee consistency
+between developers. (Running postgres another way is fine if desired, but it
+requires that the necessary extensions and utilities are installed and that
+the connection information is correctly configured for the tooling.)
+
+To make using the docker container more convenient, a `docker-compose.yml` file
+is provided in the project root. The repo contents are mounted as `/swoop`
+inside the container to help facilitate database operations and testing using
+the included utilities. For example, to bring up the database and run the
+tests:
+
+```shell
+# load the .env vars
+source .env
+
+# bring up the database container in the background
+# --build forces rebuild of the container in case changes have been made
+# -V recreates any volumes instead of reusing data
+# -d runs the containers detached (in the background) rather than in the foreground
+docker compose up --build -V -d
+
+# create the database and apply all migrations
+docker compose exec postgres dbmate up
+
+# run the database tests
+docker compose exec postgres dbmate test db/tests/
+
+# connect to the database with psql
+docker compose exec postgres psql -U postgres swoop
+```
+
+To verify the schema and migrations match:
+
+```shell
+# drop an existing database to start clean
+docker compose exec postgres dbmate drop
+
+# run the verification; any diff indicates schema/migrations out-of-sync
+docker compose exec postgres dbmate verify
+```
+
+To stop the `compose`d container(s):
+
+```shell
+docker compose down
+```
+
+### Adding a migration
+
+Use `dbmate` to create a new migration file, giving it a descriptive name:
+
+```shell
+docker compose exec postgres dbmate new <migration_name>
+```
+
+### Adding database tests
+
+Database tests should be added as `.sql` files in the `./tests` directory.
+Follow the pattern of the existing test files. It's best to keep each file
+short and focused with a descriptive name. For more about the `pgtap` test
+framework see [the documentation](https://pgtap.org/documentation.html).
+
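+A minimal skeleton following that pattern (the single assertion here is only a
+placeholder):
+
+```sql
+BEGIN;
+
+SET search_path = tap, public;
+SELECT plan(1); -- the number of assertions in this file
+
+-- placeholder assertion; real tests exercise the swoop tables and functions
+SELECT ok(true, 'placeholder assertion');
+
+SELECT * FROM finish(); -- noqa
+ROLLBACK;
+```
+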
+## pre-commit hooks related to the database
+
+We use `sqlfluff` for linting sql. See the root `.sqlfluff` config file and the
+command defined in the `.pre-commit-config.yaml` for more information. Note
+that the tool is a bit slow and somewhat inaccurate at times; it is better than
+nothing but we should not hesitate to replace it with a better option if one
+becomes available.
diff --git a/db/dbmate b/db/dbmate
new file mode 100755
index 0000000..7d65e79
--- /dev/null
+++ b/db/dbmate
@@ -0,0 +1,338 @@
+#!/usr/bin/env bash
+#
+# Released under the MIT license at
+# https://gist.github.com/jkeifer/f75c65213c6a327229cf85ffa47e1efe
+#
+# Copyright 2023 Jarrett Keifer
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the “Software”), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+set -euo pipefail
+
+
+WRAPPER_NAME='dbmate-tupac'
+WRAPPER_VERSION='0.1.0'
+
+export DBMATE_NO_DUMP_SCHEMA=true
+
+# try to load .env file for vars from cwd
+. .env ||:
+
+
+echo2 () {
+ echo -e >&2 "${@}"
+}
+
+
+fatal () {
+ local msg="${1?message string required}"; shift
+ local rc=${1:-1}
+ echo2 "${msg:-}"
+ exit "$rc"
+}
+
+
+usage () {
+ cat </dev/null
+ dbmate dump > "$schema_dump"
+ dbmate drop > /dev/null
+ dbmate up >/dev/null
+ dbmate dump > "$migrations_dump"
+ dbmate drop >/dev/null
+
+ diff "$schema_dump" "$migrations_dump"
+}
+
+
+create () {
+ #overrides create to add --with-schema option
+ local arg
+ local schema_file=
+ local insert_migrations=false
+ while [ $# -gt 0 ]; do
+ arg=$1
+ shift ||:
+ case "$arg" in
+ -h|--help)
+ cat </dev/null || {
+ fatal "Tests require pg_prove to be installed from CPAN."
+ }
+
+    # pop the 'test' off the args
+ shift
+
+ set -x
+ pg_prove \
+ -U "$DATABASE_USER" \
+ -d "$DATABASE_NAME" \
+ -h "$DATABASE_HOST" \
+ -p "$DATABASE_PORT" \
+ --ext .sql \
+ -v \
+ "$@"
+ set +x
+}
+
+
+find_dbmate() {
+ # to allow installing this script on the path,
+ # dbmate can be installed with the name _dbmate
+ local _dbmate
+ _dbmate="$(builtin type -P _dbmate || builtin type -P dbmate)"
+
+ [ -n "$_dbmate" ] || {
+ fatal "dbmate not found. See https://github.com/amacneil/dbmate."
+ }
+
+ [ "${BASH_SOURCE[0]}" != "$_dbmate" ] || {
+        fatal "Found dbmate on path, but it appears to be ${WRAPPER_NAME}. Install dbmate to a location on the path with the name _dbmate."
+ }
+
+ echo "$_dbmate"
+}
+
+
+dbmate () {
+ DBMATE="$(find_dbmate)"
+
+ builtin type -P psql >/dev/null || {
+ fatal "psql not found. dbmate wrapper requires it for some operations."
+ }
+
+ local arg
+ for arg in "$@"; do
+ case "$arg" in
+ -h|--help)
+ usage
+ return
+ ;;
+ -v|--version)
+ version
+ return
+ ;;
+ dump)
+ dump "$@"
+ return
+ ;;
+ verify)
+ verify "$@"
+ return
+ ;;
+ create)
+ create "$@"
+ return
+ ;;
+ test)
+ dotest "$@"
+ return
+ ;;
+ -*)
+ ;;
+ *)
+ "$DBMATE" "$@"
+ return
+ ;;
+ esac
+ done
+
+ # if a command is missing default to help
+ usage
+}
+
+
+# check if we have been sourced and exit if so
+(return 0 2>/dev/null) && exit
+
+
+dbmate "$@"
diff --git a/db/migrations/20230328180857_initial_setup.sql b/db/migrations/20230328180857_initial_setup.sql
deleted file mode 100644
index f8d37a2..0000000
--- a/db/migrations/20230328180857_initial_setup.sql
+++ /dev/null
@@ -1,25 +0,0 @@
--- migrate:up
-
-CREATE TABLE IF NOT EXISTS jobs(
- pk SERIAL PRIMARY KEY,
- job_id UUID UNIQUE NOT NULL,
- payload json NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS task_executions(
- pk SERIAL PRIMARY KEY,
- task_id UUID UNIQUE NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS workflows(
- pk SERIAL PRIMARY KEY,
- workflow_id UUID UNIQUE NOT NULL
-);
-
--- migrate:down
-
-DROP TABLE IF EXISTS jobs;
-
-DROP TABLE IF EXISTS task_executions;
-
-DROP TABLE IF EXISTS workflows;
diff --git a/db/migrations/20230412000000_base.sql b/db/migrations/20230412000000_base.sql
new file mode 100644
index 0000000..3b5058f
--- /dev/null
+++ b/db/migrations/20230412000000_base.sql
@@ -0,0 +1,440 @@
+-- migrate:up
+
+CREATE SCHEMA IF NOT EXISTS swoop;
+CREATE SCHEMA partman;
+CREATE EXTENSION pg_partman SCHEMA partman;
+CREATE SCHEMA tap;
+CREATE EXTENSION pgtap SCHEMA tap;
+
+
+CREATE TABLE IF NOT EXISTS swoop.event_state (
+ name text PRIMARY KEY,
+ description text NOT NULL
+);
+
+INSERT INTO swoop.event_state (name, description) VALUES
+('PENDING', 'Action created and waiting to be executed'),
+('QUEUED', 'Action queued for handler'),
+('RUNNING', 'Action being run by handler'),
+('SUCCESSFUL', 'Action successful'),
+('FAILED', 'Action failed'),
+('CANCELED', 'Action canceled'),
+('TIMED_OUT', 'Action did not complete within allowed timeframe'),
+('UNKNOWN', 'Last update was unknown state'),
+('BACKOFF', 'Transient error, waiting to retry'),
+(
+ 'INVALID',
+ 'Action could not be completed successfully due to '
+ || 'configuration error or other incompatibility'
+),
+(
+ 'RETRIES_EXHAUSTED',
+ 'Call did not fail within allowed time or number of retries'
+),
+('INFO', 'Event is informational and does not change thread state');
+
+
+CREATE TABLE IF NOT EXISTS swoop.action (
+ action_uuid uuid NOT NULL DEFAULT gen_random_uuid(),
+ action_type text NOT NULL CHECK (action_type IN ('callback', 'workflow')),
+ action_name text,
+ handler_name text NOT NULL,
+ parent_uuid bigint, -- reference omitted, we don't need referential integrity
+ created_at timestamptz NOT NULL DEFAULT now(),
+ priority smallint DEFAULT 100,
+
+ CONSTRAINT workflow_or_callback CHECK (
+ CASE
+ WHEN
+ action_type = 'callback' THEN
+ parent_uuid IS NOT NULL
+ WHEN
+ action_type = 'workflow' THEN
+ action_name IS NOT NULL
+ END
+ )
+) PARTITION BY RANGE (created_at);
+
+CREATE INDEX ON swoop.action (created_at);
+CREATE INDEX ON swoop.action (action_uuid);
+CREATE INDEX ON swoop.action (handler_name);
+CREATE INDEX ON swoop.action (action_name);
+CREATE TABLE IF NOT EXISTS swoop.action_template (LIKE swoop.action);
+ALTER TABLE swoop.action_template ADD PRIMARY KEY (action_uuid);
+SELECT partman.create_parent(
+ 'swoop.action',
+ 'created_at',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.action_template'
+);
+
+
+-- the noqa is here for GENERATED ALWAYS AS IDENTITY
+-- https://github.com/sqlfluff/sqlfluff/issues/4455
+CREATE TABLE IF NOT EXISTS swoop.thread ( -- noqa
+ created_at timestamptz NOT NULL,
+ last_update timestamptz NOT NULL,
+ -- action_uuid reference to action omitted, we don't need referential integrity
+ action_uuid uuid NOT NULL,
+
+ -- denormalize some values off action so we
+ -- don't have to join later in frequent queries
+ handler_name text NOT NULL,
+ priority smallint NOT NULL,
+
+ status text NOT NULL REFERENCES swoop.event_state ON DELETE RESTRICT,
+ next_attempt_after timestamptz,
+ error text,
+
+ -- We lock with advisory locks that take two int4 values, one for the table
+ -- OID and one for this lock_id. Note that this sequence can recycle values,
+  -- but temporal locality means recycled values should not be temporally
+  -- coincident. Even if duplicates are "processable" at the same time, a lock
+ -- ID conflict at worst causes added latency in processing, not skipped
+ -- messages.
+ --
+ -- Recommended way to lock/unlock:
+ -- swoop.lock_thread(thread.lock_id)
+ -- swoop.unlock_thread(thread.lock_id)
+ lock_id integer GENERATED ALWAYS AS IDENTITY (
+ SEQUENCE NAME swoop.thread_lock_id_seq
+ MINVALUE -2147483648
+ START WITH 1
+ CYCLE
+ )
+) PARTITION BY RANGE (created_at);
+
+CREATE INDEX ON swoop.thread (created_at);
+CREATE INDEX ON swoop.thread (action_uuid);
+CREATE INDEX ON swoop.thread (status);
+CREATE INDEX ON swoop.thread (handler_name);
+CREATE TABLE IF NOT EXISTS swoop.thread_template (LIKE swoop.thread);
+ALTER TABLE swoop.thread_template ADD PRIMARY KEY (action_uuid);
+SELECT partman.create_parent(
+ 'swoop.thread',
+ 'created_at',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.thread_template'
+);
+
+
+CREATE TABLE IF NOT EXISTS swoop.event (
+ event_time timestamptz NOT NULL,
+ action_uuid uuid NOT NULL, -- reference omitted, we don't need referential integrity
+ status text NOT NULL,
+ event_source text,
+ -- max backoff cannot be more than 1 day (even that seems extreme in most cases)
+ retry_seconds int CHECK (retry_seconds > 0 AND retry_seconds <= 86400),
+ error text
+) PARTITION BY RANGE (event_time);
+
+CREATE INDEX ON swoop.event (event_time);
+CREATE INDEX ON swoop.event (action_uuid);
+CREATE TABLE IF NOT EXISTS swoop.event_template (LIKE swoop.event);
+ALTER TABLE swoop.event_template ADD PRIMARY KEY (
+ action_uuid,
+ event_time,
+ status
+);
+SELECT partman.create_parent(
+ 'swoop.event',
+ 'event_time',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.event_template'
+);
+
+
+CREATE OR REPLACE FUNCTION swoop.add_pending_event()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ INSERT INTO swoop.event (event_time, action_uuid, status, event_source) VALUES
+ (NEW.created_at, NEW.action_uuid, 'PENDING', 'swoop-db');
+ RETURN NULL;
+END;
+$$;
+
+CREATE OR REPLACE TRIGGER add_pending_event
+AFTER INSERT ON swoop.action
+FOR EACH ROW EXECUTE FUNCTION swoop.add_pending_event();
+
+
+CREATE OR REPLACE FUNCTION swoop.add_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ INSERT INTO swoop.thread (
+ created_at,
+ last_update,
+ action_uuid,
+ handler_name,
+ priority,
+ status
+ ) VALUES (
+ NEW.created_at,
+ NEW.created_at,
+ NEW.action_uuid,
+ NEW.handler_name,
+ NEW.priority,
+ 'PENDING'
+ );
+ RETURN NULL;
+END;
+$$;
+
+CREATE OR REPLACE TRIGGER add_thread
+AFTER INSERT ON swoop.action
+FOR EACH ROW EXECUTE FUNCTION swoop.add_thread();
+
+
+CREATE OR REPLACE FUNCTION swoop.update_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+ _latest timestamptz;
+ _status text;
+ _next_attempt timestamptz;
+BEGIN
+ SELECT last_update FROM swoop.thread WHERE action_uuid = NEW.action_uuid INTO _latest;
+
+ -- If the event time is older than the last update we don't update the thread
+ -- (we can't use a trigger condition to filter this because we don't know the
+ -- last update time from the event alone).
+ IF _latest IS NOT NULL AND NEW.event_time < _latest THEN
+ RETURN NULL;
+ END IF;
+
+ -- Coerce status to UNKNOWN if it doesn't match a known status type
+ SELECT name from swoop.event_state WHERE name = NEW.status
+ UNION
+ SELECT 'UNKNOWN'
+ LIMIT 1
+ INTO _status;
+
+ -- If we need a next attempt time let's calculate it
+ IF NEW.retry_seconds IS NOT NULL THEN
+ SELECT NEW.event_time + (NEW.retry_seconds * interval '1 second') INTO _next_attempt;
+ END IF;
+
+ UPDATE swoop.thread as t SET
+ last_update = NEW.event_time,
+ status = _status,
+ next_attempt_after = _next_attempt,
+ error = NEW.error
+ WHERE
+ t.action_uuid = NEW.action_uuid;
+
+ -- We _could_ try to drop the thread lock here, which would be nice for
+ -- swoop-conductor so it didn't have to explicitly unlock, but the unlock
+ -- function raises a warning. Being explicit isn't the worst thing either,
+ -- given the complications with possible relocking and the need for clients
+ -- to stay aware of that possibility.
+
+ RETURN NULL;
+END;
+$$;
+
+CREATE OR REPLACE TRIGGER update_thread
+AFTER INSERT ON swoop.event
+FOR EACH ROW WHEN (NEW.status NOT IN ('PENDING', 'INFO')) -- noqa: CP02
+EXECUTE FUNCTION swoop.update_thread();
+
+
+CREATE OR REPLACE FUNCTION swoop.thread_is_processable(_thread swoop.thread) -- noqa: LT01
+RETURNS boolean
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ _thread.status = 'PENDING'
+ OR _thread.status = 'BACKOFF' AND _thread.next_attempt_after <= now()
+ );
+END;
+$$;
+
+
+CREATE OR REPLACE FUNCTION swoop.notify_for_processable_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ PERFORM
+ pg_notify(handler_name, NEW.action_uuid::text)
+ FROM
+ swoop.action
+ WHERE
+ action_uuid = NEW.action_uuid;
+ RETURN NULL;
+END;
+$$;
+
+CREATE OR REPLACE TRIGGER processable_notify
+AFTER INSERT OR UPDATE ON swoop.thread
+FOR EACH ROW WHEN (swoop.thread_is_processable(NEW)) -- noqa: CP02
+EXECUTE FUNCTION swoop.notify_for_processable_thread();
+
+
+-- If we are looking for processable rows we want to exclude any that already
+-- have locks, as the locking mechanism doesn't prevent the same session from
+-- "getting" a lock it already has. In other words, pg_try_advisory_lock()
+-- will return true multiple times when run in the same session, so cannot be
+-- used effectively as a filter mechanism here.
+--
+-- We could do that with a CTE like this:
+--
+-- WITH locks AS (
+-- SELECT objid AS lock_id
+-- FROM pg_locks
+-- WHERE
+-- granted
+-- AND database = (
+-- SELECT oid FROM pg_database WHERE datname = current_database()
+-- )
+-- AND locktype = 'advisory'
+-- AND classid = to_regclass('swoop.thread')::oid::integer
+-- )
+--
+-- And then adding a `LEFT JOIN locks AS l USING (lock_id)` and a column
+-- defined as `l.lock_id IS NOT NULL AS has_lock`. Then callers could filter
+-- on `has_lock` before attempting to lock.
+--
+-- But per the pg_locks docs:
+--
+-- Locking the regular and/or predicate lock manager could have some impact
+-- on database performance if this view is very frequently accessed. The
+-- locks are held only for the minimum amount of time necessary to obtain
+-- data from the lock managers, but this does not completely eliminate the
+-- possibility of a performance impact.
+--
+-- (https://www.postgresql.org/docs/current/view-pg-locks.html)
+--
+-- It remains up to callers to track their locked rows by `action_uuid` and:
+-- 1) filter rows from this view to prevent double-locking rows
+-- 2) ensure locks get released when they are no longer required
+--
+-- Note that row-level locks--a la `FOR UPDATE SKIP LOCKED`--are not any better
+-- in this regard. They _only_ support transaction-level locks, which means
+-- keeping a _transaction_ open until all locked rows have been updated. This
+-- also means the client cannot commit updates for the rows as they complete,
+-- but instead has to hold all updates until the slowest row has finished then
+-- commit the whole batch. Moreover, the lock holder will still get the locked
+-- rows back if not excluding them from queries. These locks also cannot be
+-- explicitly released: they persist until the end of the transaction.
+--
+-- In fact the only noticeable advantage of the row-level locks is that they do
+-- not stack, so the client doesn't have to track how many times they've
+-- acquired a lock.
+CREATE OR REPLACE FUNCTION swoop.get_processable_actions(
+ _ignored_action_uuids uuid [],
+ _limit integer DEFAULT 10,
+ _handler_names text [] DEFAULT ARRAY[]::text []
+)
+RETURNS TABLE (action_uuid uuid, handler_name text)
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN QUERY
+ -- The CTE here is **critical**. If we don't use the CTE,
+ -- then the LIMIT likely will not be applied before the
+ -- WHERE clause, and we will lock rows that aren't returned.
+ -- Those rows will get stuck as locked until the session
+ -- drops or runs `pg_advisory_unlock_all()`.
+ WITH actions AS (
+ SELECT
+ t.action_uuid as action_uuid,
+ t.handler_name as handler_name,
+ t.lock_id as lock_id
+ FROM
+ swoop.thread as t
+ WHERE
+ swoop.thread_is_processable(t) -- noqa: RF02
+ AND (
+ -- strangely this returns null instead of 0 if
+ -- the array is empty
+ array_length(_handler_names, 1) IS NULL
+ OR t.handler_name = any(_handler_names)
+ )
+ AND (
+ -- see notes on the swoop.action_thread view for
+ -- the reasoning behind _ignored_action_uuids
+ array_length(_ignored_action_uuids, 1) IS NULL
+ OR NOT (t.action_uuid = any(_ignored_action_uuids))
+ )
+ ORDER BY t.priority
+ )
+
+ SELECT
+ a.action_uuid AS action_uuid,
+ a.handler_name AS handler_name
+ FROM actions AS a
+ WHERE swoop.lock_thread(a.lock_id)
+ LIMIT _limit;
+ RETURN;
+END;
+$$;
+
+
+CREATE OR REPLACE FUNCTION swoop.lock_thread(_lock_id integer)
+RETURNS bool
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ SELECT pg_try_advisory_lock(to_regclass('swoop.thread')::oid::integer, _lock_id)
+ );
+END;
+$$;
+
+
+CREATE OR REPLACE FUNCTION swoop.unlock_thread(_lock_id integer)
+RETURNS bool
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ SELECT pg_advisory_unlock(to_regclass('swoop.thread')::oid::integer, _lock_id)
+ );
+END;
+$$;
+
+
+-- migrate:down
+
+DROP FUNCTION swoop.unlock_thread;
+DROP FUNCTION swoop.lock_thread;
+DROP FUNCTION swoop.get_processable_actions;
+DROP VIEW IF EXISTS swoop.action_thread;
+DROP TRIGGER processable_notify ON swoop.thread;
+DROP FUNCTION swoop.notify_for_processable_thread;
+DROP FUNCTION swoop.thread_is_processable;
+DROP TRIGGER update_thread ON swoop.event;
+DROP FUNCTION swoop.update_thread;
+DROP TRIGGER add_thread ON swoop.action;
+DROP FUNCTION swoop.add_thread;
+DROP TRIGGER add_pending_event ON swoop.action;
+DROP FUNCTION swoop.add_pending_event;
+DROP TABLE swoop.event_template;
+DROP TABLE swoop.event;
+DROP TABLE swoop.thread_template;
+DROP TABLE swoop.thread;
+DROP TABLE swoop.action_template;
+DROP TABLE swoop.action;
+DROP TABLE swoop.event_state;
+DROP EXTENSION pgtap;
+DROP SCHEMA tap CASCADE;
+DROP EXTENSION pg_partman;
+DROP SCHEMA partman CASCADE;
+DROP SCHEMA swoop CASCADE;
diff --git a/db/schema.sql b/db/schema.sql
index 87046aa..87698d1 100644
--- a/db/schema.sql
+++ b/db/schema.sql
@@ -1,203 +1,414 @@
-SET statement_timeout = 0;
-SET lock_timeout = 0;
-SET idle_in_transaction_session_timeout = 0;
-SET client_encoding = 'UTF8';
-SET standard_conforming_strings = on;
-SELECT pg_catalog.set_config('search_path', '', false);
-SET check_function_bodies = false;
-SET xmloption = content;
-SET client_min_messages = warning;
-SET row_security = off;
+CREATE SCHEMA swoop;
+CREATE SCHEMA partman;
+CREATE EXTENSION pg_partman SCHEMA partman;
+CREATE SCHEMA tap;
+CREATE EXTENSION pgtap SCHEMA tap;
-SET default_tablespace = '';
-SET default_table_access_method = heap;
-
---
--- Name: jobs; Type: TABLE; Schema: public; Owner: -
---
-
-CREATE TABLE public.jobs (
- pk integer NOT NULL,
- job_id uuid NOT NULL,
- payload json NOT NULL
+CREATE TABLE swoop.schema_migrations (
+ version character varying(128) PRIMARY KEY
);
---
--- Name: jobs_pk_seq; Type: SEQUENCE; Schema: public; Owner: -
---
-
-CREATE SEQUENCE public.jobs_pk_seq
- AS integer
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
---
--- Name: jobs_pk_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
---
-
-ALTER SEQUENCE public.jobs_pk_seq OWNED BY public.jobs.pk;
-
-
---
--- Name: schema_migrations; Type: TABLE; Schema: public; Owner: -
---
-
-CREATE TABLE public.schema_migrations (
- version character varying(128) NOT NULL
+CREATE TABLE swoop.event_state (
+ name text PRIMARY KEY,
+ description text NOT NULL
);
-
---
--- Name: task_executions; Type: TABLE; Schema: public; Owner: -
---
-
-CREATE TABLE public.task_executions (
- pk integer NOT NULL,
- task_id uuid NOT NULL
+INSERT INTO swoop.event_state (name, description) VALUES
+('PENDING', 'Action created and waiting to be executed'),
+('QUEUED', 'Action queued for handler'),
+('RUNNING', 'Action being run by handler'),
+('SUCCESSFUL', 'Action successful'),
+('FAILED', 'Action failed'),
+('CANCELED', 'Action canceled'),
+('TIMED_OUT', 'Action did not complete within allowed timeframe'),
+('UNKNOWN', 'Last update was unknown state'),
+('BACKOFF', 'Transient error, waiting to retry'),
+(
+ 'INVALID',
+ 'Action could not be completed successfully due to '
+ || 'configuration error or other incompatibility'
+),
+(
+ 'RETRIES_EXHAUSTED',
+ 'Call did not fail within allowed time or number of retries'
+),
+('INFO', 'Event is informational and does not change thread state');
+
+
+CREATE TABLE swoop.action (
+ action_uuid uuid NOT NULL DEFAULT gen_random_uuid(),
+ action_type text NOT NULL CHECK (action_type IN ('callback', 'workflow')),
+ action_name text,
+ handler_name text NOT NULL,
+ parent_uuid bigint, -- reference omitted, we don't need referential integrity
+ created_at timestamptz NOT NULL DEFAULT now(),
+ priority smallint DEFAULT 100,
+
+ CONSTRAINT workflow_or_callback CHECK (
+ CASE
+ WHEN
+ action_type = 'callback' THEN
+ parent_uuid IS NOT NULL
+ WHEN
+ action_type = 'workflow' THEN
+ action_name IS NOT NULL
+ END
+ )
+) PARTITION BY RANGE (created_at);
+
+CREATE INDEX ON swoop.action (created_at);
+CREATE INDEX ON swoop.action (action_uuid);
+CREATE INDEX ON swoop.action (handler_name);
+CREATE INDEX ON swoop.action (action_name);
+CREATE TABLE swoop.action_template (LIKE swoop.action);
+ALTER TABLE swoop.action_template ADD PRIMARY KEY (action_uuid);
+SELECT partman.create_parent(
+ 'swoop.action',
+ 'created_at',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.action_template'
);
---
--- Name: task_executions_pk_seq; Type: SEQUENCE; Schema: public; Owner: -
---
-
-CREATE SEQUENCE public.task_executions_pk_seq
- AS integer
+-- the noqa is here for GENERATED ALWAYS AS IDENTITY
+-- https://github.com/sqlfluff/sqlfluff/issues/4455
+CREATE TABLE swoop.thread ( -- noqa
+ created_at timestamptz NOT NULL,
+ last_update timestamptz NOT NULL,
+ -- action_uuid reference to action omitted, we don't need referential integrity
+ action_uuid uuid NOT NULL,
+
+ -- denormalize some values off action so we
+ -- don't have to join later in frequent queries
+ handler_name text NOT NULL,
+ priority smallint NOT NULL,
+
+ status text NOT NULL REFERENCES swoop.event_state ON DELETE RESTRICT,
+ next_attempt_after timestamptz,
+ error text,
+
+ -- We lock with advisory locks that take two int4 values, one for the table
+ -- OID and one for this lock_id. Note that this sequence can recycle values,
+  -- but temporal locality means recycled values should not be temporally
+  -- coincident. Even if duplicates are "processable" at the same time, a lock
+ -- ID conflict at worst causes added latency in processing, not skipped
+ -- messages.
+ --
+ -- Recommended way to lock/unlock:
+ -- swoop.lock_thread(thread.lock_id)
+ -- swoop.unlock_thread(thread.lock_id)
+ lock_id integer GENERATED ALWAYS AS IDENTITY (
+ SEQUENCE NAME swoop.thread_lock_id_seq
+ MINVALUE -2147483648
START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
---
--- Name: task_executions_pk_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
---
-
-ALTER SEQUENCE public.task_executions_pk_seq OWNED BY public.task_executions.pk;
-
-
---
--- Name: workflows; Type: TABLE; Schema: public; Owner: -
---
-
-CREATE TABLE public.workflows (
- pk integer NOT NULL,
- workflow_id uuid NOT NULL
+ CYCLE
+ )
+) PARTITION BY RANGE (created_at);
+
+CREATE INDEX ON swoop.thread (created_at);
+CREATE INDEX ON swoop.thread (action_uuid);
+CREATE INDEX ON swoop.thread (status);
+CREATE INDEX ON swoop.thread (handler_name);
+CREATE TABLE swoop.thread_template (LIKE swoop.thread);
+ALTER TABLE swoop.thread_template ADD PRIMARY KEY (action_uuid);
+SELECT partman.create_parent(
+ 'swoop.thread',
+ 'created_at',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.thread_template'
);
---
--- Name: workflows_pk_seq; Type: SEQUENCE; Schema: public; Owner: -
---
-
-CREATE SEQUENCE public.workflows_pk_seq
- AS integer
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
---
--- Name: workflows_pk_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
---
-
-ALTER SEQUENCE public.workflows_pk_seq OWNED BY public.workflows.pk;
-
-
---
--- Name: jobs pk; Type: DEFAULT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.jobs ALTER COLUMN pk SET DEFAULT nextval('public.jobs_pk_seq'::regclass);
-
-
---
--- Name: task_executions pk; Type: DEFAULT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.task_executions ALTER COLUMN pk SET DEFAULT nextval('public.task_executions_pk_seq'::regclass);
-
-
---
--- Name: workflows pk; Type: DEFAULT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.workflows ALTER COLUMN pk SET DEFAULT nextval('public.workflows_pk_seq'::regclass);
-
-
---
--- Name: jobs jobs_job_id_key; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.jobs
- ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id);
-
-
---
--- Name: jobs jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.jobs
- ADD CONSTRAINT jobs_pkey PRIMARY KEY (pk);
-
-
---
--- Name: schema_migrations schema_migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.schema_migrations
- ADD CONSTRAINT schema_migrations_pkey PRIMARY KEY (version);
-
-
---
--- Name: task_executions task_executions_pkey; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.task_executions
- ADD CONSTRAINT task_executions_pkey PRIMARY KEY (pk);
-
-
---
--- Name: task_executions task_executions_task_id_key; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.task_executions
- ADD CONSTRAINT task_executions_task_id_key UNIQUE (task_id);
-
-
---
--- Name: workflows workflows_pkey; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.workflows
- ADD CONSTRAINT workflows_pkey PRIMARY KEY (pk);
-
-
---
--- Name: workflows workflows_workflow_id_key; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.workflows
- ADD CONSTRAINT workflows_workflow_id_key UNIQUE (workflow_id);
-
-
---
--- PostgreSQL database dump complete
---
-
+CREATE TABLE swoop.event (
+ event_time timestamptz NOT NULL,
+ action_uuid uuid NOT NULL, -- reference omitted, we don't need referential integrity
+ status text NOT NULL,
+ event_source text,
+ -- max backoff cannot be more than 1 day (even that seems extreme in most cases)
+ retry_seconds int CHECK (retry_seconds > 0 AND retry_seconds <= 86400),
+ error text
+) PARTITION BY RANGE (event_time);
+
+CREATE INDEX ON swoop.event (event_time);
+CREATE INDEX ON swoop.event (action_uuid);
+CREATE TABLE swoop.event_template (LIKE swoop.event);
+ALTER TABLE swoop.event_template ADD PRIMARY KEY (
+ action_uuid,
+ event_time,
+ status
+);
+SELECT partman.create_parent(
+ 'swoop.event',
+ 'event_time',
+ 'native',
+ 'monthly',
+ p_template_table => 'swoop.event_template'
+);
---
--- Dbmate schema migrations
---
-INSERT INTO public.schema_migrations (version) VALUES
- ('20230328180857');
+CREATE FUNCTION swoop.add_pending_event()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ INSERT INTO swoop.event (event_time, action_uuid, status, event_source) VALUES
+ (NEW.created_at, NEW.action_uuid, 'PENDING', 'swoop-db');
+ RETURN NULL;
+END;
+$$;
+
+CREATE TRIGGER add_pending_event
+AFTER INSERT ON swoop.action
+FOR EACH ROW EXECUTE FUNCTION swoop.add_pending_event();
+
+
+CREATE FUNCTION swoop.add_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ INSERT INTO swoop.thread (
+ created_at,
+ last_update,
+ action_uuid,
+ handler_name,
+ priority,
+ status
+ ) VALUES (
+ NEW.created_at,
+ NEW.created_at,
+ NEW.action_uuid,
+ NEW.handler_name,
+ NEW.priority,
+ 'PENDING'
+ );
+ RETURN NULL;
+END;
+$$;
+
+CREATE TRIGGER add_thread
+AFTER INSERT ON swoop.action
+FOR EACH ROW EXECUTE FUNCTION swoop.add_thread();
+
+
+CREATE FUNCTION swoop.update_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+ _latest timestamptz;
+ _status text;
+ _next_attempt timestamptz;
+BEGIN
+ SELECT last_update FROM swoop.thread WHERE action_uuid = NEW.action_uuid INTO _latest;
+
+ -- If the event time is older than the last update we don't update the thread
+ -- (we can't use a trigger condition to filter this because we don't know the
+ -- last update time from the event alone).
+ IF _latest IS NOT NULL AND NEW.event_time < _latest THEN
+ RETURN NULL;
+ END IF;
+
+ -- Coerce status to UNKNOWN if it doesn't match a known status type
+ SELECT name from swoop.event_state WHERE name = NEW.status
+ UNION
+ SELECT 'UNKNOWN'
+ LIMIT 1
+ INTO _status;
+
+ -- If we need a next attempt time let's calculate it
+ IF NEW.retry_seconds IS NOT NULL THEN
+ SELECT NEW.event_time + (NEW.retry_seconds * interval '1 second') INTO _next_attempt;
+ END IF;
+
+ UPDATE swoop.thread as t SET
+ last_update = NEW.event_time,
+ status = _status,
+ next_attempt_after = _next_attempt,
+ error = NEW.error
+ WHERE
+ t.action_uuid = NEW.action_uuid;
+
+ -- We _could_ try to drop the thread lock here, which would be nice for
+ -- swoop-conductor so it didn't have to explicitly unlock, but the unlock
+ -- function raises a warning. Being explicit isn't the worst thing either,
+ -- given the complications with possible relocking and the need for clients
+ -- to stay aware of that possibility.
+
+ RETURN NULL;
+END;
+$$;
+
+CREATE TRIGGER update_thread
+AFTER INSERT ON swoop.event
+FOR EACH ROW WHEN (NEW.status NOT IN ('PENDING', 'INFO')) -- noqa: CP02
+EXECUTE FUNCTION swoop.update_thread();
+
+
+CREATE FUNCTION swoop.thread_is_processable(_thread swoop.thread) -- noqa: LT01
+RETURNS boolean
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ _thread.status = 'PENDING'
+ OR _thread.status = 'BACKOFF' AND _thread.next_attempt_after <= now()
+ );
+END;
+$$;
+
+
+CREATE FUNCTION swoop.notify_for_processable_thread()
+RETURNS trigger
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ PERFORM
+ pg_notify(handler_name, NEW.action_uuid::text)
+ FROM
+ swoop.action
+ WHERE
+ action_uuid = NEW.action_uuid;
+ RETURN NULL;
+END;
+$$;
+
+CREATE TRIGGER processable_notify
+AFTER INSERT OR UPDATE ON swoop.thread
+FOR EACH ROW WHEN (swoop.thread_is_processable(NEW)) -- noqa: CP02
+EXECUTE FUNCTION swoop.notify_for_processable_thread();
+
+
+-- If we are looking for processable rows we want to exclude any that already
+-- have locks, as the locking mechanism doesn't prevent the same session from
+-- "getting" a lock it already has. In other words, pg_try_advisory_lock()
+-- will return true multiple times when run in the same session, so cannot be
+-- used effectively as a filter mechanism here.
+--
+-- We could do that with a CTE like this:
+--
+-- WITH locks AS (
+-- SELECT objid AS lock_id
+-- FROM pg_locks
+-- WHERE
+-- granted
+-- AND database = (
+-- SELECT oid FROM pg_database WHERE datname = current_database()
+-- )
+-- AND locktype = 'advisory'
+-- AND classid = to_regclass('swoop.thread')::oid::integer
+-- )
+--
+-- And then adding a `LEFT JOIN locks AS l USING (lock_id)` and a column
+-- defined as `l.lock_id IS NOT NULL AS has_lock`. Then callers could filter
+-- on `has_lock` before attempting to lock.
+--
+-- But per the pg_locks docs:
+--
+-- Locking the regular and/or predicate lock manager could have some impact
+-- on database performance if this view is very frequently accessed. The
+-- locks are held only for the minimum amount of time necessary to obtain
+-- data from the lock managers, but this does not completely eliminate the
+-- possibility of a performance impact.
+--
+-- (https://www.postgresql.org/docs/current/view-pg-locks.html)
+--
+-- It remains up to callers to track their locked rows by `action_uuid` and:
+-- 1) filter rows from this view to prevent double-locking rows
+-- 2) ensure locks get released when they are no longer required
+--
+-- Note that row-level locks--a la `FOR UPDATE SKIP LOCKED`--are not any better
+-- in this regard. They _only_ support transaction-level locks, which means
+-- keeping a _transaction_ open until all locked rows have been updated. This
+-- also means the client cannot commit updates for the rows as they complete,
+-- but instead has to hold all updates until the slowest row has finished then
+-- commit the whole batch. Moreover, the lock holder will still get the locked
+-- rows back if not excluding them from queries. These locks also cannot be
+-- explicitly released: they persist until the end of the transaction.
+--
+-- In fact the only noticeable advantage of the row-level locks is that they do
+-- not stack, so the client doesn't have to track how many times they've
+-- acquired a lock.
+CREATE FUNCTION swoop.get_processable_actions(
+ _ignored_action_uuids uuid [],
+ _limit integer DEFAULT 10,
+ _handler_names text [] DEFAULT ARRAY[]::text []
+)
+RETURNS TABLE (action_uuid uuid, handler_name text)
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN QUERY
+ -- The CTE here is **critical**. If we don't use the CTE,
+ -- then the LIMIT likely will not be applied before the
+ -- WHERE clause, and we will lock rows that aren't returned.
+ -- Those rows will get stuck as locked until the session
+ -- drops or runs `pg_advisory_unlock_all()`.
+ WITH actions AS (
+ SELECT
+ t.action_uuid as action_uuid,
+ t.handler_name as handler_name,
+ t.lock_id as lock_id
+ FROM
+ swoop.thread as t
+ WHERE
+ swoop.thread_is_processable(t) -- noqa: RF02
+ AND (
+ -- strangely this returns null instead of 0 if
+ -- the array is empty
+ array_length(_handler_names, 1) IS NULL
+ OR t.handler_name = any(_handler_names)
+ )
+ AND (
+ -- see notes on the swoop.action_thread view for
+ -- the reasoning behind _ignored_action_uuids
+ array_length(_ignored_action_uuids, 1) IS NULL
+ OR NOT (t.action_uuid = any(_ignored_action_uuids))
+ )
+ ORDER BY t.priority
+ )
+
+ SELECT
+ a.action_uuid AS action_uuid,
+ a.handler_name AS handler_name
+ FROM actions AS a
+ WHERE swoop.lock_thread(a.lock_id)
+ LIMIT _limit;
+ RETURN;
+END;
+$$;
+
+
+CREATE FUNCTION swoop.lock_thread(_lock_id integer)
+RETURNS bool
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ SELECT pg_try_advisory_lock(to_regclass('swoop.thread')::oid::integer, _lock_id)
+ );
+END;
+$$;
+
+
+CREATE FUNCTION swoop.unlock_thread(_lock_id integer)
+RETURNS bool
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+BEGIN
+ RETURN (
+ SELECT pg_advisory_unlock(to_regclass('swoop.thread')::oid::integer, _lock_id)
+ );
+END;
+$$;
diff --git a/db/tests/action.sql b/db/tests/action.sql
new file mode 100644
index 0000000..661f2fd
--- /dev/null
+++ b/db/tests/action.sql
@@ -0,0 +1,240 @@
+BEGIN;
+
+SET search_path = tap, public;
+SELECT plan(13);
+
+INSERT INTO swoop.action (
+ action_uuid,
+ action_type,
+ handler_name,
+ action_name,
+ created_at
+) VALUES (
+ 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid,
+ 'workflow',
+ 'argo-workflow',
+ 'workflow-a',
+ '2023-04-13 00:25:07.388012+00'::timestamptz
+);
+
+-- check event created as expected
+SELECT results_eq(
+ $$
+ SELECT
+ event_time,
+ action_uuid,
+ status,
+ retry_seconds,
+ error
+ FROM
+ swoop.event
+ WHERE action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'
+ $$,
+ $$
+ VALUES (
+ '2023-04-13 00:25:07.388012+00'::timestamptz,
+ 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid,
+ 'PENDING',
+ null::integer,
+ null
+ )
+ $$,
+ 'event should be created on action insert'
+);
+
+-- check thread created as expected
+SELECT results_eq(
+ $$
+ SELECT
+ last_update,
+ action_uuid,
+ status,
+ next_attempt_after
+ FROM
+ swoop.thread
+ WHERE
+ action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'
+ $$,
+ $$
+ VALUES (
+ '2023-04-13 00:25:07.388012+00'::timestamptz,
+ 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid,
+ 'PENDING',
+ null::timestamptz
+ )
+ $$,
+ 'thread should be created on event insert'
+);
+
+-- get the processable action
+SELECT is_empty(
+ $$
+ SELECT swoop.get_processable_actions(
+ _ignored_action_uuids => array[]::uuid[],
+ _handler_names => array['bogus']
+ )
+ $$,
+ 'should not return any processable actions - bad action name'
+);
+
+SELECT is_empty(
+ $$
+ SELECT swoop.get_processable_actions(
+ _ignored_action_uuids => array['b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid]
+ )
+ $$,
+ 'should not return any processable actions - filtered action uuid'
+);
+
+SELECT results_eq(
+ $$
+ SELECT
+ action_uuid,
+ handler_name
+ FROM
+ swoop.get_processable_actions(
+ _ignored_action_uuids => array[]::uuid[],
+ _handler_names => array['argo-workflow']
+ )
+ $$,
+ $$
+ SELECT
+ action_uuid,
+ handler_name
+ FROM
+ swoop.action
+ WHERE
+ action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'
+ $$,
+ 'should get our processable action'
+);
+
+-- check locks
+SELECT results_eq(
+ $$
+ SELECT
+ classid,
+ objid
+ FROM
+ pg_locks
+ WHERE
+ locktype = 'advisory'
+ $$,
+ $$
+ SELECT
+ to_regclass('swoop.thread')::oid,
+ lock_id::oid
+ FROM
+ swoop.thread
+ WHERE
+ action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'
+ $$,
+ 'should have an advisory lock for the processable action we grabbed'
+);
+
+-- insert backoff event for action, drop lock,
+-- check thread update, and check processable
+INSERT INTO swoop.event (
+ event_time,
+ action_uuid,
+ status,
+ retry_seconds,
+ error
+) VALUES (
+ '2023-04-13 00:25:08.388012+00'::timestamptz,
+ 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid,
+ 'BACKOFF',
+ 1,
+ 'some error string'
+);
+
+SELECT
+ ok(
+ swoop.unlock_thread(lock_id),
+ 'should release the lock on our row'
+ )
+FROM swoop.thread
+WHERE
+ action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid;
+
+SELECT is_empty(
+ $$
+ SELECT
+ classid,
+ objid
+ FROM
+ pg_locks
+ WHERE
+ locktype = 'advisory'
+ $$,
+ 'should not have any advisory locks'
+);
+
+SELECT
+ matches(
+ status,
+ 'BACKOFF',
+ 'thread status should be backoff'
+ ) AS matches
+FROM swoop.thread;
+
+SELECT
+ cmp_ok(
+ next_attempt_after,
+ '=',
+ last_update + interval '1 second',
+ 'thread next attempt should be last update plus backoff time'
+ ) AS cmp
+FROM swoop.thread;
+
+SELECT results_eq(
+ $$
+ SELECT
+ action_uuid,
+ handler_name
+ FROM
+ swoop.get_processable_actions(
+ _ignored_action_uuids => array[]::uuid[],
+ _handler_names => array['argo-workflow']
+ )
+ $$,
+ $$
+ SELECT action_uuid, handler_name
+ FROM swoop.action
+ $$,
+ 'should get our processable action in the backoff state'
+);
+
+-- insert queued event, drop lock, and check it is not processable
+INSERT INTO swoop.event (
+ event_time,
+ action_uuid,
+ status
+) VALUES (
+ '2023-04-13 00:25:10.388012+00'::timestamptz,
+ 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid,
+ 'QUEUED'
+);
+
+SELECT
+ ok(
+ swoop.unlock_thread(lock_id),
+ 'should release the lock on our row once more'
+ )
+FROM swoop.thread
+WHERE
+ action_uuid = 'b15120b8-b7ab-4180-9b7a-b0384758f468'::uuid;
+
+
+SELECT is_empty(
+ $$
+ SELECT swoop.get_processable_actions(
+ _ignored_action_uuids => array[]::uuid[]
+ )
+ $$,
+ 'should not return any processable actions due to state'
+);
+
+
+SELECT * FROM finish(); -- noqa
+ROLLBACK;
diff --git a/db/tests/limit-locking.sql b/db/tests/limit-locking.sql
new file mode 100644
index 0000000..8a9349b
--- /dev/null
+++ b/db/tests/limit-locking.sql
@@ -0,0 +1,63 @@
+BEGIN;
+
+SET search_path = tap, public;
+SELECT plan(3);
+
+DO
+$$
+BEGIN
+ FOR i in 1..300 LOOP
+ INSERT INTO swoop.action (
+ action_uuid,
+ action_type,
+ handler_name,
+ action_name,
+ created_at
+ ) VALUES (
+ gen_random_uuid(),
+ 'workflow',
+ 'argo-workflow',
+ 'workflow-a',
+ now()
+ );
+ END LOOP;
+END;
+$$;
+
+SELECT
+ is(
+ count(*),
+ 300::bigint,
+ 'should have expected number of processable threads'
+ )
+FROM
+ swoop.thread AS t --noqa: AL05
+WHERE
+ swoop.thread_is_processable(t);
+
+SELECT
+ is(
+ count(*),
+ 10::bigint,
+ 'should get expected number of processable actions'
+ )
+FROM
+ swoop.get_processable_actions(
+ _ignored_action_uuids => ARRAY[]::uuid []
+ );
+
+SELECT
+ is(
+ count(*),
+ 10::bigint,
+    'should have expected number of locks on threads'
+ )
+FROM
+ pg_locks
+WHERE
+ locktype = 'advisory'
+ AND classid = to_regclass('swoop.thread')::oid;
+
+
+SELECT * FROM finish(); -- noqa
+ROLLBACK;
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..72ab15a
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,18 @@
+version: '3.8'
+
+services:
+ postgres:
+ build:
+ context: ./db
+ dockerfile: Dockerfile
+ restart: always
+ environment:
+ POSTGRES_DB: "${DATABASE_NAME:-postgres}"
+ POSTGRES_PASSWORD: "${DATABASE_PASS:-password}"
+ POSTGRES_USER: "${DATABASE_USER:-postgres}"
+ POSTGRES_HOST_AUTH_METHOD: "trust"
+ ports:
+ - "${DATABASE_PORT:-5432}:5432"
+ volumes:
+ - "./:/swoop"
+ working_dir: "/swoop"
diff --git a/pyproject.toml b/pyproject.toml
index feb1dc6..0ec5174 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,10 +39,11 @@ dev = [
"httpx >=0.24.0",
"ruff >=0.0.253",
"mypy >=1.0.1",
- "pip-tools >= 6.12.3",
+ "pip-tools >=6.12.3",
"pytest >=7.2.2",
"pytest-cov >=4.0.0",
"pyupgrade >=3.3.1",
+ "sqlfluff >=2.0.3",
]
[tool.setuptools.dynamic]