Skip to content

Commit

Permalink
sort label values by numeric value (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinmukhamedm authored Nov 26, 2024
1 parent c3a6543 commit aad5b79
Show file tree
Hide file tree
Showing 22 changed files with 3,676 additions and 611 deletions.
59 changes: 9 additions & 50 deletions app-server/src/api/v1/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use actix_web::{post, web, HttpResponse};
use chrono::Utc;
use serde::Deserialize;
use std::sync::Arc;
use uuid::Uuid;

use crate::{
ch::evaluation_scores::{insert_evaluation_scores, EvaluationScore},
db::{self, project_api_keys::ProjectApiKey, DB},
evaluations::utils::{
datapoints_to_labeling_queues, get_columns_from_points, EvaluationDatapointResult,
},
evaluations::{save_evaluation_scores, utils::EvaluationDatapointResult},
names::NameGenerator,
routes::types::ResponseResult,
};
Expand Down Expand Up @@ -52,51 +47,15 @@ async fn create_evaluation(
let evaluation =
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_id).await?;

let columns = get_columns_from_points(&points);
let ids = points.iter().map(|_| Uuid::new_v4()).collect::<Vec<_>>();
let labeling_queues =
datapoints_to_labeling_queues(db.clone(), &points, &ids, &project_id).await?;

for (queue_id, entries) in labeling_queues.iter() {
db::labeling_queues::push_to_labeling_queue(&db.pool, queue_id, &entries).await?;
}

let ids_clone = ids.clone();
let db_task = tokio::spawn(async move {
db::evaluations::set_evaluation_results(
db.clone(),
evaluation.id,
&ids_clone,
&columns.scores,
&columns.datas,
&columns.targets,
&columns.executor_outputs,
&columns.trace_ids,
)
.await
});

// Flattened scores from all evaluators to be recorded to Clickhouse
// Its length can be longer than the amount of evaluation datapoints
// since each datapoint can have multiple evaluators
let ch_evaluation_scores = EvaluationScore::from_evaluation_datapoint_results(
&points,
&ids,
project_id,
group_id,
save_evaluation_scores(
db.clone(),
clickhouse,
points,
evaluation.id,
Utc::now(),
);

let ch_task = tokio::spawn(insert_evaluation_scores(
clickhouse.clone(),
ch_evaluation_scores,
));

let (db_result, ch_result) = tokio::join!(db_task, ch_task);

db_result.map_err(|e| anyhow::anyhow!("Database task failed: {}", e))??;
ch_result.map_err(|e| anyhow::anyhow!("Clickhouse task failed: {}", e))??;
project_id,
&group_id,
)
.await?;

Ok(HttpResponse::Ok().json(evaluation))
}
23 changes: 23 additions & 0 deletions app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,26 @@ WHERE project_id = ?

Ok(row)
}

/// Delete a single evaluation score row from ClickHouse, identified by
/// (project_id, result_id, label_id).
///
/// No-op unless the full build feature is enabled. ClickHouse lightweight
/// DELETE does not immediately physically remove the data:
/// https://clickhouse.com/docs/en/sql-reference/statements/delete
pub async fn delete_evaluation_score(
    clickhouse: clickhouse::Client,
    project_id: Uuid,
    result_id: Uuid,
    label_id: Uuid,
) -> Result<()> {
    if is_feature_enabled(Feature::FullBuild) {
        let query =
            "DELETE FROM evaluation_scores WHERE project_id = ? AND result_id = ? AND label_id = ?";
        clickhouse
            .query(query)
            .bind(project_id)
            .bind(result_id)
            .bind(label_id)
            .execute()
            .await?;
    }
    Ok(())
}
148 changes: 59 additions & 89 deletions app-server/src/db/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use anyhow::Result;
use chrono::{DateTime, Utc};
Expand All @@ -7,8 +7,6 @@ use serde_json::Value;
use sqlx::{prelude::FromRow, PgPool};
use uuid::Uuid;

use super::DB;

#[derive(Serialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Evaluation {
Expand Down Expand Up @@ -69,62 +67,11 @@ pub async fn create_evaluation(
Ok(evaluation)
}

/// Fetch a single evaluation by id, scoped to the given project.
///
/// Errors when no matching row exists (`fetch_one` semantics).
pub async fn get_evaluation(
    db: Arc<DB>,
    project_id: Uuid,
    evaluation_id: Uuid,
) -> Result<Evaluation> {
    const QUERY: &str = "SELECT
        id, name, project_id, created_at, group_id
        FROM evaluations WHERE id = $1 AND project_id = $2";

    Ok(sqlx::query_as::<_, Evaluation>(QUERY)
        .bind(evaluation_id)
        .bind(project_id)
        .fetch_one(&db.pool)
        .await?)
}

/// List all evaluations in a project, newest first.
pub async fn get_evaluations(pool: &PgPool, project_id: Uuid) -> Result<Vec<Evaluation>> {
    const QUERY: &str = "SELECT id, name, project_id, created_at, group_id
        FROM evaluations WHERE project_id = $1
        ORDER BY created_at DESC";

    Ok(sqlx::query_as::<_, Evaluation>(QUERY)
        .bind(project_id)
        .fetch_all(pool)
        .await?)
}

