Skip to content

Commit

Permalink
Instument everything.. again
Browse files Browse the repository at this point in the history
  • Loading branch information
fistons committed Feb 14, 2024
1 parent 6f706c9 commit 6229efe
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 56 deletions.
58 changes: 55 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ tracing = { version = "0.1", features = ["log"] }
thiserror = "1"
anyhow = "1"
reqwest = { version = "0.11", features = ["tokio-rustls", "rustls-tls", "json"] }
reqwest-tracing = { version = "0.4", features = ["opentelemetry_0_21"] }
reqwest-middleware = "0.2"
feed-rs = "1"
sqlx = { version = "0.7", features = ["runtime-tokio", "uuid", "macros", "postgres", "chrono", "migrate"] }
fake = { version = "2.8", features = ["chrono"] }
Expand All @@ -43,7 +45,7 @@ opentelemetry = { version = "0.21"}
opentelemetry_sdk = {version="0.21", features = ["trace"]}
opentelemetry-jaeger = { version = "0.20", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.9", features = ["reqwest", "reqwest-client"] }
scraper = "0.17"
scraper = "0.18"
once_cell = "1"
rand = "0.8.5"
tokio-cron-scheduler = "0.9.4"
Expand Down
28 changes: 18 additions & 10 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use redis::AsyncCommands;
use secrecy::Secret;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use tracing::{debug_span, instrument, Instrument};

use crate::common::model::{User, UserRole};
use crate::common::users::*;
Expand Down Expand Up @@ -105,7 +106,7 @@ async fn extract_authenticated_user(
};

let app_state = req.app_data::<Data<AppState>>().unwrap();
check_and_get_authed_user(
check_and_get_authenticated_user(
&user,
&password,
&app_state.db,
Expand Down Expand Up @@ -136,6 +137,7 @@ fn extract_value_authentication_header(headers: &HeaderMap) -> Result<&str, Auth
}

/// # Retrieve a user and check its credentials
#[instrument(skip(connection, password))]
async fn check_and_get_user(
connection: &DbPool,
username: &str,
Expand Down Expand Up @@ -163,20 +165,23 @@ async fn check_and_get_user(
}

/// # Retrieve a user and check its credentials
async fn check_and_get_authed_user(
/// Check in redis if the user has already pass authentication by looking for its header. If not, try to authenticate
/// it by matching password
/// If it's ok, store the header in redis.
/// This is to avoid decoding the password for each request, because it's a costing operation.
#[instrument(skip_all)]
async fn check_and_get_authenticated_user(
user: &str,
password: &Secret<String>,
connection: &DbPool,
redis_pool: &Pool,
redis_key: &str,
redis: &Pool,
key: &str,
) -> Result<AuthenticatedUser, AuthenticationError> {
// Fist, check that the user is not already in the cache
let mut redis = redis_pool
.get()
.await
.context("Couldn't get redis connection")?;
let mut redis = redis.get().await.context("Couldn't get redis connection")?;
let value: Option<String> = redis
.get(format!("user:{}:{}", user, redis_key))
.get(format!("user:{}:{}", user, key))
.instrument(debug_span!("getting_http_token_in_redis"))
.await
.context("Could not get value")?;

Expand All @@ -195,10 +200,11 @@ async fn check_and_get_authed_user(
let serialized_user = serde_json::to_string(&user).context("Could serialize user for redis")?;
redis
.set_ex::<_, _, ()>(
&format!("user:{}:Basic:{}", user.login, redis_key),
&format!("user:{}:Basic:{}", user.login, key),
serialized_user,
60 * 15,
)
.instrument(debug_span!("store_http_token_in_redis"))
.await
.context("Could not store user in redis")?;

Expand All @@ -214,6 +220,7 @@ fn extract_credentials_from_http_basic(
}

/// # Generate a JWT for the given user password
#[instrument(skip(connection, password))]
pub async fn get_jwt_from_login_request(
user: &str,
password: &Secret<String>,
Expand Down Expand Up @@ -241,6 +248,7 @@ pub fn extract_login_from_refresh_token(token: &str) -> &str {
token.split('.').collect::<Vec<&str>>()[1]
}

#[instrument(skip_all)]
async fn verify_jwt(token: &str) -> Result<AuthenticatedUser, AuthenticationError> {
let claims: Claims = token.verify_with_key(&(*JWT_KEY))?;

Expand Down
34 changes: 17 additions & 17 deletions src/common/channels.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use sqlx::Result;
use tokio::task;
use tracing::{debug, error, info};
use tracing::{debug, error, info, instrument};

use crate::common::model::{Channel, ChannelError, PagedResult, UsersChannel};
use crate::common::rss::check_feed;
Expand All @@ -10,7 +10,7 @@ use crate::services::fetching;
use deadpool_redis::Pool as RedisPool;

/// Returns the whole list of errors associated to the given channel id.
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn select_errors_by_chan_id(
db: &Pool,
channel_id: i32,
Expand Down Expand Up @@ -40,7 +40,7 @@ pub async fn select_errors_by_chan_id(
}

/// Returns an optional given channel with the given user's metadata.
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn select_by_id_and_user_id(
db: &Pool,
channel_id: i32,
Expand Down Expand Up @@ -76,19 +76,19 @@ pub async fn select_by_id_and_user_id(
}

/// Mark the given channel as read for the given user
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn mark_channel_as_read(db: &Pool, channel_id: i32, user_id: i32) -> Result<()> {
mark_channel(db, channel_id, user_id, true).await
}

/// Mark the given channel as unread for the given user
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn mark_channel_as_unread(db: &Pool, channel_id: i32, user_id: i32) -> Result<()> {
mark_channel(db, channel_id, user_id, false).await
}

/// Select all the channels of a user, along side the total number of items
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn select_page_by_user_id(
db: &Pool,
user_id: i32,
Expand Down Expand Up @@ -157,7 +157,7 @@ pub async fn select_page_by_user_id(
}

/// Create or linked an existing channel to a user, returning the channel id
#[tracing::instrument(skip(db, redis))]
#[instrument(skip(db, redis))]
pub async fn create_or_link_channel(
db: &Pool,
redis: &RedisPool,
Expand Down Expand Up @@ -214,7 +214,7 @@ pub async fn create_or_link_channel(
}

/// Enable a channel and reset it's failure count
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn enable_channel(db: &Pool, channel_id: i32) -> Result<()> {
sqlx::query!(
r#"
Expand All @@ -229,6 +229,7 @@ pub async fn enable_channel(db: &Pool, channel_id: i32) -> Result<()> {
}

/// Disable a channel
#[instrument(skip(db))]
pub async fn disable_channel(db: &Pool, channel_id: i32) -> Result<()> {
sqlx::query!(
r#"
Expand All @@ -243,7 +244,7 @@ pub async fn disable_channel(db: &Pool, channel_id: i32) -> Result<()> {
}

/// Disable channels whom failure count is higher than the given threshold
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn disable_channels(db: &Pool, threshold: u32) -> Result<()> {
let disabled_channels = sqlx::query!(
r#"
Expand All @@ -260,7 +261,7 @@ pub async fn disable_channels(db: &Pool, threshold: u32) -> Result<()> {
}

/// Return the list of user IDs of of a given channel
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn get_user_ids_of_channel(db: &Pool, channel_id: i32) -> Result<Vec<i32>> {
sqlx::query_scalar!(
r#"
Expand All @@ -273,7 +274,7 @@ pub async fn get_user_ids_of_channel(db: &Pool, channel_id: i32) -> Result<Vec<i
}

/// Return the list of all enabled channels
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn get_all_enabled_channels(db: &Pool) -> Result<Vec<Channel>> {
sqlx::query_as!(
Channel,
Expand All @@ -286,7 +287,7 @@ pub async fn get_all_enabled_channels(db: &Pool) -> Result<Vec<Channel>> {
}

/// Update the last fetched timestamp of a channel
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn update_last_fetched(db: &Pool, channel_id: i32, date: &DateTime<Utc>) -> Result<()> {
sqlx::query!(
r#"
Expand All @@ -302,7 +303,7 @@ pub async fn update_last_fetched(db: &Pool, channel_id: i32, date: &DateTime<Utc
}

/// Retrieve the last update of channel
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn get_last_update(db: &Pool, channel_id: &i32) -> Result<Option<DateTime<Utc>>> {
let last_update = sqlx::query!(
r#"
Expand All @@ -318,7 +319,7 @@ pub async fn get_last_update(db: &Pool, channel_id: &i32) -> Result<Option<DateT

/// Update the failure count of the given channel and insert the error in the dedicated table
/// TODO: Transaction
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn fail_channel(db: &Pool, channel_id: i32, error_cause: &str) -> Result<()> {
let mut transaction = db.begin().await?;
sqlx::query!(
Expand Down Expand Up @@ -346,7 +347,7 @@ pub async fn fail_channel(db: &Pool, channel_id: i32, error_cause: &str) -> Resu
}

/// # Create a new channel in the database, returning the created channel id
#[tracing::instrument(skip(db, redis))]
#[instrument(skip(db, redis))]
async fn create_new_channel(
db: &Pool,
redis: &RedisPool,
Expand Down Expand Up @@ -385,7 +386,6 @@ async fn create_new_channel(
Ok((channel_id, channel_name))
}

#[tracing::instrument(skip(db))]
async fn mark_channel(db: &Pool, channel_id: i32, user_id: i32, read: bool) -> Result<()> {
sqlx::query!(
r#"
Expand All @@ -402,7 +402,7 @@ async fn mark_channel(db: &Pool, channel_id: i32, user_id: i32, read: bool) -> R
}

/// Unsubscribe a user from a channel
#[tracing::instrument(skip(db))]
#[instrument(skip(db))]
pub async fn unsubscribe_channel(db: &Pool, channel_id: i32, user_id: i32) -> Result<()> {
let result = sqlx::query!(
r#"
Expand Down
3 changes: 2 additions & 1 deletion src/common/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::Value;
use std::{env, error::Error, ops::Deref};
use tracing::{debug, error, warn};
use tracing::{debug, error, instrument, warn};

static HANDLEBARS: Lazy<Handlebars> = Lazy::new(|| {
let mut handlebars = Handlebars::new();
Expand All @@ -25,6 +25,7 @@ static EMAIL_PROPERTIES: Lazy<anyhow::Result<EmailApiProperties>> =
Err(err) => Err(anyhow!("Could not load Email properties: {:?}", err)),
});

#[instrument(skip(email_content))]
pub async fn send_email<T>(template_name: &str, email_content: &T) -> anyhow::Result<()>
where
T: Serialize,
Expand Down
Loading

0 comments on commit 6229efe

Please sign in to comment.