From e1817f0a9dd97d4a0865f9ce76248a3a42a460bf Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 24 Mar 2022 17:38:24 -0700 Subject: [PATCH] feat: convenient wrapper for Postgres advisory locks (#1641) --- Cargo.lock | 15 +- sqlx-core/Cargo.toml | 2 + sqlx-core/src/pool/connection.rs | 12 + sqlx-core/src/postgres/advisory_lock.rs | 421 +++++++++++++++++++++++ sqlx-core/src/postgres/connection/mod.rs | 10 +- sqlx-core/src/postgres/mod.rs | 2 + sqlx-core/src/postgres/transaction.rs | 6 +- tests/postgres/postgres.rs | 66 +++- 8 files changed, 525 insertions(+), 9 deletions(-) create mode 100644 sqlx-core/src/postgres/advisory_lock.rs diff --git a/Cargo.lock b/Cargo.lock index 9505ec3c3f..d3ea1cdec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,6 +1098,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01706d578d5c281058480e673ae4086a9f4710d8df1ad80a5b03e39ece5f886b" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "hmac" version = "0.11.0" @@ -1978,9 +1988,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -2439,6 +2449,7 @@ dependencies = [ "git2", "hashlink", "hex", + "hkdf", "hmac", "indexmap", "ipnetwork", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 20753f15db..1e06eb4b33 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -33,6 +33,7 @@ postgres = [ "json", "dirs", "whoami", + "hkdf" ] mysql = [ "sha-1", @@ -168,6 +169,7 @@ hashlink = "0.7.0" # NOTE: *must* remain below 1.7.0 to allow users to avoid the `ahash` cyclic dependency problem by pinning the version # https://github.com/tkaitchuck/aHash/issues/95#issuecomment-874150078 indexmap = "1.6.0" +hkdf = { version = "0.11.0", optional = true } [dev-dependencies] sqlx = { version = "0.5.11", path = "..", features = ["postgres", "sqlite"] } diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 88864566c1..b9b78bb7b1 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -59,6 +59,18 @@ impl DerefMut for PoolConnection { } } +impl AsRef for PoolConnection { + fn as_ref(&self) -> &DB::Connection { + self + } +} + +impl AsMut for PoolConnection { + fn as_mut(&mut self) -> &mut DB::Connection { + self + } +} + impl PoolConnection { /// Explicitly release a connection from the pool #[deprecated = "renamed to `.detach()` for clarity"] diff --git a/sqlx-core/src/postgres/advisory_lock.rs b/sqlx-core/src/postgres/advisory_lock.rs new file mode 100644 index 0000000000..ce807484a6 --- /dev/null +++ b/sqlx-core/src/postgres/advisory_lock.rs @@ -0,0 +1,421 @@ +use crate::error::Result; +use crate::postgres::PgConnection; +use crate::Either; +use hkdf::Hkdf; +use once_cell::sync::OnceCell; +use sha2::Sha256; +use std::ops::{Deref, DerefMut}; + +/// A mutex-like type utilizing [Postgres advisory locks]. +/// +/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared +/// locks tracked in the database with application-defined semantics, as opposed to the standard +/// row-level or table-level locks which may not fit all use-cases. +/// +/// This API provides a convenient wrapper for generating and storing the integer keys that +/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out +/// of scope. +/// +/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or +/// automatically released when a connection is closed). +/// +/// It is also possible to use transaction-scoped locks but those can be used by beginning a +/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`) +/// manually, and cannot be explicitly released, but are automatically released when a transaction +/// ends (is committed or rolled back). +/// +/// Session-level locks can be acquired either inside or outside a transaction and are not +/// tied to transaction semantics; a lock acquired inside a transaction is still held when that +/// transaction is committed or rolled back, until explicitly released or the connection is closed. +/// +/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks +/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single +/// exclusive lock prevents any other lock being taken for a given key until it is released. +/// +/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS +#[derive(Debug, Clone)] +pub struct PgAdvisoryLock { + key: PgAdvisoryLockKey, + /// The query to execute to release this lock. + release_query: OnceCell, +} + +/// A key type natively used by Postgres advisory locks. +/// +/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single +/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs +/// specify that these key spaces "do not overlap": +/// +/// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS +/// +/// The documentation for the `pg_locks` system view explains further how advisory locks +/// are treated in Postgres: +/// +/// https://www.postgresql.org/docs/current/view-pg-locks.html +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum PgAdvisoryLockKey { + /// The keyspace designated by a single 64-bit integer. + /// + /// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()], + /// this is the keyspace used. + BigInt(i64), + /// The keyspace designated by two 32-bit integers. + IntPair(i32, i32), +} + +/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock. +/// +/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`]. +/// Released on-drop or via [`Self::release_now()`]. +/// +/// ### Note: Release-on-drop is not immediate! +/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be +/// flushed to the server the next time it is used, or when it is returned to +/// a [`PgPool`][crate::postgres::PgPool] in the case of +/// [`PoolConnection`][crate::pool::PoolConnection]. +/// +/// This means the lock is not actually released as soon as the guard is dropped. To ensure the +/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()]. +pub struct PgAdvisoryLockGuard<'lock, C: AsMut> { + lock: &'lock PgAdvisoryLock, + conn: Option, +} + +impl PgAdvisoryLock { + /// Construct a `PgAdvisoryLock` using the given string as a key. + /// + /// This is intended to make it easier to use an advisory lock by using a human-readable string + /// for a key as opposed to manually generating a unique integer key. The generated integer key + /// is guaranteed to be stable and in the single 64-bit integer keyspace + /// (see [`PgAdvisoryLockKey`] for details). + /// + /// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf] + /// to the bytes of the input string, but in a way that the calculated integer is unlikely + /// to collide with any similar implementations (although we don't currently know of any). + /// See the source of this method for details. + /// + /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869 + /// ### Example + /// ```rust + /// # extern crate sqlx_core as sqlx; + /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey}; + /// + /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!"); + /// // Negative values are fine because of how Postgres treats advisory lock keys. + /// // See the documentation for the `pg_locks` system view for details. + /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287)); + /// ``` + pub fn new(key_string: impl AsRef) -> Self { + let input_key_material = key_string.as_ref(); + + // HKDF was chosen because it is designed to concentrate the entropy in a variable-length + // input key and produce a higher quality but reduced-length output key with a + // well-specified and reproducible algorithm. + // + // Granted, the input key is usually meant to be pseudorandom and not human readable, + // but we're not trying to produce an unguessable value by any means; just one that's as + // unlikely to already be in use as possible, but still deterministic. + // + // SHA-256 was chosen as the hash function because it's already used in the Postgres driver, + // which should save on codegen and optimization. + + // We don't supply a salt as that is intended to be random, but we want a deterministic key. + let hkdf = Hkdf::::new(None, input_key_material.as_bytes()); + + let mut output_key_material = [0u8; 8]; + + // The first string is the "info" string of the HKDF which is intended to tie the output + // exclusively to SQLx. This should avoid collisions with implementations using a similar + // strategy. If you _want_ this to match some other implementation then you should get + // the calculated integer key from it and use that directly. + // + // Do *not* change this string as it will affect the output! + hkdf.expand( + b"SQLx (Rust) Postgres advisory lock", + &mut output_key_material, + ) + // `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size. + // This is specified by RFC 5869 but not elaborated upon: + // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3 + // Since we're only asking for 8 bytes, this error shouldn't be returned. + .expect("BUG: `output_key_material` should be of acceptable length"); + + // For ease of use, this method assumes the user doesn't care which keyspace is used. + // + // It doesn't seem likely that someone would care about using the `(int, int)` keyspace + // specifically unless they already had keys to use, in which case they wouldn't + // care about this method. That's why we also provide `with_key()`. + // + // The choice of `from_le_bytes()` is mostly due to x86 being the most popular + // architecture for server software, so it should be a no-op there. + let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material)); + + log::trace!( + "generated {:?} from key string {:?}", + key, + input_key_material + ); + + Self::with_key(key) + } + + /// Construct a `PgAdvisoryLock` with a manually supplied key. + pub fn with_key(key: PgAdvisoryLockKey) -> Self { + Self { + key, + release_query: OnceCell::new(), + } + } + + /// Returns the current key. + pub fn key(&self) -> &PgAdvisoryLockKey { + &self.key + } + + // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors + // about "cannot project lifetimes to parent scope". + // + // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't + // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits + // or lazy normalization. + + /// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired. + /// + /// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`]. + /// + /// A connection-like type is required to execute the call. Allowed types include `PgConnection`, + /// `PoolConnection` and `Transaction`, as well as mutable references to + /// any of these. + /// + /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped, + /// which will be executed the next time the connection is used, or when returned to a + /// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection`. + /// + /// Postgres allows a single connection to acquire a given lock more than once without releasing + /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations + /// must match the number of lock operations for the lock to actually be released. + /// + /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details. + /// + /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS + pub async fn acquire>( + &self, + mut conn: C, + ) -> Result> { + match &self.key { + PgAdvisoryLockKey::BigInt(key) => { + crate::query::query("SELECT pg_advisory_lock($1)") + .bind(key) + .execute(conn.as_mut()) + .await?; + } + PgAdvisoryLockKey::IntPair(key1, key2) => { + crate::query::query("SELECT pg_advisory_lock($1, $2)") + .bind(key1) + .bind(key2) + .execute(conn.as_mut()) + .await?; + } + } + + Ok(PgAdvisoryLockGuard::new(self, conn)) + } + + /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately + /// if the lock could not be acquired. + /// + /// For a version that waits until the lock is acquired, see [`Self::acquire()`]. + /// + /// A connection-like type is required to execute the call. Allowed types include `PgConnection`, + /// `PoolConnection` and `Transaction`, as well as mutable references to + /// any of these. The connection is returned if the lock could not be acquired. + /// + /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped, + /// which will be executed the next time the connection is used, or when returned to a + /// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection`. + /// + /// Postgres allows a single connection to acquire a given lock more than once without releasing + /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations + /// must match the number of lock operations for the lock to actually be released. + /// + /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details. + /// + /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS + pub async fn try_acquire>( + &self, + mut conn: C, + ) -> Result, C>> { + let locked: bool = match &self.key { + PgAdvisoryLockKey::BigInt(key) => { + crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)") + .bind(key) + .fetch_one(conn.as_mut()) + .await? + } + PgAdvisoryLockKey::IntPair(key1, key2) => { + crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)") + .bind(key1) + .bind(key2) + .fetch_one(conn.as_mut()) + .await? + } + }; + + if locked { + Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn))) + } else { + Ok(Either::Right(conn)) + } + } + + /// Execute `pg_advisory_unlock()` for this lock's key on the given connection. + /// + /// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually + /// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`]. + /// + /// An error should only be returned if there is something wrong with the connection, + /// in which case the lock will be automatically released by the connection closing anyway. + /// + /// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it + /// indicates that the lock was not actually held by the given connection and that a warning + /// has been logged by the Postgres server. + pub async fn force_release>(&self, mut conn: C) -> Result<(C, bool)> { + let released: bool = match &self.key { + PgAdvisoryLockKey::BigInt(key) => { + crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)") + .bind(key) + .fetch_one(conn.as_mut()) + .await? + } + PgAdvisoryLockKey::IntPair(key1, key2) => { + crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)") + .bind(key1) + .bind(key2) + .fetch_one(conn.as_mut()) + .await? + } + }; + + Ok((conn, released)) + } + + fn get_release_query(&self) -> &str { + self.release_query.get_or_init(|| match &self.key { + PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({})", key), + PgAdvisoryLockKey::IntPair(key1, key2) => { + format!("SELECT pg_advisory_unlock({}, {})", key1, key2) + } + }) + } +} + +impl PgAdvisoryLockKey { + /// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`. + pub fn as_bigint(&self) -> Option { + if let Self::BigInt(bigint) = self { + Some(*bigint) + } else { + None + } + } +} + +const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken"; + +impl<'lock, C: AsMut> PgAdvisoryLockGuard<'lock, C> { + fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self { + PgAdvisoryLockGuard { + lock, + conn: Some(conn), + } + } + + /// Immediately release the held advisory lock instead of when the connection is next used. + /// + /// An error should only be returned if there is something wrong with the connection, + /// in which case the lock will be automatically released by the connection closing anyway. + /// + /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as + /// well as the Postgres server. This would only happen if the lock was released without + /// using this guard, or the connection was swapped using [`std::mem::replace()`]. + pub async fn release_now(mut self) -> Result { + let (conn, released) = self + .lock + .force_release(self.conn.take().expect(NONE_ERR)) + .await?; + + if !released { + log::warn!( + "PgAdvisoryLockGuard: advisory lock {:?} was not held by the contained connection", + self.lock.key + ); + } + + Ok(conn) + } + + /// Cancel the release of the advisory lock, keeping it held until the connection is closed. + /// + /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`]. + pub fn leak(mut self) -> C { + self.conn.take().expect(NONE_ERR) + } +} + +impl<'lock, C: AsMut + AsRef> Deref for PgAdvisoryLockGuard<'lock, C> { + type Target = PgConnection; + + fn deref(&self) -> &Self::Target { + self.conn.as_ref().expect(NONE_ERR).as_ref() + } +} + +/// Mutable access to the underlying connection is provided so it can still be used like normal, +/// even allowing locks to be taken recursively. +/// +/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`] +/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this +/// guard attempts to release the lock. +impl<'lock, C: AsMut + AsRef> DerefMut + for PgAdvisoryLockGuard<'lock, C> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + self.conn.as_mut().expect(NONE_ERR).as_mut() + } +} + +impl<'lock, C: AsMut + AsRef> AsRef + for PgAdvisoryLockGuard<'lock, C> +{ + fn as_ref(&self) -> &PgConnection { + self.conn.as_ref().expect(NONE_ERR).as_ref() + } +} + +/// Mutable access to the underlying connection is provided so it can still be used like normal, +/// even allowing locks to be taken recursively. +/// +/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`] +/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this +/// guard attempts to release the lock. +impl<'lock, C: AsMut> AsMut for PgAdvisoryLockGuard<'lock, C> { + fn as_mut(&mut self) -> &mut PgConnection { + self.conn.as_mut().expect(NONE_ERR).as_mut() + } +} + +/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed +/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::postgres::PgPool] +/// in the case of [`PoolConnection`][crate::pool::PoolConnection]. +impl<'lock, C: AsMut> Drop for PgAdvisoryLockGuard<'lock, C> { + fn drop(&mut self) { + if let Some(mut conn) = self.conn.take() { + // Queue a simple query message to execute next time the connection is used. + // The `async fn` versions can safely use the prepared statement protocol, + // but this is the safest way to queue a query to execute on the next opportunity. + conn.as_mut() + .queue_simple_query(self.lock.get_release_query()); + } + } +} diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index 5241af0210..f9c28c9d52 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -12,7 +12,7 @@ use crate::executor::Executor; use crate::ext::ustr::UStr; use crate::io::Decode; use crate::postgres::message::{ - Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, + Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus, }; use crate::postgres::statement::PgStatementMetadata; use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres}; @@ -101,6 +101,14 @@ impl PgConnection { Ok(()) } + + /// Queue a simple query (not prepared) to execute the next time this connection is used. + /// + /// Used for rolling back transactions and releasing advisory locks. + pub(crate) fn queue_simple_query(&mut self, query: &str) { + self.pending_ready_for_query_count += 1; + self.stream.write(Query(query)); + } } impl Debug for PgConnection { diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index 85f8d2457b..e2966c6da8 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -2,6 +2,7 @@ use crate::executor::Executor; +mod advisory_lock; mod arguments; mod column; mod connection; @@ -23,6 +24,7 @@ mod value; #[cfg(feature = "migrate")] mod migrate; +pub use advisory_lock::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey}; pub use arguments::{PgArgumentBuffer, PgArguments}; pub use column::PgColumn; pub use connection::{PgConnection, PgConnectionInfo}; diff --git a/sqlx-core/src/postgres/transaction.rs b/sqlx-core/src/postgres/transaction.rs index efb11b8223..c65e772099 100644 --- a/sqlx-core/src/postgres/transaction.rs +++ b/sqlx-core/src/postgres/transaction.rs @@ -2,7 +2,6 @@ use futures_core::future::BoxFuture; use crate::error::Error; use crate::executor::Executor; -use crate::postgres::message::Query; use crate::postgres::{PgConnection, Postgres}; use crate::transaction::{ begin_ansi_transaction_sql, commit_ansi_transaction_sql, rollback_ansi_transaction_sql, @@ -54,10 +53,7 @@ impl TransactionManager for PgTransactionManager { fn start_rollback(conn: &mut PgConnection) { if conn.transaction_depth > 0 { - conn.pending_ready_for_query_count += 1; - conn.stream.write(Query(&rollback_ansi_transaction_sql( - conn.transaction_depth, - ))); + conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.transaction_depth)); conn.transaction_depth -= 1; } diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index ac755018bc..10092263a4 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -1,11 +1,12 @@ use futures::{StreamExt, TryStreamExt}; use sqlx::postgres::{ - PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity, + PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity, }; use sqlx::postgres::{PgConnectionInfo, PgPoolOptions, PgRow, Postgres}; use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo}; use sqlx_test::{new, setup_if_needed}; use std::env; +use std::sync::Arc; use std::time::Duration; #[sqlx_macros::test] @@ -1445,3 +1446,66 @@ CREATE TABLE issue_1254 (id INT4 PRIMARY KEY, pairs PAIR[]); Ok(()) } + +#[sqlx_macros::test] +async fn test_advisory_locks() -> anyhow::Result<()> { + let pool = PgPoolOptions::new() + .max_connections(2) + .connect(&dotenv::var("DATABASE_URL")?) + .await?; + + let lock1 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-1")); + let lock2 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-2")); + + let conn1 = pool.acquire().await?; + let mut conn1_lock1 = lock1.acquire(conn1).await?; + + // try acquiring a recursive lock through a mutable reference then dropping + drop(lock1.acquire(&mut conn1_lock1).await?); + + let conn2 = pool.acquire().await?; + // leak so we can take it across the task boundary + let conn2_lock2 = lock2.acquire(conn2).await?.leak(); + + sqlx_rt::spawn({ + let lock1 = lock1.clone(); + let lock2 = lock2.clone(); + + async move { + let conn2_lock2 = lock1.try_acquire(conn2_lock2).await?.right_or_else(|_| { + panic!( + "acquired lock but wasn't supposed to! Key: {:?}", + lock1.key() + ) + }); + + let (conn2, released) = lock2.force_release(conn2_lock2).await?; + assert!(released); + + // acquire both locks but let the pool release them + let conn2_lock1 = lock1.acquire(conn2).await?; + let _conn2_lock1and2 = lock2.acquire(conn2_lock1).await?; + + anyhow::Ok(()) + } + }); + + // acquire lock2 on conn1, we leak the lock1 guard so we can manually release it before lock2 + let conn1_lock1and2 = lock2.acquire(conn1_lock1.leak()).await?; + + // release lock1 while holding lock2 + let (conn1_lock2, released) = lock1.force_release(conn1_lock1and2).await?; + assert!(released); + + let conn1 = conn1_lock2.release_now().await?; + + // acquire both locks to be sure they were released + { + let conn1_lock1 = lock1.acquire(conn1).await?; + let _conn1_lock1and2 = lock2.acquire(conn1_lock1).await?; + } + + pool.close().await; + + Ok(()) +}