/// List all evaluations that share a group with the given evaluation,
/// newest first.
///
/// The group id is resolved from `current_evaluation_id` via a subquery,
/// so only one database round-trip is made.
pub async fn get_evaluations_grouped_by_current_evaluation(
    pool: &PgPool,
    project_id: Uuid,
    current_evaluation_id: Uuid,
) -> Result<Vec<Evaluation>> {
    const QUERY: &str = "SELECT id, name, project_id, created_at, group_id
        FROM evaluations
        WHERE project_id = $1
        AND group_id = (SELECT group_id FROM evaluations WHERE id = $2)
        ORDER BY created_at DESC";

    Ok(sqlx::query_as::<_, Evaluation>(QUERY)
        .bind(project_id)
        .bind(current_evaluation_id)
        .fetch_all(pool)
        .await?)
}

/// Record evaluation results in the database.
///
/// Each target may contain an empty JSON object, if there is no target.
pub async fn set_evaluation_results(
db: Arc<DB>,
pool: &PgPool,
evaluation_id: Uuid,
ids: &Vec<Uuid>,
scores: &Vec<HashMap<String, f64>>,
Expand Down Expand Up @@ -164,7 +111,7 @@ pub async fn set_evaluation_results(
.bind(trace_ids)
.bind(&Vec::from_iter(0..ids.len() as i64))
.bind(evaluation_id)
.fetch_all(&db.pool)
.fetch_all(pool)
.await?;

// Each datapoint can have multiple scores, so unzip the scores and result ids.
Expand All @@ -191,49 +138,72 @@ pub async fn set_evaluation_results(
.bind(&score_result_ids)
.bind(&score_names)
.bind(&score_values)
.execute(&db.pool)
.execute(pool)
.await?;

Ok(())
}

pub async fn get_evaluation_results(
pub async fn add_evaluation_score(
pool: &PgPool,
evaluation_id: Uuid,
) -> Result<Vec<EvaluationDatapoint>> {
let results = sqlx::query_as::<_, EvaluationDatapoint>(
"WITH scores AS (
SELECT
result_id,
jsonb_object_agg(name, score) as scores
FROM evaluation_scores
GROUP BY result_id
)
SELECT
r.id,
r.created_at,
r.evaluation_id,
r.data,
r.target,
r.executor_output,
s.scores,
r.trace_id
FROM evaluation_results r
LEFT JOIN scores s ON r.id = s.result_id
WHERE evaluation_id = $1
ORDER BY created_at ASC, index_in_batch ASC NULLS FIRST",
result_id: Uuid,
name: &String,
score: f64,
label_id: Option<Uuid>,
) -> Result<()> {
sqlx::query(
"INSERT INTO evaluation_scores (result_id, name, score, label_id)
VALUES ($1, $2, $3, $4)",
)
.bind(evaluation_id)
.fetch_all(pool)
.bind(result_id)
.bind(name)
.bind(score)
.bind(label_id)
.execute(pool)
.await?;

Ok(results)
Ok(())
}

pub async fn delete_evaluation(pool: &PgPool, evaluation_id: &Uuid) -> Result<()> {
sqlx::query("DELETE FROM evaluations WHERE id = $1")
.bind(evaluation_id)
.execute(pool)
.await?;
/// Look up the evaluation that owns the given result row, scoped to a project.
///
/// Errors when the result id does not exist or the evaluation belongs to a
/// different project (`fetch_one` semantics).
pub async fn get_evaluation_by_result_id(
    pool: &PgPool,
    project_id: Uuid,
    result_id: Uuid,
) -> Result<Evaluation> {
    const QUERY: &str = "SELECT id, created_at, name, project_id, group_id
        FROM evaluations
        WHERE id = (SELECT evaluation_id FROM evaluation_results WHERE id = $1 LIMIT 1)
        AND project_id = $2";

    Ok(sqlx::query_as::<_, Evaluation>(QUERY)
        .bind(result_id)
        .bind(project_id)
        .fetch_one(pool)
        .await?)
}

/// Delete the score rows attached to a single (result_id, label_id) pair.
///
/// The nested subqueries scope the delete to the given project, so a caller
/// cannot remove scores that belong to another project's evaluation results.
pub async fn delete_evaluation_score(
    pool: &PgPool,
    project_id: Uuid,
    result_id: Uuid,
    label_id: Uuid,
) -> Result<()> {
    const QUERY: &str = "DELETE FROM evaluation_scores
        WHERE result_id = $1 AND label_id = $2
        AND result_id IN (
        SELECT id FROM evaluation_results WHERE evaluation_id IN (
        SELECT id from evaluations WHERE project_id = $3
        )
        )";

    sqlx::query(QUERY)
        .bind(result_id)
        .bind(label_id)
        .bind(project_id)
        .execute(pool)
        .await?;

    Ok(())
}
2 changes: 1 addition & 1 deletion app-server/src/db/labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn update_span_label(
VALUES ($1, $2, $3, $4, $5, now(), $6, $7)
ON CONFLICT (span_id, class_id, user_id)
DO UPDATE SET value = $5, updated_at = now(), label_source = $6,
reasoning = CASE WHEN $7 IS NOT NULL THEN $7 ELSE labels.reasoning END
reasoning = COALESCE($7, labels.reasoning)
RETURNING
id,
span_id,
Expand Down
Loading

0 comments on commit aad5b79

Please sign in to comment.