Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that transactions are wrapped asynchronously in the parent database object instead of blocking on transactions' drop function. #1223

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions api-server/api-server-common/src/storage/impls/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod queries;
use std::str::FromStr;

use bb8_postgres::bb8::Pool;
use bb8_postgres::bb8::PooledConnection;
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

Expand All @@ -32,6 +33,19 @@ use self::transactional::ApiServerPostgresTransactionalRw;

pub struct TransactionalApiServerPostgresStorage {
pool: Pool<PostgresConnectionManager<NoTls>>,
/// This task is responsible for rolling back failed RW/RO transactions, since closing connections are pooled
tx_dropper_joiner: tokio::task::JoinHandle<()>,
/// This channel is used to send transactions that are not manually rolled back to the tx_dropper task to roll them back
db_tx_conn_sender: tokio::sync::mpsc::UnboundedSender<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>,
}

impl Drop for TransactionalApiServerPostgresStorage {
fn drop(&mut self) {
// Since the whole connection pool will be destroyed, we can safely abort all connections
self.tx_dropper_joiner.abort();
Comment on lines +44 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is abort really the clean thing to do here? If there is some connection waiting to be rolled back in the channel and the dropper task is aborted, the transaction is not going to be finalised. Or did I miss something?

Copy link
Contributor Author

@TheQuantumPhysicist TheQuantumPhysicist Sep 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. The lower level connection will be dropped since the connection pool will be destroyed, and hence the session will be dropped, and hence the transaction will be aborted anyway. At least this is my understanding of how postgres and mysql work from my research into this problem, but I'm open to being corrected here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if it's so that sounds acceptable, albeit still not the cleanest. If I understand it correctly, some transactions may not be explicitly rolled back by issuing the ROLLBACK query, relying on the connection cleanup procedure to effectively abort it. Is that about right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I'm not sure whether there's a misunderstanding, so let me elaborate.

This is exclusively at the end of life of the database, which is probably obvious.

So, IF we're shutting down the system (hence the database object, not a transaction object) is about to be dropped, and there are connections that were transactions and are still not rolled back (even though their transaction objects are destroyed, by definition, because a transaction cannot outlive the database given the lifetime constraint with that phantom data), then aborting will, yes, cause the connection cleanup to effectively rollback the transaction, because the sessions of the connection pool will be ended, which cancels all low-level postgres transactions that were open.

Having said that, again, this depends on my understanding of how database connections/sessions work and based my research on this problem. I'm happy to be challenged on that.

Now the alternative is that we have to block when shutting down the database. I'm happy to do that if it turned out to be necessary... in fact it's like a line of code in tokio, but I consider it less clean because of my eternal fear of deadlocks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the alternative is that we have to block when shutting down the database. I'm happy to do that if it turned out to be necessary... in fact it's like a line of code in tokio, but I consider it less clean because of my eternal fear of deadlocks.

That could be mitigated by using spawn_blocking for the cleanup task but then you won't be able to use the single-threaded tokio runtime. It's already the case in mempool because of the blocking subsystem handle business but it's not great and I'd like to avoid that if possible.

}
}

impl TransactionalApiServerPostgresStorage {
Expand All @@ -57,7 +71,26 @@ impl TransactionalApiServerPostgresStorage {
))
})?;

let result = Self { pool };
let (conn_tx, conn_rx) = tokio::sync::mpsc::unbounded_channel::<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>();

let tx_dropper_joiner = tokio::task::spawn(async move {
let mut conn_rx = conn_rx;
while let Some(conn) = conn_rx.recv().await {
conn.batch_execute("ROLLBACK").await.unwrap_or_else(|e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the documentation, maybe batch() rather than batch_execute() is better when only doing a single query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_execute() is what was used in postgres_async::Transaction. It seems the right thing to do. Not sure.

logging::log::error!(
"CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}"
)
});
}
});

let result = Self {
pool,
tx_dropper_joiner,
db_tx_conn_sender: conn_tx,
};

result.initialize_if_not(chain_config).await?;

