Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve UI for workspaces + internal fixes #193

Merged
merged 9 commits into from
Nov 10, 2024
2 changes: 2 additions & 0 deletions app-server/src/api/v1/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use actix_web::{post, web, HttpResponse};
use chrono::Utc;
use serde::Deserialize;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -84,6 +85,7 @@ async fn create_evaluation(
project_id,
group_id,
evaluation.id,
Utc::now(),
);

let ch_task = tokio::spawn(insert_evaluation_scores(
Expand Down
51 changes: 34 additions & 17 deletions app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;

use crate::evaluations::utils::EvaluationDatapointResult;

use super::utils::{execute_query, validate_string_against_injection};
use super::utils::{chrono_to_nanoseconds, execute_query, validate_string_against_injection};

/// Serde `serialize_with` helper that writes a UTC timestamp as one `i64`.
///
/// The emitted integer is whatever `chrono_to_nanoseconds` returns for the
/// timestamp (presumably nanoseconds since the Unix epoch — confirm against
/// the helper's definition in `super::utils`).
///
/// # Errors
///
/// Propagates any error reported by the serializer's `serialize_i64`.
fn serialize_timestamp<S>(timestamp: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    // `DateTime<Utc>` is `Copy`, so a plain dereference replaces the explicit `.clone()`.
    serializer.serialize_i64(chrono_to_nanoseconds(*timestamp))
}

/// Evaluation score
#[derive(Row, Serialize)]
Expand All @@ -21,6 +29,8 @@ pub struct EvaluationScore {
// Note that one evaluator can produce multiple scores
pub name: String,
pub value: f64,
#[serde(serialize_with = "serialize_timestamp")]
pub timestamp: DateTime<Utc>,
}

impl EvaluationScore {
Expand All @@ -30,6 +40,10 @@ impl EvaluationScore {
project_id: Uuid,
group_id: String,
evaluation_id: Uuid,
// TODO: each datapoint should carry its own timestamp, sent by the client
// libraries. Until then, every score in the batch shares this single
// timestamp, which is acceptable for now.
timestamp: DateTime<Utc>,
) -> Vec<EvaluationScore> {
points
.iter()
Expand All @@ -45,6 +59,7 @@ impl EvaluationScore {
result_id: *result_id,
name: name.to_string(),
value: value.clone(),
timestamp,
}
})
})
Expand Down Expand Up @@ -98,10 +113,11 @@ pub async fn get_average_evaluation_score(
validate_string_against_injection(&name)?;

let query = format!(
"SELECT avg(value) as average_value FROM evaluation_scores WHERE project_id = '{}' AND evaluation_id = '{}' AND name = '{}'",
project_id,
evaluation_id,
name
"SELECT avg(value) as average_value
FROM evaluation_scores
WHERE project_id = '{project_id}'
AND evaluation_id = '{evaluation_id}'
AND name = '{name}'",
);

let rows: Vec<AverageEvaluationScore> = execute_query(&clickhouse, &query).await?;
Expand Down Expand Up @@ -137,10 +153,10 @@ pub async fn get_evaluation_score_buckets_based_on_bounds(
"
WITH intervals AS (
SELECT
arrayJoin([{}]) AS interval_num,
arrayJoin([{interval_nums}]) AS interval_num,
{:?} + ((interval_num - 1) * {:?}) AS lower_bound,
CASE
WHEN interval_num = {} THEN {:?}
WHEN interval_num = {bucket_count} THEN {:?}
ELSE {:?} + (interval_num * {:?})
END AS upper_bound
)
Expand All @@ -149,17 +165,19 @@ SELECT
intervals.upper_bound,
COUNT(CASE
WHEN value >= intervals.lower_bound AND value < intervals.upper_bound THEN 1
WHEN intervals.interval_num = {} AND value >= intervals.lower_bound AND value <= intervals.upper_bound THEN 1
WHEN intervals.interval_num = {bucket_count}
AND value >= intervals.lower_bound
AND value <= intervals.upper_bound THEN 1
ELSE NULL
END) AS height
FROM evaluation_scores
JOIN intervals ON 1 = 1
WHERE project_id = '{}'
AND evaluation_id = '{}'
AND name = '{}'
WHERE project_id = '{project_id}'
AND evaluation_id = '{evaluation_id}'
AND name = '{name}'
GROUP BY intervals.lower_bound, intervals.upper_bound, intervals.interval_num
ORDER BY intervals.interval_num",
interval_nums, lower_bound, step_size, bucket_count, upper_bound, lower_bound, step_size, bucket_count, project_id, evaluation_id, name
lower_bound, step_size, upper_bound, lower_bound, step_size
);

let rows: Vec<EvaluationScoreBucket> = execute_query(&clickhouse, &query).await?;
Expand Down Expand Up @@ -191,10 +209,9 @@ pub async fn get_global_evaluation_scores_bounds(
SELECT
MAX(value) AS upper_bound
FROM evaluation_scores
WHERE project_id = '{}'
AND evaluation_id IN ({})
AND name = '{}'",
project_id, evaluation_ids_str, name
WHERE project_id = '{project_id}'
AND evaluation_id IN ({evaluation_ids_str})
AND name = '{name}'",
);

let rows: Vec<ComparedEvaluationScoresBounds> = execute_query(&clickhouse, &query).await?;
Expand Down
24 changes: 0 additions & 24 deletions app-server/src/db/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,27 +237,3 @@ pub async fn delete_evaluation(pool: &PgPool, evaluation_id: &Uuid) -> Result<()
.await?;
Ok(())
}

