Skip to content

Commit

Permalink
WIP feat: convenient wrapper for Postgres advisory locks
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Jan 19, 2022
1 parent dbdedcd commit 3f4e859
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 0 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ postgres = [
"json",
"dirs",
"whoami",
"hkdf"
]
mysql = [
"sha-1",
Expand Down Expand Up @@ -164,3 +165,4 @@ bstr = { version = "0.2.17", default-features = false, features = ["std"], optio
git2 = { version = "0.13.25", default-features = false, optional = true }
hashlink = "0.7.0"
indexmap = "1.7.0"
hkdf = { version = "0.11.0", optional = true }
12 changes: 12 additions & 0 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ impl<DB: Database> DerefMut for PoolConnection<DB> {
}
}

impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
fn as_ref(&self) -> &DB::Connection {
self
}
}

impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
fn as_mut(&mut self) -> &mut DB::Connection {
self
}
}

impl<DB: Database> PoolConnection<DB> {
/// Explicitly release a connection from the pool
#[deprecated = "renamed to `.detach()` for clarity"]
Expand Down
318 changes: 318 additions & 0 deletions sqlx-core/src/postgres/advisory_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
use crate::error::Result;
use crate::postgres::message::Query;
use crate::postgres::PgConnection;
use crate::Either;
use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};

/// A mutex-like type utilizing Postgres advisory locks.
///
/// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
#[derive(Debug, Clone)]
pub struct PgAdvisoryLock {
key: PgAdvisoryLockKey,
/// The query to execute to release this lock.
release_query: OnceCell<String>,
}

/// A key type natively used by Postgres advisory locks.
///
/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
/// specify that these key spaces "do not overlap":
///
/// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
///
/// The documentation for the `pg_locks` system view explains further how advisory locks
/// are treated in Postgres:
///
/// https://www.postgresql.org/docs/current/view-pg-locks.html
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
/// The keyspace designated by a single 64-bit integer.
///
/// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()],
/// this is the keyspace used.
BigInt(i64),
/// The keyspace designated by two 32-bit integers.
IntPair(i32, i32),
}

/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
///
///
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
lock: &'lock PgAdvisoryLock,
conn: Option<C>,
}

impl PgAdvisoryLock {
/// Construct a `PgAdvisoryLock` using the given string as a key.
///
/// This is intended to make it easier to use an advisory lock by using a human-readable string
/// for a key as opposed to manually generating a unique integer key. The generated integer key
/// is guaranteed to be stable and in the single 64-bit integer keyspace
/// (see [`PgAdvisoryLockKey`] for details).
///
/// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
/// to the bytes of the input string. See the source of this method for details.
///
/// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
/// ### Example
/// ```rust
/// # use sqlx_core as sqlx;
/// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
///
/// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
/// // Negative values are fine because of how Postgres treats advisory lock keys.
/// // See the documentation for the `pg_locks` system view for details.
/// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
/// ```
pub fn new(key_string: impl AsRef<str>) -> Self {
let input_key_material = key_string.as_ref();

// HKDF was chosen because it is designed to concentrate the entropy in a variable-length
// input key and produce a higher quality but reduced-length output key with a
// well-specified and reproducible algorithm.
//
// Granted, the input key is usually meant to be pseudorandom and not human readable,
// but we're not trying to produce an unguessable value by any means; just one that's as
// unlikely to already be in use as possible, but still deterministic.
//
// SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
// which should save on codegen and optimization.

// We don't supply a salt as that is intended to be random, but we want a deterministic key.
let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());

let mut output_key_material = [0u8; 8];

// The first string is the "info" string of the HKDF which is intended to tie the output
// exclusively to SQLx. This should avoid collisions with implementations using a similar
// strategy. If you _want_ this to match some other implementation then you should get
// the calculated integer key from it and use that directly.
//
// Do *not* change this string as it will affect the output!
hkdf.expand(
b"SQLx (Rust) Postgres advisory lock",
&mut output_key_material,
)
// `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
// This is specified by RFC 5869 but not elaborated upon:
// https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
// Since we're only asking for 8 bytes, this error shouldn't be returned.
.expect("BUG: `output_key_material` should be of acceptable length");

// For ease of use, this method assumes the user doesn't care which keyspace is used.
//
// It doesn't seem likely that someone would care about using the `(int, int)` keyspace
// specifically unless they already had keys to use, in which case they wouldn't
// care about this method. That's why we also provide `with_key()`.
//
// The choice of `from_le_bytes()` is mostly due to x86 being the most popular
// architecture for server software, so it should be a no-op there.
let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));

