Skip to content

Commit

Permalink
better log messages
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <cywolf@gmail.com>
  • Loading branch information
andrewwhitehead committed Sep 11, 2023
1 parent 93e28be commit 653fb3b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 30 deletions.
21 changes: 16 additions & 5 deletions askar-storage/src/backend/db_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ impl<DB: ExtDatabase> DbSession<DB> {
{
if let DbSessionState::Pending { pool, transaction } = &self.state {
debug!("Acquire pool connection");
let mut conn = pool.acquire().await?;
let mut conn = pool
.acquire()
.await
.map_err(err_map!(Backend, "Error acquiring pool connection"))?;
if *transaction {
debug!("Start transaction");
DB::start_transaction(&mut conn, false).await?;
DB::start_transaction(&mut conn, false)
.await
.map_err(err_map!(Backend, "Error starting transaction"))?;
self.txn_depth += 1;
}
self.state = DbSessionState::Active { conn };
Expand Down Expand Up @@ -274,7 +279,9 @@ impl<'q, DB: ExtDatabase> DbSessionActive<'q, DB> {
'q: 't,
{
debug!("Start nested transaction");
DB::start_transaction(self.connection_mut(), true).await?;
DB::start_transaction(self.connection_mut(), true)
.await
.map_err(err_map!(Backend, "Error starting nested transaction"))?;
self.inner.txn_depth += 1;
Ok(DbSessionTxn {
inner: &mut *self.inner,
Expand All @@ -289,7 +296,9 @@ impl<'q, DB: ExtDatabase> DbSessionActive<'q, DB> {
{
if self.inner.txn_depth == 0 {
debug!("Start transaction");
DB::start_transaction(self.connection_mut(), false).await?;
DB::start_transaction(self.connection_mut(), false)
.await
.map_err(err_map!(Backend, "Error starting transaction"))?;
self.inner.txn_depth += 1;
Ok(DbSessionTxn {
inner: &mut *self.inner,
Expand Down Expand Up @@ -323,7 +332,9 @@ impl<'a, DB: ExtDatabase> DbSessionTxn<'a, DB> {
self.inner.txn_depth -= 1;
let conn = self.connection_mut();
debug!("Commit transaction");
DB::TransactionManager::commit(conn).await?;
DB::TransactionManager::commit(conn)
.await
.map_err(err_map!(Backend, "Error committing transaction"))?;
}
Ok(())
}
Expand Down
35 changes: 23 additions & 12 deletions askar-storage/src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl Backend for PostgresBackend {
let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
.bind("default_profile")
.fetch_one(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching default profile name"))?;
Ok(profile.unwrap_or_default())
})
}
Expand All @@ -168,7 +169,8 @@ impl Backend for PostgresBackend {
.bind("default_profile")
.bind(profile)
.execute(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error setting default profile name"))?;
Ok(())
})
}
Expand All @@ -178,7 +180,8 @@ impl Backend for PostgresBackend {
let mut conn = self.conn_pool.acquire().await?;
let rows = sqlx::query("SELECT name FROM profiles")
.fetch_all(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching profile list"))?;
let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
Ok(names)
})
Expand All @@ -190,7 +193,8 @@ impl Backend for PostgresBackend {
Ok(sqlx::query("DELETE FROM profiles WHERE name=$1")
.bind(&name)
.execute(conn.as_mut())
.await?
.await
.map_err(err_map!(Backend, "Error removing profile"))?
.rows_affected()
!= 0)
})
Expand Down Expand Up @@ -340,7 +344,8 @@ impl BackendSession for DbSession<Postgres> {
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error performing count query"))?;
Ok(count)
})
}
Expand Down Expand Up @@ -380,7 +385,8 @@ impl BackendSession for DbSession<Postgres> {
.bind(enc_category)
.bind(enc_name)
.fetch_optional(active.connection_mut())
.await?
.await
.map_err(err_map!(Backend, "Error performing fetch query"))?
{
let value = row.try_get(1)?;
let tags = row.try_get::<Option<String>, _>(2)?.map(String::into_bytes);
Expand Down Expand Up @@ -550,7 +556,9 @@ impl BackendSession for DbSession<Postgres> {
let mut sess = acquire_session(&mut *self).await?;
if sess.in_transaction() {
// the profile row is locked, perform a typical ping
sqlx::Connection::ping(sess.connection_mut()).await?;
sqlx::Connection::ping(sess.connection_mut())
.await
.map_err(err_map!(Backend, "Error pinging session"))?;
} else {
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
.bind(sess.profile_id)
Expand Down Expand Up @@ -668,7 +676,7 @@ async fn perform_insert(
.bind(expiry_ms.map(expiry_timestamp).transpose()?)
.fetch_optional(active.connection_mut())
.await?
.ok_or_else(|| err_msg!(Duplicate, "Duplicate row"))?
.ok_or_else(|| err_msg!(Duplicate, "Duplicate entry"))?
} else {
trace!("Update entry");
let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
Expand All @@ -680,11 +688,12 @@ async fn perform_insert(
.bind(expiry_ms.map(expiry_timestamp).transpose()?)
.fetch_one(active.connection_mut())
.await
.map_err(|_| err_msg!(NotFound, "Error updating existing row"))?;
.map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
sqlx::query(TAG_DELETE_QUERY)
.bind(row_id)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error removing existing entry tags"))?;
row_id
};
if let Some(tags) = enc_tags {
Expand All @@ -695,7 +704,8 @@ async fn perform_insert(
.bind(&tag.value)
.bind(tag.plaintext as i16)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting entry tags"))?;
}
}
Ok(())
Expand All @@ -715,7 +725,8 @@ async fn perform_remove<'q>(
.bind(enc_category)
.bind(enc_name)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error removing entry"))?;
if done.rows_affected() == 0 && !ignore_error {
Err(err_msg!(NotFound, "Entry not found"))
} else {
Expand Down
37 changes: 24 additions & 13 deletions askar-storage/src/backend/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ impl Backend for SqliteBackend {
let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
.bind("default_profile")
.fetch_one(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching default profile name"))?;
Ok(profile.unwrap_or_default())
})
}
Expand All @@ -162,7 +163,8 @@ impl Backend for SqliteBackend {
.bind("default_profile")
.bind(profile)
.execute(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error setting default profile name"))?;
Ok(())
})
}
Expand All @@ -172,7 +174,8 @@ impl Backend for SqliteBackend {
let mut conn = self.conn_pool.acquire().await?;
let rows = sqlx::query("SELECT name FROM profiles")
.fetch_all(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching profile list"))?;
let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
Ok(names)
})
Expand All @@ -184,7 +187,8 @@ impl Backend for SqliteBackend {
Ok(sqlx::query("DELETE FROM profiles WHERE name=?")
.bind(&name)
.execute(conn.as_mut())
.await?
.await
.map_err(err_map!(Backend, "Error removing profile"))?
.rows_affected()
!= 0)
})
Expand Down Expand Up @@ -323,7 +327,8 @@ impl BackendSession for DbSession<Sqlite> {
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error performing count query"))?;
Ok(count)
})
}
Expand Down Expand Up @@ -359,7 +364,8 @@ impl BackendSession for DbSession<Sqlite> {
.bind(enc_category)
.bind(enc_name)
.fetch_optional(active.connection_mut())
.await?
.await
.map_err(err_map!(Backend, "Error performing fetch query"))?
{
let value = row.try_get(1)?;
let tags = row.try_get(2)?;
Expand Down Expand Up @@ -583,7 +589,8 @@ async fn resolve_profile_key(
} else if let Some(row) = sqlx::query("SELECT id, profile_key FROM profiles WHERE name=?1")
.bind(profile.as_str())
.fetch_optional(conn.as_mut())
.await?
.await
.map_err(err_map!(Backend, "Error fetching profile key"))?
{
let pid = row.try_get(0)?;
let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
Expand Down Expand Up @@ -615,9 +622,10 @@ async fn perform_insert(
.bind(enc_value)
.bind(expiry_ms.map(expiry_timestamp).transpose()?)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting new entry"))?;
if done.rows_affected() == 0 {
return Err(err_msg!(Duplicate, "Duplicate row"));
return Err(err_msg!(Duplicate, "Duplicate entry"));
}
done.last_insert_rowid()
} else {
Expand All @@ -631,11 +639,12 @@ async fn perform_insert(
.bind(expiry_ms.map(expiry_timestamp).transpose()?)
.fetch_one(active.connection_mut())
.await
.map_err(|_| err_msg!(NotFound, "Error updating existing row"))?;
.map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
sqlx::query(TAG_DELETE_QUERY)
.bind(row_id)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error removing existing entry tags"))?;
row_id
};
if let Some(tags) = enc_tags {
Expand All @@ -646,7 +655,8 @@ async fn perform_insert(
.bind(&tag.value)
.bind(tag.plaintext as i16)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting entry tags"))?;
}
}
Ok(())
Expand All @@ -666,7 +676,8 @@ async fn perform_remove<'q>(
.bind(enc_category)
.bind(enc_name)
.execute(active.connection_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error removing entry"))?;
if done.rows_affected() == 0 && !ignore_error {
Err(err_msg!(NotFound, "Entry not found"))
} else {
Expand Down

0 comments on commit 653fb3b

Please sign in to comment.