Skip to content

Commit

Permalink
send backend metrics to their own postgres channel (#843)
Browse files Browse the repository at this point in the history
<!-- ELLIPSIS_HIDDEN -->



> [!IMPORTANT]
> Separate backend metrics into their own PostgreSQL channel using
`emit_backend_metrics` in `subscribe.rs`.
> 
>   - **Behavior**:
> - `publish_metrics` in `backend.rs` now uses `emit_backend_metrics` to
send metrics to `plane_backend_metrics` channel.
> - Removes `emit_ephemeral_with_key` and `emit_ephemeral_impl` from
`subscribe.rs`.
>   - **Functions**:
> - Adds `emit_backend_metrics` in `subscribe.rs` to handle backend
metrics separately.
>     - Modifies `emit_impl` to support new channel logic.
>   - **Constants**:
> - Adds `BACKEND_METRICS_EVENT_CHANNEL` in `subscribe.rs` for backend
metrics.
> 
> <sup>This description was created by </sup>[<img alt="Ellipsis"
src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=jamsocket%2Fplane&utm_source=github&utm_medium=referral)<sup>
for b6ec34a. It will automatically
update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->
  • Loading branch information
rolyatmax authored Nov 15, 2024
1 parent 72e20d6 commit 97d5748
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 39 deletions.
4 changes: 2 additions & 2 deletions plane/src/database/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
subscribe::{emit_ephemeral_with_key, emit_with_key},
subscribe::{emit_backend_metrics, emit_with_key},
PlaneDatabase,
};
use crate::{
Expand Down Expand Up @@ -455,7 +455,7 @@ impl<'a> BackendDatabase<'a> {

pub async fn publish_metrics(&self, metrics: BackendMetricsMessage) -> sqlx::Result<()> {
let mut txn = self.db.pool.begin().await?;
emit_ephemeral_with_key(&mut txn, &metrics.backend_id.to_string(), &metrics).await?;
emit_backend_metrics(&mut txn, &metrics.backend_id.to_string(), &metrics).await?;
txn.commit().await?;
Ok(())
}
Expand Down
59 changes: 22 additions & 37 deletions plane/src/database/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::database::util::MapSqlxError;
use crate::database::{backend::BackendMetricsMessage, util::MapSqlxError};
use crate::util::ExponentialBackoff;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand All @@ -17,7 +17,8 @@ use tokio::{

type ListenerMap = Arc<RwLock<HashMap<(String, Option<String>), Box<dyn TypedSender>>>>;

const EVENT_CHANNEL: &str = "plane_events";
pub const EVENT_CHANNEL: &str = "plane_events";
pub const BACKEND_METRICS_EVENT_CHANNEL: &str = "plane_backend_metrics";

pub trait NotificationPayload:
Serialize + DeserializeOwned + Debug + Send + Sync + Clone + 'static
Expand Down Expand Up @@ -382,12 +383,26 @@ pub async fn emit_impl<T: NotificationPayload>(
Ok(())
}

pub async fn emit_ephemeral_impl<T: NotificationPayload>(
pub async fn emit<T: NotificationPayload>(
db: &mut PgConnection,
key: Option<&str>,
payload: &T,
) -> Result<(), sqlx::Error> {
let kind = T::kind().to_string();
emit_impl(db, None, payload).await
}

pub async fn emit_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, Some(key), payload).await
}

pub async fn emit_backend_metrics(
db: &mut PgConnection,
key: &str,
payload: &BackendMetricsMessage,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
select pg_notify(
Expand All @@ -399,43 +414,13 @@ pub async fn emit_ephemeral_impl<T: NotificationPayload>(
'key', $2::text
)::text
)"#,
kind,
BackendMetricsMessage::kind().to_string(),
key,
serde_json::to_value(&payload).map_sqlx_error()?,
EVENT_CHANNEL,
BACKEND_METRICS_EVENT_CHANNEL,
)
.execute(db)
.await?;

Ok(())
}

pub async fn emit<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, None, payload).await
}

pub async fn emit_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, Some(key), payload).await
}

pub async fn emit_ephemeral<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, None, payload).await
}

pub async fn emit_ephemeral_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, Some(key), payload).await
}

0 comments on commit 97d5748

Please sign in to comment.