From fbf1605db5ed3aad7c9300c18da7407b53c9e02a Mon Sep 17 00:00:00 2001 From: Din Date: Sun, 5 Jan 2025 15:29:15 +0500 Subject: [PATCH 1/8] wip: api for uploading spans --- app-server/src/api/v1/mod.rs | 1 + app-server/src/api/v1/spans.rs | 130 +++++++++++++++++++++++++++++++++ app-server/src/main.rs | 3 +- 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 app-server/src/api/v1/spans.rs diff --git a/app-server/src/api/v1/mod.rs b/app-server/src/api/v1/mod.rs index 72e1425..c055fd3 100644 --- a/app-server/src/api/v1/mod.rs +++ b/app-server/src/api/v1/mod.rs @@ -3,4 +3,5 @@ pub mod evaluations; pub mod metrics; pub mod pipelines; pub mod semantic_search; +pub mod spans; pub mod traces; diff --git a/app-server/src/api/v1/spans.rs b/app-server/src/api/v1/spans.rs new file mode 100644 index 0000000..2eca1a6 --- /dev/null +++ b/app-server/src/api/v1/spans.rs @@ -0,0 +1,130 @@ +use std::collections::HashMap; + +use actix_web::{post, web, HttpResponse}; +use chrono::{DateTime, Utc}; +use serde::Deserialize; +use serde_json::Value; +use uuid::Uuid; + +use crate::{ + cache::Cache, + db::{ + project_api_keys::ProjectApiKey, + spans::{Span, SpanType}, + trace::TraceType, + DB, + }, + evaluations::utils::LabelingQueueEntry, + routes::types::ResponseResult, + traces::span_attributes::ASSOCIATION_PROPERTIES_PREFIX, +}; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UploadSpan { + #[serde(default = "Uuid::new_v4")] + id: Uuid, + #[serde(default)] + name: String, + #[serde(default = "Utc::now")] + start_time: DateTime, + #[serde(default = "Utc::now")] + end_time: DateTime, + #[serde(default)] + attributes: HashMap, + #[serde(default)] + span_type: SpanType, + #[serde(default)] + input: Option, + #[serde(default)] + output: Option, + #[serde(default)] + trace_id: Option, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UploadSpansRequest { + spans: Vec, + #[serde(default)] + queue_name: Option, +} + +#[post("/spans/upload")] +async fn upload_spans( + project_api_key: ProjectApiKey, + req: web::Json, + db: web::Data, + cache: web::Data, +) -> ResponseResult { + let db = db.into_inner(); + let req = req.into_inner(); + let project_id = project_api_key.project_id; + let queue_name = req.queue_name; + let request_spans = req.spans; + let cache = cache.into_inner(); + + let queue_id = if let Some(queue_name) = queue_name { + let Some(queue) = crate::db::labeling_queues::get_labeling_queue_by_name( + &db.pool, + &queue_name, + &project_id, + ) + .await? + else { + return Ok(HttpResponse::NotFound().body(format!("Queue not found: {}", queue_name))); + }; + Some(queue.id) + } else { + None + }; + + let mut span_ids = Vec::with_capacity(request_spans.len()); + + for request_span in request_spans { + let mut attributes = request_span.attributes; + attributes.insert( + format!("{ASSOCIATION_PROPERTIES_PREFIX}.trace_type"), + // Temporary, in order not to show spans in the default trace view + serde_json::to_value(TraceType::EVENT).unwrap(), + ); + let mut span = Span { + span_id: request_span.id, + trace_id: request_span.trace_id.unwrap_or(Uuid::new_v4()), + parent_span_id: None, + name: request_span.name, + start_time: request_span.start_time, + end_time: request_span.end_time, + attributes: serde_json::to_value(attributes).unwrap(), + span_type: request_span.span_type, + input: request_span.input, + output: request_span.output, + events: None, + labels: None, + }; + + let span_usage = crate::traces::utils::get_llm_usage_for_span( + &mut span.get_attributes(), + db.clone(), + cache.clone(), + ) + .await; + + crate::traces::utils::record_span_to_db(db.clone(), &span_usage, &project_id, &mut span) + .await?; + span_ids.push(span.span_id); + } + if let Some(queue_id) = queue_id { + let queue_entries = span_ids + .iter() + .map(|span_id| LabelingQueueEntry { + span_id: span_id.clone(), + action: Value::Null, + }) + .collect::>(); + crate::db::labeling_queues::push_to_labeling_queue(&db.pool, &queue_id, &queue_entries) + .await?; + } + + Ok(HttpResponse::Ok().body("Spans uploaded successfully")) +} diff --git a/app-server/src/main.rs b/app-server/src/main.rs index 61abdb4..36ea651 100644 --- a/app-server/src/main.rs +++ b/app-server/src/main.rs @@ -407,7 +407,8 @@ fn main() -> anyhow::Result<()> { .service(api::v1::datasets::get_datapoints) .service(api::v1::evaluations::create_evaluation) .service(api::v1::metrics::process_metrics) - .service(api::v1::semantic_search::semantic_search), + .service(api::v1::semantic_search::semantic_search) + .service(api::v1::spans::upload_spans), ) // Scopes with generic auth .service( From a35550f74602360018ed828ec70dea8e61ac0166 Mon Sep 17 00:00:00 2001 From: Din Date: Mon, 6 Jan 2025 13:36:32 +0500 Subject: [PATCH 2/8] rename the api to queues/upload and field to items --- app-server/src/api/v1/mod.rs | 2 +- app-server/src/api/v1/{spans.rs => queues.rs} | 87 ++++++++----------- app-server/src/main.rs | 2 +- 3 files changed, 36 insertions(+), 55 deletions(-) rename app-server/src/api/v1/{spans.rs => queues.rs} (51%) diff --git a/app-server/src/api/v1/mod.rs b/app-server/src/api/v1/mod.rs index c055fd3..a1fef81 100644 --- a/app-server/src/api/v1/mod.rs +++ b/app-server/src/api/v1/mod.rs @@ -2,6 +2,6 @@ pub mod datasets; pub mod evaluations; pub mod metrics; pub mod pipelines; +pub mod queues; pub mod semantic_search; -pub mod spans; pub mod traces; diff --git a/app-server/src/api/v1/spans.rs b/app-server/src/api/v1/queues.rs similarity index 51% rename from app-server/src/api/v1/spans.rs rename to app-server/src/api/v1/queues.rs index 2eca1a6..a9b0a7b 100644 --- a/app-server/src/api/v1/spans.rs +++ b/app-server/src/api/v1/queues.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use actix_web::{post, web, HttpResponse}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use serde::Deserialize; use serde_json::Value; use uuid::Uuid; @@ -21,39 +21,30 @@ use crate::{ #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct UploadSpan { +struct UploadItem { #[serde(default = "Uuid::new_v4")] id: Uuid, #[serde(default)] name: String, - #[serde(default = "Utc::now")] - start_time: DateTime, - #[serde(default = "Utc::now")] - end_time: DateTime, #[serde(default)] attributes: HashMap, #[serde(default)] - span_type: SpanType, - #[serde(default)] input: Option, #[serde(default)] output: Option, - #[serde(default)] - trace_id: Option, } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct UploadSpansRequest { - spans: Vec, - #[serde(default)] - queue_name: Option, +struct UploadToQueueRequest { + items: Vec, + queue_name: String, } -#[post("/spans/upload")] -async fn upload_spans( +#[post("/queues/upload")] +async fn upload_to_queue( project_api_key: ProjectApiKey, - req: web::Json, + req: web::Json, db: web::Data, cache: web::Data, ) -> ResponseResult { @@ -61,44 +52,36 @@ async fn upload_spans( let req = req.into_inner(); let project_id = project_api_key.project_id; let queue_name = req.queue_name; - let request_spans = req.spans; + let request_items = req.items; let cache = cache.into_inner(); - let queue_id = if let Some(queue_name) = queue_name { - let Some(queue) = crate::db::labeling_queues::get_labeling_queue_by_name( - &db.pool, - &queue_name, - &project_id, - ) - .await? - else { - return Ok(HttpResponse::NotFound().body(format!("Queue not found: {}", queue_name))); - }; - Some(queue.id) - } else { - None + let Some(queue) = + crate::db::labeling_queues::get_labeling_queue_by_name(&db.pool, &queue_name, &project_id) + .await? + else { + return Ok(HttpResponse::NotFound().body(format!("Queue not found: {}", queue_name))); }; - let mut span_ids = Vec::with_capacity(request_spans.len()); + let mut span_ids = Vec::with_capacity(request_items.len()); - for request_span in request_spans { - let mut attributes = request_span.attributes; + for request_item in request_items { + let mut attributes = request_item.attributes; attributes.insert( format!("{ASSOCIATION_PROPERTIES_PREFIX}.trace_type"), // Temporary, in order not to show spans in the default trace view serde_json::to_value(TraceType::EVENT).unwrap(), ); let mut span = Span { - span_id: request_span.id, - trace_id: request_span.trace_id.unwrap_or(Uuid::new_v4()), + span_id: request_item.id, + trace_id: Uuid::new_v4(), parent_span_id: None, - name: request_span.name, - start_time: request_span.start_time, - end_time: request_span.end_time, + name: request_item.name, + start_time: Utc::now(), + end_time: Utc::now(), attributes: serde_json::to_value(attributes).unwrap(), - span_type: request_span.span_type, - input: request_span.input, - output: request_span.output, + span_type: SpanType::DEFAULT, + input: request_item.input, + output: request_item.output, events: None, labels: None, }; @@ -114,17 +97,15 @@ async fn upload_spans( .await?; span_ids.push(span.span_id); } - if let Some(queue_id) = queue_id { - let queue_entries = span_ids - .iter() - .map(|span_id| LabelingQueueEntry { - span_id: span_id.clone(), - action: Value::Null, - }) - .collect::>(); - crate::db::labeling_queues::push_to_labeling_queue(&db.pool, &queue_id, &queue_entries) - .await?; - } + + let queue_entries = span_ids + .iter() + .map(|span_id| LabelingQueueEntry { + span_id: span_id.clone(), + action: Value::Null, + }) + .collect::>(); + crate::db::labeling_queues::push_to_labeling_queue(&db.pool, &queue.id, &queue_entries).await?; Ok(HttpResponse::Ok().body("Spans uploaded successfully")) } diff --git a/app-server/src/main.rs b/app-server/src/main.rs index 36ea651..6c96787 100644 --- a/app-server/src/main.rs +++ b/app-server/src/main.rs @@ -408,7 +408,7 @@ fn main() -> anyhow::Result<()> { .service(api::v1::evaluations::create_evaluation) .service(api::v1::metrics::process_metrics) .service(api::v1::semantic_search::semantic_search) - .service(api::v1::spans::upload_spans), + .service(api::v1::queues::upload_to_queue), ) // Scopes with generic auth .service( From 0774afcecdfc6f3d400c8db4e10d9ba1894f88d7 Mon Sep 17 00:00:00 2001 From: Din Date: Mon, 6 Jan 2025 17:28:37 +0500 Subject: [PATCH 3/8] minor update in queues --- app-server/src/api/v1/queues.rs | 2 +- .../[projectId]/queues/[queueId]/remove/route.ts | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/app-server/src/api/v1/queues.rs b/app-server/src/api/v1/queues.rs index a9b0a7b..304dbe4 100644 --- a/app-server/src/api/v1/queues.rs +++ b/app-server/src/api/v1/queues.rs @@ -107,5 +107,5 @@ async fn upload_to_queue( .collect::>(); crate::db::labeling_queues::push_to_labeling_queue(&db.pool, &queue.id, &queue_entries).await?; - Ok(HttpResponse::Ok().body("Spans uploaded successfully")) + Ok(HttpResponse::Ok().body("Items uploaded successfully")) } diff --git a/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts b/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts index 87ae472..bc5f262 100644 --- a/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts +++ b/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts @@ -30,11 +30,11 @@ const removeQueueItemSchema = z.object({ id: z.string() }), reasoning: z.string().optional().nullable() - })), - action: z.object({ + })).nonempty(), + action: z.null().or(z.object({ resultId: z.string().optional(), datasetId: z.string().optional() - }) + })) }); // remove an item from the queue @@ -70,7 +70,7 @@ export async function POST(request: Request, { params }: { params: { projectId: } }).returning(); - if (action.resultId) { + if (action?.resultId) { const resultId = action.resultId; const userName = user.name ? ` (${user.name})` : ''; @@ -119,8 +119,7 @@ export async function POST(request: Request, { params }: { params: { projectId: } } - if (action.datasetId) { - + if (action?.datasetId) { const span = await db.query.spans.findFirst({ where: and(eq(spans.spanId, spanId), eq(spans.projectId, params.projectId)) }); @@ -135,7 +134,7 @@ export async function POST(request: Request, { params }: { params: { projectId: metadata: { spanId: span.spanId, }, - datasetId: action.datasetId, + datasetId: action?.datasetId, }).returning(); await db.insert(datapointToSpan).values({ From 275c1a89f3796da0d0287c9bb8d73331bf9d0e98 Mon Sep 17 00:00:00 2001 From: Din Date: Mon, 6 Jan 2025 17:29:45 +0500 Subject: [PATCH 4/8] update Cargo.lock version --- app-server/Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app-server/Cargo.lock b/app-server/Cargo.lock index 7e90a7a..2131496 100644 --- a/app-server/Cargo.lock +++ b/app-server/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "actix-codec" From b5d08f73a795fb554ddaf0926f2ee23280796c97 Mon Sep 17 00:00:00 2001 From: Din Date: Mon, 6 Jan 2025 18:34:55 +0500 Subject: [PATCH 5/8] fix: allow removing item from queue if no labels --- .../queues/[queueId]/remove/route.ts | 20 ++++++++++--------- .../api/projects/[projectId]/queues/route.ts | 2 -- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts b/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts index bc5f262..264b8d8 100644 --- a/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts +++ b/frontend/app/api/projects/[projectId]/queues/[queueId]/remove/route.ts @@ -30,7 +30,7 @@ const removeQueueItemSchema = z.object({ id: z.string() }), reasoning: z.string().optional().nullable() - })).nonempty(), + })), action: z.null().or(z.object({ resultId: z.string().optional(), datasetId: z.string().optional() @@ -61,14 +61,16 @@ export async function POST(request: Request, { params }: { params: { projectId: labelSource: "MANUAL" as const, })); - const insertedLabels = await db.insert(labels).values(newLabels).onConflictDoUpdate({ - target: [labels.spanId, labels.classId, labels.userId], - set: { - value: sql`excluded.value`, - labelSource: sql`excluded.label_source`, - reasoning: sql`COALESCE(excluded.reasoning, labels.reasoning)`, - } - }).returning(); + const insertedLabels = newLabels.length > 0 + ? await db.insert(labels).values(newLabels).onConflictDoUpdate({ + target: [labels.spanId, labels.classId, labels.userId], + set: { + value: sql`excluded.value`, + labelSource: sql`excluded.label_source`, + reasoning: sql`COALESCE(excluded.reasoning, labels.reasoning)`, + } + }).returning() + : []; if (action?.resultId) { const resultId = action.resultId; diff --git a/frontend/app/api/projects/[projectId]/queues/route.ts b/frontend/app/api/projects/[projectId]/queues/route.ts index 3324a92..82f8139 100644 --- a/frontend/app/api/projects/[projectId]/queues/route.ts +++ b/frontend/app/api/projects/[projectId]/queues/route.ts @@ -11,8 +11,6 @@ export async function POST( ): Promise { const projectId = params.projectId; - - const body = await req.json(); const { name } = body; From 10621ee9303e08e2f084e803ebbd8c31d23a98db Mon Sep 17 00:00:00 2001 From: Din Date: Tue, 7 Jan 2025 17:58:53 +0500 Subject: [PATCH 6/8] update route name from upload to push --- app-server/src/api/v1/queues.rs | 4 ++-- app-server/src/main.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app-server/src/api/v1/queues.rs b/app-server/src/api/v1/queues.rs index 304dbe4..4ab8627 100644 --- a/app-server/src/api/v1/queues.rs +++ b/app-server/src/api/v1/queues.rs @@ -41,8 +41,8 @@ struct UploadToQueueRequest { queue_name: String, } -#[post("/queues/upload")] -async fn upload_to_queue( +#[post("/queues/push")] +async fn push_to_queue( project_api_key: ProjectApiKey, req: web::Json, db: web::Data, diff --git a/app-server/src/main.rs b/app-server/src/main.rs index 0f8e31e..97ae0d8 100644 --- a/app-server/src/main.rs +++ b/app-server/src/main.rs @@ -428,7 +428,7 @@ fn main() -> anyhow::Result<()> { .service(api::v1::evaluations::create_evaluation) .service(api::v1::metrics::process_metrics) .service(api::v1::semantic_search::semantic_search) - .service(api::v1::queues::upload_to_queue) + .service(api::v1::queues::push_to_queue) .service(api::v1::machine_manager::start_machine) .service(api::v1::machine_manager::terminate_machine) .service(api::v1::machine_manager::execute_computer_action), From 2e20af55eeb1d7361e0486fdda552c2abca4e582 Mon Sep 17 00:00:00 2001 From: Din Date: Wed, 8 Jan 2025 10:49:23 +0500 Subject: [PATCH 7/8] update the request struct names as well --- app-server/src/api/v1/queues.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app-server/src/api/v1/queues.rs b/app-server/src/api/v1/queues.rs index 4ab8627..1b4b24b 100644 --- a/app-server/src/api/v1/queues.rs +++ b/app-server/src/api/v1/queues.rs @@ -21,7 +21,7 @@ use crate::{ #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct UploadItem { +struct PushItem { #[serde(default = "Uuid::new_v4")] id: Uuid, #[serde(default)] @@ -36,15 +36,15 @@ struct UploadItem { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct UploadToQueueRequest { - items: Vec, +struct PushToQueueRequest { + items: Vec, queue_name: String, } #[post("/queues/push")] async fn push_to_queue( project_api_key: ProjectApiKey, - req: web::Json, + req: web::Json, db: web::Data, cache: web::Data, ) -> ResponseResult { From c0984e4a4bf787f96acf484594d55b6e899ef865 Mon Sep 17 00:00:00 2001 From: Din Date: Wed, 8 Jan 2025 16:17:57 +0500 Subject: [PATCH 8/8] add usage stats and limits to the api --- app-server/src/api/v1/queues.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/app-server/src/api/v1/queues.rs b/app-server/src/api/v1/queues.rs index 1b4b24b..903f5c0 100644 --- a/app-server/src/api/v1/queues.rs +++ b/app-server/src/api/v1/queues.rs @@ -15,6 +15,7 @@ use crate::{ DB, }, evaluations::utils::LabelingQueueEntry, + features::{is_feature_enabled, Feature}, routes::types::ResponseResult, traces::span_attributes::ASSOCIATION_PROPERTIES_PREFIX, }; @@ -63,6 +64,28 @@ async fn push_to_queue( }; let mut span_ids = Vec::with_capacity(request_items.len()); + let num_spans = request_items.len(); + crate::db::stats::add_spans_and_events_to_project_usage_stats( + &db.pool, + &project_id, + num_spans as i64, + 0, + ) + .await?; + if is_feature_enabled(Feature::UsageLimit) { + if let Ok(limits_exceeded) = + crate::traces::limits::update_workspace_limit_exceeded_by_project_id( + db.clone(), + cache.clone(), + project_id, + ) + .await + { + if limits_exceeded.spans { + return Ok(HttpResponse::TooManyRequests().body("Workspace span limit exceeded")); + } + } + } for request_item in request_items { let mut attributes = request_item.attributes;