Expand All @@ -80,20 +113,22 @@ impl TransactionalApiServerPostgresStorage {
) -> Result<ApiServerPostgresTransactionalRo, ApiServerStorageError> {
let conn = self
.pool
.get()
.get_owned()
.await
.map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?;
ApiServerPostgresTransactionalRo::from_connection(conn).await
ApiServerPostgresTransactionalRo::from_connection(conn, self.db_tx_conn_sender.clone())
.await
}

pub async fn begin_rw_transaction(
&self,
) -> Result<ApiServerPostgresTransactionalRw, ApiServerStorageError> {
let conn = self
.pool
.get()
.get_owned()
.await
.map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?;
ApiServerPostgresTransactionalRw::from_connection(conn).await
ApiServerPostgresTransactionalRw::from_connection(conn, self.db_tx_conn_sender.clone())
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,53 @@ use crate::storage::storage_api::{

use super::{queries::QueryFromConnection, TransactionalApiServerPostgresStorage};

const CONN_ERR: &str = "CRITICAL ERROR: failed to get postgres tx connection. Invariant broken.";

pub struct ApiServerPostgresTransactionalRo<'a> {
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
// Note: This is an Option due to needing to pry the connection out of Self in Drop
connection: Option<PooledConnection<'static, PostgresConnectionManager<NoTls>>>,
finished: bool,
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>,
// Note: This exists to enforce that a transaction never outlives the database object,
// given that all connections have 'static lifetimes
_marker: std::marker::PhantomData<&'a ()>,
}

impl<'a> ApiServerPostgresTransactionalRo<'a> {
pub(super) async fn from_connection(
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
) -> Result<ApiServerPostgresTransactionalRo, ApiServerStorageError> {
connection: PooledConnection<'static, PostgresConnectionManager<NoTls>>,
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>,
) -> Result<ApiServerPostgresTransactionalRo<'a>, ApiServerStorageError> {
let tx = Self {
connection,
connection: Some(connection),
finished: false,
db_tx_sender,
_marker: std::marker::PhantomData,
};
tx.connection.batch_execute("BEGIN READ ONLY").await.map_err(|e| {
ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e))
})?;
tx.connection
.as_ref()
.expect(CONN_ERR)
.batch_execute("BEGIN READ ONLY")
.await
.map_err(|e| {
ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e))
})?;
Ok(tx)
}

pub async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.is_initialized().await?;

Ok(res)
}

pub async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_storage_version().await?;

Ok(res)
Expand All @@ -66,7 +85,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
pub async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_best_block().await?;

Ok(res)
Expand All @@ -76,7 +95,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
&mut self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_main_chain_block_id(block_height).await?;

Ok(res)
Expand All @@ -86,7 +105,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_block(block_id).await?;

Ok(res)
Expand All @@ -97,7 +116,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
&mut self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_transaction(transaction_id).await?;

Ok(res)
Expand All @@ -107,43 +126,60 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
&mut self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_block_aux_data(block_id).await?;

Ok(res)
}
}

pub struct ApiServerPostgresTransactionalRw<'a> {
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
// Note: This is an Option due to needing to pry the connection out of Self in Drop
connection: Option<PooledConnection<'static, PostgresConnectionManager<NoTls>>>,
finished: bool,
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>,
// Note: This exists to enforce that a transaction never outlives the database object,
// given that all connections have 'static lifetimes
_marker: std::marker::PhantomData<&'a ()>,
}

impl<'a> Drop for ApiServerPostgresTransactionalRw<'a> {
fn drop(&mut self) {
if !self.finished {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since getting a connection has been change from get() to get_owned(), should we always send() the connection back to the poll regardless of finished?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to send the connection back if finished, because it's already committed/rolled back. It should be destroyed so that the database object retrieves the connection again.

futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else(
|e| {
self.db_tx_sender
.send(self.connection.take().expect(CONN_ERR))
.unwrap_or_else(|e| {
logging::log::error!(
"CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}"
"CRITICAL ERROR: failed to send postgres RW transaction connection for closure: {e}"
)
},
);
});
}
}
}

