Skip to content

Commit

Permalink
send backend metrics to their own postgres channel
Browse files Browse the repository at this point in the history
  • Loading branch information
rolyatmax committed Nov 15, 2024
1 parent 72e20d6 commit b6ec34a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 39 deletions.
4 changes: 2 additions & 2 deletions plane/src/database/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
subscribe::{emit_ephemeral_with_key, emit_with_key},
subscribe::{emit_backend_metrics, emit_with_key},
PlaneDatabase,
};
use crate::{
Expand Down Expand Up @@ -455,7 +455,7 @@ impl<'a> BackendDatabase<'a> {

pub async fn publish_metrics(&self, metrics: BackendMetricsMessage) -> sqlx::Result<()> {
let mut txn = self.db.pool.begin().await?;
emit_ephemeral_with_key(&mut txn, &metrics.backend_id.to_string(), &metrics).await?;
emit_backend_metrics(&mut txn, &metrics.backend_id.to_string(), &metrics).await?;
txn.commit().await?;
Ok(())
}
Expand Down
59 changes: 22 additions & 37 deletions plane/src/database/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::database::util::MapSqlxError;
use crate::database::{backend::BackendMetricsMessage, util::MapSqlxError};
use crate::util::ExponentialBackoff;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand All @@ -17,7 +17,8 @@ use tokio::{

type ListenerMap = Arc<RwLock<HashMap<(String, Option<String>), Box<dyn TypedSender>>>>;

const EVENT_CHANNEL: &str = "plane_events";
pub const EVENT_CHANNEL: &str = "plane_events";
pub const BACKEND_METRICS_EVENT_CHANNEL: &str = "plane_backend_metrics";

pub trait NotificationPayload:
Serialize + DeserializeOwned + Debug + Send + Sync + Clone + 'static
Expand Down Expand Up @@ -382,12 +383,26 @@ pub async fn emit_impl<T: NotificationPayload>(
Ok(())
}

pub async fn emit_ephemeral_impl<T: NotificationPayload>(
pub async fn emit<T: NotificationPayload>(
db: &mut PgConnection,
key: Option<&str>,
payload: &T,
) -> Result<(), sqlx::Error> {
let kind = T::kind().to_string();
emit_impl(db, None, payload).await
}

pub async fn emit_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, Some(key), payload).await
}

pub async fn emit_backend_metrics(
db: &mut PgConnection,
key: &str,
payload: &BackendMetricsMessage,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
select pg_notify(
Expand All @@ -399,43 +414,13 @@ pub async fn emit_ephemeral_impl<T: NotificationPayload>(
'key', $2::text
)::text
)"#,
kind,
BackendMetricsMessage::kind().to_string(),
key,
serde_json::to_value(&payload).map_sqlx_error()?,
EVENT_CHANNEL,
BACKEND_METRICS_EVENT_CHANNEL,
)
.execute(db)
.await?;

Ok(())
}

pub async fn emit<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, None, payload).await
}

pub async fn emit_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, Some(key), payload).await
}

pub async fn emit_ephemeral<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, None, payload).await
}

pub async fn emit_ephemeral_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, Some(key), payload).await
}

0 comments on commit b6ec34a

Please sign in to comment.