diff --git a/api-server/api-server-common/src/storage/impls/postgres/mod.rs b/api-server/api-server-common/src/storage/impls/postgres/mod.rs index db4670e45e..2cf277e282 100644 --- a/api-server/api-server-common/src/storage/impls/postgres/mod.rs +++ b/api-server/api-server-common/src/storage/impls/postgres/mod.rs @@ -20,6 +20,7 @@ mod queries; use std::str::FromStr; use bb8_postgres::bb8::Pool; +use bb8_postgres::bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; use tokio_postgres::NoTls; @@ -32,6 +33,19 @@ use self::transactional::ApiServerPostgresTransactionalRw; pub struct TransactionalApiServerPostgresStorage { pool: Pool>, + /// This task is responsible for rolling back failed RW/RO transactions, since closing connections are pooled + tx_dropper_joiner: tokio::task::JoinHandle<()>, + /// This channel is used to send transactions that are not manually rolled back to the tx_dropper task to roll them back + db_tx_conn_sender: tokio::sync::mpsc::UnboundedSender< + PooledConnection<'static, PostgresConnectionManager>, + >, +} + +impl Drop for TransactionalApiServerPostgresStorage { + fn drop(&mut self) { + // Since the whole connection pool will be destroyed, we can safely abort all connections + self.tx_dropper_joiner.abort(); + } } impl TransactionalApiServerPostgresStorage { @@ -57,7 +71,26 @@ impl TransactionalApiServerPostgresStorage { )) })?; - let result = Self { pool }; + let (conn_tx, conn_rx) = tokio::sync::mpsc::unbounded_channel::< + PooledConnection<'static, PostgresConnectionManager>, + >(); + + let tx_dropper_joiner = tokio::task::spawn(async move { + let mut conn_rx = conn_rx; + while let Some(conn) = conn_rx.recv().await { + conn.batch_execute("ROLLBACK").await.unwrap_or_else(|e| { + logging::log::error!( + "CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}" + ) + }); + } + }); + + let result = Self { + pool, + tx_dropper_joiner, + db_tx_conn_sender: conn_tx, + }; result.initialize_if_not(chain_config).await?; @@ -80,10 +113,11 @@ impl TransactionalApiServerPostgresStorage { ) -> Result { let conn = self .pool - .get() + .get_owned() .await .map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?; - ApiServerPostgresTransactionalRo::from_connection(conn).await + ApiServerPostgresTransactionalRo::from_connection(conn, self.db_tx_conn_sender.clone()) + .await } pub async fn begin_rw_transaction( @@ -91,9 +125,10 @@ impl TransactionalApiServerPostgresStorage { ) -> Result { let conn = self .pool - .get() + .get_owned() .await .map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?; - ApiServerPostgresTransactionalRw::from_connection(conn).await + ApiServerPostgresTransactionalRw::from_connection(conn, self.db_tx_conn_sender.clone()) + .await } } diff --git a/api-server/api-server-common/src/storage/impls/postgres/transactional/mod.rs b/api-server/api-server-common/src/storage/impls/postgres/transactional/mod.rs index ebc9aa0cb4..0bbd3d3bea 100644 --- a/api-server/api-server-common/src/storage/impls/postgres/transactional/mod.rs +++ b/api-server/api-server-common/src/storage/impls/postgres/transactional/mod.rs @@ -30,34 +30,53 @@ use crate::storage::storage_api::{ use super::{queries::QueryFromConnection, TransactionalApiServerPostgresStorage}; +const CONN_ERR: &str = "CRITICAL ERROR: failed to get postgres tx connection. Invariant broken."; + pub struct ApiServerPostgresTransactionalRo<'a> { - connection: PooledConnection<'a, PostgresConnectionManager>, + // Note: This is an Option due to needing to pry the connection out of Self in Drop + connection: Option>>, finished: bool, + db_tx_sender: tokio::sync::mpsc::UnboundedSender< + PooledConnection<'static, PostgresConnectionManager>, + >, + // Note: This exists to enforce that a transaction never outlives the database object, + // given that all connections have 'static lifetimes + _marker: std::marker::PhantomData<&'a ()>, } impl<'a> ApiServerPostgresTransactionalRo<'a> { pub(super) async fn from_connection( - connection: PooledConnection<'a, PostgresConnectionManager>, - ) -> Result { + connection: PooledConnection<'static, PostgresConnectionManager>, + db_tx_sender: tokio::sync::mpsc::UnboundedSender< + PooledConnection<'static, PostgresConnectionManager>, + >, + ) -> Result, ApiServerStorageError> { let tx = Self { - connection, + connection: Some(connection), finished: false, + db_tx_sender, + _marker: std::marker::PhantomData, }; - tx.connection.batch_execute("BEGIN READ ONLY").await.map_err(|e| { - ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e)) - })?; + tx.connection + .as_ref() + .expect(CONN_ERR) + .batch_execute("BEGIN READ ONLY") + .await + .map_err(|e| { + ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e)) + })?; Ok(tx) } pub async fn is_initialized(&mut self) -> Result { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.is_initialized().await?; Ok(res) } pub async fn get_storage_version(&mut self) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_storage_version().await?; Ok(res) @@ -66,7 +85,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { pub async fn get_best_block( &mut self, ) -> Result<(BlockHeight, Id), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_best_block().await?; Ok(res) @@ -76,7 +95,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { &mut self, block_height: BlockHeight, ) -> Result>, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_main_chain_block_id(block_height).await?; Ok(res) @@ -86,7 +105,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { &mut self, block_id: Id, ) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block(block_id).await?; Ok(res) @@ -97,7 +116,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { &mut self, transaction_id: Id, ) -> Result>, SignedTransaction)>, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_transaction(transaction_id).await?; Ok(res) @@ -107,7 +126,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { &mut self, block_id: Id, ) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block_aux_data(block_id).await?; Ok(res) @@ -115,35 +134,52 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> { } pub struct ApiServerPostgresTransactionalRw<'a> { - connection: PooledConnection<'a, PostgresConnectionManager>, + // Note: This is an Option due to needing to pry the connection out of Self in Drop + connection: Option>>, finished: bool, + db_tx_sender: tokio::sync::mpsc::UnboundedSender< + PooledConnection<'static, PostgresConnectionManager>, + >, + // Note: This exists to enforce that a transaction never outlives the database object, + // given that all connections have 'static lifetimes + _marker: std::marker::PhantomData<&'a ()>, } impl<'a> Drop for ApiServerPostgresTransactionalRw<'a> { fn drop(&mut self) { if !self.finished { - futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else( - |e| { + self.db_tx_sender + .send(self.connection.take().expect(CONN_ERR)) + .unwrap_or_else(|e| { logging::log::error!( - "CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}" + "CRITICAL ERROR: failed to send postgres RW transaction connection for closure: {e}" ) - }, - ); + }); } } } impl<'a> ApiServerPostgresTransactionalRw<'a> { pub(super) async fn from_connection( - connection: PooledConnection<'a, PostgresConnectionManager>, + connection: PooledConnection<'static, PostgresConnectionManager>, + db_tx_sender: tokio::sync::mpsc::UnboundedSender< + PooledConnection<'static, PostgresConnectionManager>, + >, ) -> Result, ApiServerStorageError> { let tx = Self { - connection, + connection: Some(connection), finished: false, + db_tx_sender, + _marker: std::marker::PhantomData, }; - tx.connection.batch_execute("BEGIN READ WRITE").await.map_err(|e| { - ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e)) - })?; + tx.connection + .as_ref() + .expect(CONN_ERR) + .batch_execute("BEGIN READ WRITE") + .await + .map_err(|e| { + ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e)) + })?; Ok(tx) } } @@ -152,6 +188,8 @@ impl<'a> ApiServerPostgresTransactionalRw<'a> { impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> { async fn commit(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> { self.connection + .as_ref() + .expect(CONN_ERR) .batch_execute("COMMIT") .await .map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?; @@ -161,6 +199,8 @@ impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> { async fn rollback(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> { self.connection + .as_ref() + .expect(CONN_ERR) .batch_execute("ROLLBACK") .await .map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?; @@ -179,31 +219,31 @@ impl<'a> ApiServerTransactionRo for ApiServerPostgresTransactionalRo<'a> { impl<'a> Drop for ApiServerPostgresTransactionalRo<'a> { fn drop(&mut self) { if !self.finished { - futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else( - |e| { + self.db_tx_sender + .send(self.connection.take().expect(CONN_ERR)) + .unwrap_or_else(|e| { logging::log::error!( - "CRITICAL ERROR: failed to rollback failed postgres RO transaction: {e}" + "CRITICAL ERROR: failed to send postgres RO transaction connection for closure: {e}" ) - }, - ); + }); } } } #[async_trait::async_trait] -impl<'t> Transactional<'t> for TransactionalApiServerPostgresStorage { - type TransactionRo = ApiServerPostgresTransactionalRo<'t>; +impl<'tx> Transactional<'tx> for TransactionalApiServerPostgresStorage { + type TransactionRo = ApiServerPostgresTransactionalRo<'tx>; - type TransactionRw = ApiServerPostgresTransactionalRw<'t>; + type TransactionRw = ApiServerPostgresTransactionalRw<'tx>; - async fn transaction_ro<'s: 't>( - &'s self, + async fn transaction_ro<'db: 'tx>( + &'db self, ) -> Result { self.begin_ro_transaction().await } - async fn transaction_rw<'s: 't>( - &'s mut self, + async fn transaction_rw<'db: 'tx>( + &'db mut self, ) -> Result { self.begin_rw_transaction().await } diff --git a/api-server/api-server-common/src/storage/impls/postgres/transactional/read.rs b/api-server/api-server-common/src/storage/impls/postgres/transactional/read.rs index 8776f98cf8..3da7d3e233 100644 --- a/api-server/api-server-common/src/storage/impls/postgres/transactional/read.rs +++ b/api-server/api-server-common/src/storage/impls/postgres/transactional/read.rs @@ -17,14 +17,14 @@ use crate::storage::{ impls::postgres::queries::QueryFromConnection, storage_api::ApiServerStorageRead, }; -use super::ApiServerPostgresTransactionalRo; +use super::{ApiServerPostgresTransactionalRo, CONN_ERR}; #[async_trait::async_trait] impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> { async fn is_initialized( &mut self, ) -> Result { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.is_initialized().await?; Ok(res) @@ -33,7 +33,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> { async fn get_storage_version( &mut self, ) -> Result, crate::storage::storage_api::ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_storage_version().await?; Ok(res) @@ -48,7 +48,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> { ), crate::storage::storage_api::ApiServerStorageError, > { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_best_block().await?; Ok(res) @@ -59,7 +59,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> { block_id: common::primitives::Id, ) -> Result, crate::storage::storage_api::ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block(block_id).await?; Ok(res) @@ -72,7 +72,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> { Option, crate::storage::storage_api::ApiServerStorageError, > { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block_aux_data(block_id).await?; Ok(res) diff --git a/api-server/api-server-common/src/storage/impls/postgres/transactional/write.rs b/api-server/api-server-common/src/storage/impls/postgres/transactional/write.rs index 0de6e8160f..560f2c10f7 100644 --- a/api-server/api-server-common/src/storage/impls/postgres/transactional/write.rs +++ b/api-server/api-server-common/src/storage/impls/postgres/transactional/write.rs @@ -26,7 +26,7 @@ use crate::storage::{ }, }; -use super::ApiServerPostgresTransactionalRw; +use super::{ApiServerPostgresTransactionalRw, CONN_ERR}; #[async_trait::async_trait] impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { @@ -34,7 +34,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { &mut self, chain_config: &ChainConfig, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.initialize_database(chain_config).await?; Ok(()) @@ -45,7 +45,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { block_height: BlockHeight, block_id: Id, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.set_best_block(block_height, block_id).await?; Ok(()) @@ -56,7 +56,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { block_id: Id, block: &Block, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.set_block(block_id, block).await?; Ok(()) @@ -68,7 +68,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { owning_block: Option>, transaction: &SignedTransaction, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.set_transaction(transaction_id, owning_block, transaction).await?; Ok(()) @@ -79,7 +79,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { block_id: Id, block_aux_data: &BlockAuxData, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.set_block_aux_data(block_id, block_aux_data).await?; Ok(()) @@ -90,7 +90,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { block_height: BlockHeight, block_id: Id, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.set_main_chain_block_id(block_height, block_id).await?; Ok(()) @@ -100,7 +100,7 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { &mut self, block_height: BlockHeight, ) -> Result<(), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); conn.del_main_chain_block_id(block_height).await?; Ok(()) @@ -110,14 +110,14 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> { #[async_trait::async_trait] impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { async fn is_initialized(&mut self) -> Result { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.is_initialized().await?; Ok(res) } async fn get_storage_version(&mut self) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_storage_version().await?; Ok(res) @@ -126,7 +126,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { async fn get_best_block( &mut self, ) -> Result<(BlockHeight, Id), ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_best_block().await?; Ok(res) @@ -136,7 +136,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { &mut self, block_id: Id, ) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block(block_id).await?; Ok(res) @@ -146,7 +146,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { &mut self, block_id: Id, ) -> Result, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_block_aux_data(block_id).await?; Ok(res) @@ -156,7 +156,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { &mut self, block_height: BlockHeight, ) -> Result>, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_main_chain_block_id(block_height).await?; Ok(res) @@ -166,7 +166,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> { &mut self, transaction_id: Id, ) -> Result>, SignedTransaction)>, ApiServerStorageError> { - let mut conn = QueryFromConnection::new(&self.connection); + let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR)); let res = conn.get_transaction(transaction_id).await?; Ok(res)