log::trace!(
"generated {:?} from key string {:?}",
key,
input_key_material
);

Self::with_key(key)
}

/// Construct a `PgAdvisoryLock` with a manually supplied key.
pub fn with_key(key: PgAdvisoryLockKey) -> Self {
Self {
key,
release_query: OnceCell::new(),
}
}

/// Returns the current key.
pub fn key(&self) -> &PgAdvisoryLockKey {
&self.key
}

/// Acquires an advisory lock using `pg_advisory_lock()`, waiting until the lock is acquired.
///
/// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
/// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
/// any of these.
///
/// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
/// which will be executed the next time the connection is used, or when returned to a
/// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
pub async fn acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<'_, C>> {
match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
.bind(key)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query::query("SELECT pg_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.execute(conn.as_mut())
.await?;
}
}

Ok(PgAdvisoryLockGuard::new(self, conn))
}

/// Acquires an advisory lock using `pg_try_advisory_lock()`, returning immediately
/// if the lock could not be acquired.
///
/// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
/// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
/// any of these. The connection is returned if the lock could not be acquired.
///
/// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
/// which will be executed the next time the connection is used, or when returned to a
/// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
pub async fn try_acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
let locked: bool = match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(key)
.fetch_one(conn.as_mut())
.await?
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.fetch_one(conn.as_mut())
.await?
}
};

if locked {
Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
} else {
Ok(Either::Right(conn))
}
}

fn get_release_query(&self) -> &str {
self.release_query.get_or_init(|| match &self.key {
PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({})", key),
PgAdvisoryLockKey::IntPair(key1, key2) => {
format!("SELECT pg_advisory_unlock({}, {})", key1, key2)
}
})
}
}

impl PgAdvisoryLockKey {
/// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
pub fn as_bigint(&self) -> Option<i64> {
if let Self::BigInt(bigint) = self {
Some(*bigint)
} else {
None
}
}
}

const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";

impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
PgAdvisoryLockGuard {
lock,
conn: Some(conn),
}
}

/// Immediately release the held advisory lock instead of on the next use of the connection.
///
/// An error should only be returned if there is something wrong with the connection,
/// in which case the lock will be automatically released by the connection closing anyway.
pub async fn release_now(mut self) -> Result<C> {
let mut conn = self.conn.take().expect(NONE_ERR);

let unlocked: bool = match &self.lock.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
.bind(key)
.fetch_one(conn.as_mut())
.await?
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
.bind(key1)
.bind(key2)
.fetch_one(conn.as_mut())
.await?
}
};

if !unlocked {
log::warn!("advisory lock {:?} was already unlocked", self.lock.key);
}

Ok(conn)
}

/// Cancel the release of the advisory lock. The lock will be held until the connection is closed!
pub fn cancel(mut self) -> C {
self.conn.take().expect(NONE_ERR)
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
type Target = PgConnection;

fn deref(&self) -> &Self::Target {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
for PgAdvisoryLockGuard<'lock, C>
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
for PgAdvisoryLockGuard<'lock, C>
{
fn as_ref(&self) -> &PgConnection {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
}

impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
fn as_mut(&mut self) -> &mut PgConnection {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}

impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
fn drop(&mut self) {
if let Some(mut conn) = self.conn.take() {
// Queue a simple query message to execute next time the connection is used.
// The `async fn` versions can safely use the prepared statement protocol,
// but this is the safest way to queue a query to execute on the next opportunity.
conn.as_mut()
.stream
.write(Query(self.lock.get_release_query()));
}
}
}
2 changes: 2 additions & 0 deletions sqlx-core/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::executor::Executor;

mod advisory_lock;
mod arguments;
mod column;
mod connection;
Expand All @@ -23,6 +24,7 @@ mod value;
#[cfg(feature = "migrate")]
mod migrate;

pub use advisory_lock::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey};
pub use arguments::{PgArgumentBuffer, PgArguments};
pub use column::PgColumn;
pub use connection::{PgConnection, PgConnectionInfo};
Expand Down

0 comments on commit 3f4e859

Please sign in to comment.