impl<'a> ApiServerPostgresTransactionalRw<'a> {
pub(super) async fn from_connection(
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
connection: PooledConnection<'static, PostgresConnectionManager<NoTls>>,
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
>,
) -> Result<ApiServerPostgresTransactionalRw<'a>, ApiServerStorageError> {
let tx = Self {
connection,
connection: Some(connection),
finished: false,
db_tx_sender,
_marker: std::marker::PhantomData,
};
tx.connection.batch_execute("BEGIN READ WRITE").await.map_err(|e| {
ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e))
})?;
tx.connection
.as_ref()
.expect(CONN_ERR)
.batch_execute("BEGIN READ WRITE")
.await
.map_err(|e| {
ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e))
})?;
Ok(tx)
}
}
Expand All @@ -152,6 +188,8 @@ impl<'a> ApiServerPostgresTransactionalRw<'a> {
impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> {
async fn commit(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
self.connection
.as_ref()
.expect(CONN_ERR)
.batch_execute("COMMIT")
.await
.map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?;
Expand All @@ -161,6 +199,8 @@ impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> {

async fn rollback(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
self.connection
.as_ref()
.expect(CONN_ERR)
.batch_execute("ROLLBACK")
.await
.map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?;
Expand All @@ -179,31 +219,31 @@ impl<'a> ApiServerTransactionRo for ApiServerPostgresTransactionalRo<'a> {
impl<'a> Drop for ApiServerPostgresTransactionalRo<'a> {
fn drop(&mut self) {
if !self.finished {
futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else(
|e| {
self.db_tx_sender
.send(self.connection.take().expect(CONN_ERR))
.unwrap_or_else(|e| {
logging::log::error!(
"CRITICAL ERROR: failed to rollback failed postgres RO transaction: {e}"
"CRITICAL ERROR: failed to send postgres RO transaction connection for closure: {e}"
)
},
);
});
}
}
}

#[async_trait::async_trait]
impl<'t> Transactional<'t> for TransactionalApiServerPostgresStorage {
type TransactionRo = ApiServerPostgresTransactionalRo<'t>;
impl<'tx> Transactional<'tx> for TransactionalApiServerPostgresStorage {
type TransactionRo = ApiServerPostgresTransactionalRo<'tx>;

type TransactionRw = ApiServerPostgresTransactionalRw<'t>;
type TransactionRw = ApiServerPostgresTransactionalRw<'tx>;

async fn transaction_ro<'s: 't>(
&'s self,
async fn transaction_ro<'db: 'tx>(
&'db self,
) -> Result<Self::TransactionRo, ApiServerStorageError> {
self.begin_ro_transaction().await
}

async fn transaction_rw<'s: 't>(
&'s mut self,
async fn transaction_rw<'db: 'tx>(
&'db mut self,
) -> Result<Self::TransactionRw, ApiServerStorageError> {
self.begin_rw_transaction().await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use crate::storage::{
impls::postgres::queries::QueryFromConnection, storage_api::ApiServerStorageRead,
};

use super::ApiServerPostgresTransactionalRo;
use super::{ApiServerPostgresTransactionalRo, CONN_ERR};

#[async_trait::async_trait]
impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
async fn is_initialized(
&mut self,
) -> Result<bool, crate::storage::storage_api::ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.is_initialized().await?;

Ok(res)
Expand All @@ -33,7 +33,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
async fn get_storage_version(
&mut self,
) -> Result<Option<u32>, crate::storage::storage_api::ApiServerStorageError> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_storage_version().await?;

Ok(res)
Expand All @@ -48,7 +48,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
),
crate::storage::storage_api::ApiServerStorageError,
> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_best_block().await?;

Ok(res)
Expand All @@ -59,7 +59,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
block_id: common::primitives::Id<common::chain::Block>,
) -> Result<Option<common::chain::Block>, crate::storage::storage_api::ApiServerStorageError>
{
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_block(block_id).await?;

Ok(res)
Expand All @@ -72,7 +72,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
Option<crate::storage::storage_api::block_aux_data::BlockAuxData>,
crate::storage::storage_api::ApiServerStorageError,
> {
let mut conn = QueryFromConnection::new(&self.connection);
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_block_aux_data(block_id).await?;

Ok(res)
Expand Down
Loading