/// Loads a single row from `evaluation_results` by its primary id.
///
/// # Errors
///
/// Returns an error when the query fails or when no row matches the id
/// (`fetch_one` requires exactly one row to be available).
pub async fn get_evaluation_datapoint(
    pool: &PgPool,
    evaluation_result_id: Uuid,
) -> Result<EvaluationDatapoint> {
    // The last selected column must not be followed by a comma: a trailing
    // comma directly before `FROM` is rejected by PostgreSQL as a syntax error.
    let preview = sqlx::query_as::<_, EvaluationDatapoint>(
        "SELECT
            id,
            created_at,
            evaluation_id,
            scores,
            data,
            target,
            trace_id,
            executor_output
        FROM evaluation_results
        WHERE id = $1",
    )
    .bind(evaluation_result_id)
    .fetch_one(pool)
    .await?;

    Ok(preview)
}
2 changes: 0 additions & 2 deletions app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,6 @@ fn main() -> anyhow::Result<()> {
.service(routes::api_keys::revoke_project_api_key)
.service(routes::evaluations::get_evaluation)
.service(routes::evaluations::delete_evaluation)
.service(routes::evaluations::get_evaluation_datapoint)
.service(routes::evaluations::get_evaluation_score_stats)
.service(
routes::evaluations::get_evaluation_score_distribution,
Expand All @@ -507,7 +506,6 @@ fn main() -> anyhow::Result<()> {
.service(routes::datasets::index_dataset)
.service(routes::evaluations::get_evaluations)
.service(routes::evaluations::get_evaluation)
.service(routes::evaluations::get_evaluation_datapoint)
.service(routes::traces::get_traces)
.service(routes::traces::get_single_trace)
.service(routes::traces::get_single_span)
Expand Down
12 changes: 0 additions & 12 deletions app-server/src/routes/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,6 @@ async fn get_evaluation(path: web::Path<(Uuid, Uuid)>, db: web::Data<DB>) -> Res
Ok(HttpResponse::Ok().json(response))
}

/// HTTP handler returning one evaluation datapoint as JSON.
///
/// The path carries three UUIDs; only the last one (the datapoint id) is used
/// for the lookup. The first is presumably the project id supplied by an
/// enclosing route scope — confirm against the service registration.
#[get("evaluations/{evaluation_id}/datapoints/{datapoint_id}")]
async fn get_evaluation_datapoint(
    path: web::Path<(Uuid, Uuid, Uuid)>,
    db: web::Data<DB>,
) -> ResponseResult {
    // The project and evaluation ids are part of the route but are not needed here.
    let datapoint_id = path.into_inner().2;

    let datapoint = evaluations::get_evaluation_datapoint(&db.pool, datapoint_id).await?;
    let response = HttpResponse::Ok().json(datapoint);

    Ok(response)
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetEvaluationScoreStatsQuery {
Expand Down
3 changes: 2 additions & 1 deletion clickhouse/001000-initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ SETTINGS index_granularity = 8192 SETTINGS flatten_nested=0;
CREATE TABLE evaluation_scores (
project_id UUID,
group_id String,
timestamp DateTime64(9, 'UTC'),
evaluation_id UUID,
result_id UUID,
name String,
value Float64
) ENGINE = MergeTree()
ORDER BY (project_id, group_id, evaluation_id, name)
ORDER BY (project_id, group_id, timestamp, evaluation_id, name)
SETTINGS index_granularity = 8192
SETTINGS flatten_nested=0;

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,37 +1,70 @@
import { getServerSession } from 'next-auth';
import { authOptions } from '@/lib/auth';
import { fetcher } from '@/lib/utils';
import { db } from '@/lib/db/drizzle';
import { and, asc, eq, sql } from 'drizzle-orm';
import {
evaluationResults,
evaluations,
evaluationScores
} from '@/lib/db/migrations/schema';

export async function GET(
req: Request,
{ params }: { params: { projectId: string; evaluationId: string } }
): Promise<Response> {
const projectId = params.projectId;
const evaluationId = params.evaluationId;
const session = await getServerSession(authOptions);
const user = session!.user;

return await fetcher(`/projects/${projectId}/evaluations/${evaluationId}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${user.apiKey}`
}
const getEvaluation = db.query.evaluations.findFirst({
where: and(
eq(evaluations.id, evaluationId),
eq(evaluations.projectId, projectId)
)
});
}

export async function DELETE(
req: Request,
{ params }: { params: { projectId: string; evaluationId: string } }
): Promise<Response> {
const projectId = params.projectId;
const evaluationId = params.evaluationId;
const session = await getServerSession(authOptions);
const user = session!.user;
const subQueryScoreCte = db.$with('scores').as(
db
.select({
resultId: evaluationScores.resultId,
cteScores:
sql`jsonb_object_agg(${evaluationScores.name}, ${evaluationScores.score})`.as(
'cte_scores'
)
})
.from(evaluationScores)
.groupBy(evaluationScores.resultId)
);

return await fetcher(`/projects/${projectId}/evaluations/${evaluationId}`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${user.apiKey}`
}
});
const getEvaluationResults = db
.with(subQueryScoreCte)
.select({
id: evaluationResults.id,
createdAt: evaluationResults.createdAt,
evaluationId: evaluationResults.evaluationId,
data: evaluationResults.data,
target: evaluationResults.target,
executorOutput: evaluationResults.executorOutput,
scores: subQueryScoreCte.cteScores,
traceId: evaluationResults.traceId
})
.from(evaluationResults)
.leftJoin(
subQueryScoreCte,
eq(evaluationResults.id, subQueryScoreCte.resultId)
)
.where(eq(evaluationResults.evaluationId, evaluationId))
.orderBy(
asc(evaluationResults.createdAt),
asc(evaluationResults.indexInBatch)
);

const [evaluation, results] = await Promise.all([
getEvaluation,
getEvaluationResults
]);

const result = {
evaluation: evaluation,
results
};

return Response.json(result);
}
16 changes: 8 additions & 8 deletions frontend/app/api/projects/[projectId]/evaluations/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ import { evaluations } from '@/lib/db/migrations/schema';
import { and, desc, eq, inArray } from 'drizzle-orm';
import { paginatedGet } from '@/lib/db/utils';
import { Evaluation } from '@/lib/evaluation/types';
import { NextRequest } from 'next/server';

export async function GET(
req: Request,
req: NextRequest,
{ params }: { params: { projectId: string } }
): Promise<Response> {
const projectId = params.projectId;



const result = await paginatedGet<any, Evaluation>({
table: evaluations,
filters: [eq(evaluations.projectId, projectId)],
orderBy: desc(evaluations.createdAt),
orderBy: desc(evaluations.createdAt)
});

return Response.json(result);
Expand All @@ -27,17 +26,18 @@ export async function DELETE(
): Promise<Response> {
const projectId = params.projectId;



const { searchParams } = new URL(req.url);
const evaluationIds = searchParams.get('evaluationIds')?.split(',');

if (!evaluationIds) {
return new Response('At least one Evaluation ID is required', { status: 400 });
return new Response('At least one Evaluation ID is required', {
status: 400
});
}

try {
await db.delete(evaluations)
await db
.delete(evaluations)
.where(
and(
inArray(evaluations.id, evaluationIds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { Feature } from '@/lib/features/features';
import { clickhouseClient } from '@/lib/clickhouse/client';
import { z } from 'zod';

const NANOS_PER_MILLISECOND = 1_000_000;

const removeQueueItemSchema = z.object({
id: z.string(),
spanId: z.string(),
Expand Down Expand Up @@ -86,6 +88,7 @@ export async function POST(request: Request, { params }: { params: { projectId:
result_id: resultId,
name: value.name,
value: value.score,
timestamp: new Date().getTime() * NANOS_PER_MILLISECOND
}))
});
}
Expand Down
Loading