Skip to content

Commit

Permalink
feat: support with_transaction in postgres client
Browse files Browse the repository at this point in the history
  • Loading branch information
joshstevens19 committed Oct 7, 2024
1 parent eb1750d commit c6a3066
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
53 changes: 42 additions & 11 deletions core/src/database/postgres/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{env, time::Duration};
use std::future::Future;

use bb8::{Pool, RunError};
use bb8_postgres::PostgresConnectionManager;
Expand All @@ -11,16 +12,16 @@ use tokio::{task, time::timeout};
use tokio_postgres::{
binary_copy::BinaryCopyInWriter,
config::SslMode,
types::{ToSql, Type as PgType},
Config, CopyInSink, Error as PgError, Row, Statement, ToStatement,
Transaction as PgTransaction,
};
pub use tokio_postgres::types::{ToSql, Type as PgType};
use tracing::{debug, error};

use crate::database::postgres::{
generate::generate_event_table_columns_names_sql, sql_type_wrapper::EthereumSqlTypeWrapper,
};


pub fn connection_string() -> Result<String, env::VarError> {
dotenv().ok();
let connection = env::var("DATABASE_URL")?;
Expand Down Expand Up @@ -57,8 +58,25 @@ pub enum PostgresError {
ConnectionPoolError(#[from] RunError<tokio_postgres::Error>),
}

pub struct PostgresTransaction {
pub transaction: PgTransaction<'static>,
pub struct PostgresTransaction<'a> {
pub transaction: PgTransaction<'a>,
}
impl<'a> PostgresTransaction<'a> {
pub async fn execute(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<u64, PostgresError> {
self.transaction.execute(query, params).await.map_err(PostgresError::PgError)
}

pub async fn commit(self) -> Result<(), PostgresError> {
self.transaction.commit().await.map_err(PostgresError::PgError)
}

pub async fn rollback(self) -> Result<(), PostgresError> {
self.transaction.rollback().await.map_err(PostgresError::PgError)
}
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -167,14 +185,27 @@ impl PostgresClient {
conn.prepare_typed(query, parameter_types).await.map_err(PostgresError::PgError)
}

pub async fn transaction(&self) -> Result<PostgresTransaction, PostgresError> {
let mut conn = self.pool.get().await?;
pub async fn with_transaction<F, Fut, T, Q>(
&self,
query: &Q,
params: &[&(dyn ToSql + Sync)],
f: F,
) -> Result<T, PostgresError>
where
F: FnOnce(u64) -> Fut + Send,
Fut: Future<Output = Result<T, PostgresError>> + Send,
Q: ?Sized + ToStatement,
{
let mut conn = self.pool.get().await.map_err(PostgresError::ConnectionPoolError)?;
let transaction = conn.transaction().await.map_err(PostgresError::PgError)?;

// Wrap the transaction in a static lifetime
let boxed_transaction: Box<PgTransaction<'static>> =
unsafe { std::mem::transmute(Box::new(transaction)) };
Ok(PostgresTransaction { transaction: *boxed_transaction })
let count = transaction.execute(query, params).await.map_err(PostgresError::PgError)?;

let result = f(count).await?;

transaction.commit().await.map_err(PostgresError::PgError)?;

Ok(result)
}

pub async fn query<T>(
Expand Down Expand Up @@ -343,4 +374,4 @@ impl PostgresClient {
.map_err(|e| e.to_string())
}
}
}
}
4 changes: 3 additions & 1 deletion documentation/docs/pages/docs/changelog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
### Features
-------------------------------------------------
- feat: expose an insert_bulk new postgres function to make inserting bulk data easier
- feat: expose new ethereum sql type wrappers for bytes types
- feat: expose new ethereum sql type wrappers for bytes types#
- feat: expose postgres ToSql trait
- feat: support with_transaction in postgres client

### Bug fixes
-------------------------------------------------
Expand Down

0 comments on commit c6a3066

Please sign in to comment.