Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Link labels and evals with a new column #243

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 9 additions & 50 deletions app-server/src/api/v1/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use actix_web::{post, web, HttpResponse};
use chrono::Utc;
use serde::Deserialize;
use std::sync::Arc;
use uuid::Uuid;

use crate::{
ch::evaluation_scores::{insert_evaluation_scores, EvaluationScore},
db::{self, project_api_keys::ProjectApiKey, DB},
evaluations::utils::{
datapoints_to_labeling_queues, get_columns_from_points, EvaluationDatapointResult,
},
evaluations::{save_evaluation_scores, utils::EvaluationDatapointResult},
names::NameGenerator,
routes::types::ResponseResult,
};
Expand Down Expand Up @@ -52,51 +47,15 @@ async fn create_evaluation(
let evaluation =
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_id).await?;

let columns = get_columns_from_points(&points);
let ids = points.iter().map(|_| Uuid::new_v4()).collect::<Vec<_>>();
let labeling_queues =
datapoints_to_labeling_queues(db.clone(), &points, &ids, &project_id).await?;

for (queue_id, entries) in labeling_queues.iter() {
db::labeling_queues::push_to_labeling_queue(&db.pool, queue_id, &entries).await?;
}

let ids_clone = ids.clone();
let db_task = tokio::spawn(async move {
db::evaluations::set_evaluation_results(
db.clone(),
evaluation.id,
&ids_clone,
&columns.scores,
&columns.datas,
&columns.targets,
&columns.executor_outputs,
&columns.trace_ids,
)
.await
});

// Flattened scores from all evaluators to be recorded to Clickhouse
// Its length can be longer than the amount of evaluation datapoints
// since each datapoint can have multiple evaluators
let ch_evaluation_scores = EvaluationScore::from_evaluation_datapoint_results(
&points,
&ids,
project_id,
group_id,
save_evaluation_scores(
db.clone(),
clickhouse,
points,
evaluation.id,
Utc::now(),
);

let ch_task = tokio::spawn(insert_evaluation_scores(
clickhouse.clone(),
ch_evaluation_scores,
));

let (db_result, ch_result) = tokio::join!(db_task, ch_task);

db_result.map_err(|e| anyhow::anyhow!("Database task failed: {}", e))??;
ch_result.map_err(|e| anyhow::anyhow!("Clickhouse task failed: {}", e))??;
project_id,
&group_id,
)
.await?;

Ok(HttpResponse::Ok().json(evaluation))
}
23 changes: 23 additions & 0 deletions app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,26 @@ WHERE project_id = ?

Ok(row)
}

pub async fn delete_evaluation_score(
clickhouse: clickhouse::Client,
project_id: Uuid,
result_id: Uuid,
label_id: Uuid,
) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}
// Note, this does not immediately physically delete the data.
// https://clickhouse.com/docs/en/sql-reference/statements/delete
clickhouse
.query(
"DELETE FROM evaluation_scores WHERE project_id = ? AND result_id = ? AND label_id = ?",
)
.bind(project_id)
.bind(result_id)
.bind(label_id)
.execute()
.await?;
Ok(())
}
148 changes: 59 additions & 89 deletions app-server/src/db/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use anyhow::Result;
use chrono::{DateTime, Utc};
Expand All @@ -7,8 +7,6 @@ use serde_json::Value;
use sqlx::{prelude::FromRow, PgPool};
use uuid::Uuid;

use super::DB;

#[derive(Serialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Evaluation {
Expand Down Expand Up @@ -69,62 +67,11 @@ pub async fn create_evaluation(
Ok(evaluation)
}

pub async fn get_evaluation(
db: Arc<DB>,
project_id: Uuid,
evaluation_id: Uuid,
) -> Result<Evaluation> {
let evaluation = sqlx::query_as::<_, Evaluation>(
"SELECT
id, name, project_id, created_at, group_id
FROM evaluations WHERE id = $1 AND project_id = $2",
)
.bind(evaluation_id)
.bind(project_id)
.fetch_one(&db.pool)
.await?;

Ok(evaluation)
}

pub async fn get_evaluations(pool: &PgPool, project_id: Uuid) -> Result<Vec<Evaluation>> {
let evaluations = sqlx::query_as::<_, Evaluation>(
"SELECT id, name, project_id, created_at, group_id
FROM evaluations WHERE project_id = $1
ORDER BY created_at DESC",
)
.bind(project_id)
.fetch_all(pool)
.await?;

Ok(evaluations)
}

