Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store evals in flattened table #120

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app-server/src/api/v1/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ async fn create_evaluation(
});

// Flattened scores from all evaluators to be recorded to Clickhouse
// Its length can be longer than the amount of evaluation datapoints since each evaluator can return multiple scores
// Its length can be longer than the amount of evaluation datapoints
// since each datapoint can have multiple evaluators
let ch_evaluation_scores = EvaluationScore::from_evaluation_datapoint_results(
&points,
ids,
&ids,
project_id,
group_id,
evaluation.id,
Expand Down
2 changes: 1 addition & 1 deletion app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct EvaluationScore {
impl EvaluationScore {
pub fn from_evaluation_datapoint_results(
points: &Vec<EvaluationDatapointResult>,
result_ids: Vec<Uuid>,
result_ids: &Vec<Uuid>,
project_id: Uuid,
group_id: String,
evaluation_id: Uuid,
Expand Down
120 changes: 78 additions & 42 deletions app-server/src/db/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Evaluation {

#[derive(Serialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct EvaluationDatapointPreview {
pub struct EvaluationDatapoint {
pub id: Uuid,
pub created_at: DateTime<Utc>,
pub evaluation_id: Uuid,
Expand All @@ -35,6 +35,15 @@ pub struct EvaluationDatapointPreview {
pub trace_id: Uuid,
}

#[derive(Serialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct EvaluationDatapointPreview {
pub id: Uuid,
pub created_at: DateTime<Utc>,
pub evaluation_id: Uuid,
pub trace_id: Uuid,
}

pub async fn create_evaluation(
pool: &PgPool,
name: &String,
Expand Down Expand Up @@ -113,7 +122,7 @@ pub async fn get_evaluations_grouped_by_current_evaluation(

/// Record evaluation results in the database.
///
/// Each target data may contain an empty JSON object, if there is no target data.
/// Each target may contain an empty JSON object, if there is no target.
pub async fn set_evaluation_results(
db: Arc<DB>,
evaluation_id: Uuid,
Expand All @@ -124,16 +133,11 @@ pub async fn set_evaluation_results(
executor_outputs: &Vec<Option<Value>>,
trace_ids: &Vec<Uuid>,
) -> Result<()> {
let scores = scores
.iter()
.map(|score| serde_json::to_value(score.clone()).unwrap())
.collect::<Vec<_>>();

let res = sqlx::query(
r#"INSERT INTO evaluation_results (
let results = sqlx::query_as!(
EvaluationDatapointPreview,
r"INSERT INTO evaluation_results (
id,
evaluation_id,
scores,
data,
target,
executor_output,
Expand All @@ -142,54 +146,86 @@ pub async fn set_evaluation_results(
)
SELECT
id,
$8 as evaluation_id,
scores,
$7 as evaluation_id,
data,
target,
executor_output,
trace_id,
index_in_batch
FROM
UNNEST ($1::uuid[], $2::jsonb[], $3::jsonb[], $4::jsonb[], $5::jsonb[], $6::uuid[], $7::int8[])
AS tmp_table(id, scores, data, target, executor_output, trace_id, index_in_batch)"#,
UNNEST ($1::uuid[], $2::jsonb[], $3::jsonb[], $4::jsonb[], $5::uuid[], $6::int8[])
AS tmp_table(id, data, target, executor_output, trace_id, index_in_batch)
RETURNING id, created_at, evaluation_id, trace_id
",
ids,
datas,
targets,
executor_outputs as &Vec<Option<Value>>,
trace_ids,
&Vec::from_iter(0..ids.len() as i64),
evaluation_id,
)
.bind(ids)
.bind(scores)
.bind(datas)
.bind(targets)
.bind(executor_outputs)
.bind(trace_ids)
.bind(Vec::from_iter(0..ids.len() as i64))
.bind(evaluation_id)
.execute(&db.pool)
.await;
.fetch_all(&db.pool)
.await?;

// Each datapoint can have multiple scores, so unzip the scores and result ids.
let (score_result_ids, (score_names, score_values)): (Vec<Uuid>, (Vec<String>, Vec<f64>)) =
scores
.iter()
.zip(results.iter())
.flat_map(|(score, result)| {
score
.iter()
.map(|(name, value)| (result.id, (name.clone(), value)))
})
.unzip();

if let Err(e) = res {
log::error!("Error inserting evaluation results: {}", e);
}
sqlx::query!(
"INSERT INTO evaluation_scores (result_id, name, score)
SELECT
result_id,
name,
score
FROM UNNEST ($1::uuid[], $2::text[], $3::float8[])
AS tmp_table(result_id, name, score)",
&score_result_ids,
&score_names,
&score_values,
)
.execute(&db.pool)
.await?;

Ok(())
}

pub async fn get_evaluation_results(
pool: &PgPool,
evaluation_id: Uuid,
) -> Result<Vec<EvaluationDatapointPreview>> {
let results = sqlx::query_as::<_, EvaluationDatapointPreview>(
"SELECT
id,
created_at,
evaluation_id,
data,
target,
executor_output,
scores,
trace_id
FROM evaluation_results
) -> Result<Vec<EvaluationDatapoint>> {
let results = sqlx::query_as!(
EvaluationDatapoint,
"WITH scores AS (
SELECT
result_id,
jsonb_object_agg(name, score) as scores
FROM evaluation_scores
GROUP BY result_id
)
SELECT
r.id,
r.created_at,
r.evaluation_id,
r.data,
r.target,
r.executor_output,
s.scores,
r.trace_id
FROM evaluation_results r
LEFT JOIN scores s ON r.id = s.result_id
WHERE evaluation_id = $1
ORDER BY created_at ASC, index_in_batch ASC NULLS FIRST",
evaluation_id,
)
.bind(evaluation_id)
.fetch_all(pool)
.await?;

Expand All @@ -207,8 +243,8 @@ pub async fn delete_evaluation(pool: &PgPool, evaluation_id: &Uuid) -> Result<()
pub async fn get_evaluation_datapoint(
pool: &PgPool,
evaluation_result_id: Uuid,
) -> Result<EvaluationDatapointPreview> {
let preview = sqlx::query_as::<_, EvaluationDatapointPreview>(
) -> Result<EvaluationDatapoint> {
let preview = sqlx::query_as::<_, EvaluationDatapoint>(
"SELECT
id,
created_at,
Expand Down
4 changes: 2 additions & 2 deletions app-server/src/routes/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
get_global_evaluation_scores_bounds, EvaluationScoreBucket,
},
db::{
evaluations::{self, Evaluation, EvaluationDatapointPreview},
evaluations::{self, Evaluation, EvaluationDatapoint},
DB,
},
};
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn get_evaluations(
#[serde(rename_all = "camelCase")]
pub struct GetEvaluationResponse {
evaluation: Evaluation,
results: Vec<EvaluationDatapointPreview>,
results: Vec<EvaluationDatapoint>,
}

#[get("evaluations/{evaluation_id}")]
Expand Down
31 changes: 31 additions & 0 deletions frontend/lib/db/migrations/0001_reflective_nuke.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
CREATE TABLE IF NOT EXISTS "evaluation_scores" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"result_id" uuid DEFAULT gen_random_uuid() NOT NULL,
"name" text DEFAULT '' NOT NULL,
"score" double precision NOT NULL
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "labeling_queues" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"name" text NOT NULL,
"project_id" uuid NOT NULL
);
--> statement-breakpoint
ALTER TABLE "evaluation_results" ALTER COLUMN "scores" DROP NOT NULL;--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "evaluation_scores" ADD CONSTRAINT "evaluation_scores_result_id_fkey" FOREIGN KEY ("result_id") REFERENCES "public"."evaluation_results"("id") ON DELETE cascade ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "labeling_queues" ADD CONSTRAINT "labeling_queues_project_id_fkey" FOREIGN KEY ("project_id") REFERENCES "public"."projects"("id") ON DELETE cascade ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
ALTER TABLE "evaluation_results" DROP COLUMN IF EXISTS "error";--> statement-breakpoint
ALTER TABLE "evaluations" DROP COLUMN IF EXISTS "score_names";--> statement-breakpoint
ALTER TABLE "evaluations" DROP COLUMN IF EXISTS "average_scores";
Loading