pub async fn get_evaluations_grouped_by_current_evaluation(
pool: &PgPool,
project_id: Uuid,
current_evaluation_id: Uuid,
) -> Result<Vec<Evaluation>> {
let evaluations = sqlx::query_as::<_, Evaluation>(
"SELECT id, name, project_id, created_at, group_id
FROM evaluations
WHERE project_id = $1
AND group_id = (SELECT group_id FROM evaluations WHERE id = $2)
ORDER BY created_at DESC",
)
.bind(project_id)
.bind(current_evaluation_id)
.fetch_all(pool)
.await?;

Ok(evaluations)
}

/// Record evaluation results in the database.
///
/// Each target may contain an empty JSON object, if there is no target.
pub async fn set_evaluation_results(
db: Arc<DB>,
pool: &PgPool,
evaluation_id: Uuid,
ids: &Vec<Uuid>,
scores: &Vec<HashMap<String, f64>>,
Expand Down Expand Up @@ -164,7 +111,7 @@ pub async fn set_evaluation_results(
.bind(trace_ids)
.bind(&Vec::from_iter(0..ids.len() as i64))
.bind(evaluation_id)
.fetch_all(&db.pool)
.fetch_all(pool)
.await?;

// Each datapoint can have multiple scores, so unzip the scores and result ids.
Expand All @@ -191,49 +138,72 @@ pub async fn set_evaluation_results(
.bind(&score_result_ids)
.bind(&score_names)
.bind(&score_values)
.execute(&db.pool)
.execute(pool)
.await?;

Ok(())
}

pub async fn get_evaluation_results(
pub async fn add_evaluation_score(
pool: &PgPool,
evaluation_id: Uuid,
) -> Result<Vec<EvaluationDatapoint>> {
let results = sqlx::query_as::<_, EvaluationDatapoint>(
"WITH scores AS (
SELECT
result_id,
jsonb_object_agg(name, score) as scores
FROM evaluation_scores
GROUP BY result_id
)
SELECT
r.id,
r.created_at,
r.evaluation_id,
r.data,
r.target,
r.executor_output,
s.scores,
r.trace_id
FROM evaluation_results r
LEFT JOIN scores s ON r.id = s.result_id
WHERE evaluation_id = $1
ORDER BY created_at ASC, index_in_batch ASC NULLS FIRST",
result_id: Uuid,
name: &String,
score: f64,
label_id: Option<Uuid>,
) -> Result<()> {
sqlx::query(
"INSERT INTO evaluation_scores (result_id, name, score, label_id)
VALUES ($1, $2, $3, $4)",
)
.bind(evaluation_id)
.fetch_all(pool)
.bind(result_id)
.bind(name)
.bind(score)
.bind(label_id)
.execute(pool)
.await?;

Ok(results)
Ok(())
}

pub async fn delete_evaluation(pool: &PgPool, evaluation_id: &Uuid) -> Result<()> {
sqlx::query("DELETE FROM evaluations WHERE id = $1")
.bind(evaluation_id)
.execute(pool)
.await?;
pub async fn get_evaluation_by_result_id(
pool: &PgPool,
project_id: Uuid,
result_id: Uuid,
) -> Result<Evaluation> {
let evaluation = sqlx::query_as::<_, Evaluation>(
"SELECT id, created_at, name, project_id, group_id
FROM evaluations
WHERE id = (SELECT evaluation_id FROM evaluation_results WHERE id = $1 LIMIT 1)
AND project_id = $2",
)
.bind(result_id)
.bind(project_id)
.fetch_one(pool)
.await?;

Ok(evaluation)
}

pub async fn delete_evaluation_score(
pool: &PgPool,
project_id: Uuid,
result_id: Uuid,
label_id: Uuid,
) -> Result<()> {
sqlx::query(
"DELETE FROM evaluation_scores
WHERE result_id = $1 AND label_id = $2
AND result_id IN (
SELECT id FROM evaluation_results WHERE evaluation_id IN (
SELECT id from evaluations WHERE project_id = $3
)
)",
)
.bind(result_id)
.bind(label_id)
.bind(project_id)
.execute(pool)
.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion app-server/src/db/labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn update_span_label(
VALUES ($1, $2, $3, $4, $5, now(), $6, $7)
ON CONFLICT (span_id, class_id, user_id)
DO UPDATE SET value = $5, updated_at = now(), label_source = $6,
reasoning = CASE WHEN $7 IS NOT NULL THEN $7 ELSE labels.reasoning END
reasoning = COALESCE($7, labels.reasoning)
RETURNING
id,
span_id,
Expand Down
Loading