Skip to content

Commit

Permalink
api for uploading spans (#302)
Browse files Browse the repository at this point in the history
* wip: api for uploading spans

* rename the api to queues/upload and field to items

* minor update in queues

* update Cargo.lock version

* fix: allow removing item from queue if no labels

* update route name from upload to push

* update the request struct names as well

* add usage stats and limits to the api
  • Loading branch information
dinmukhamedm authored Jan 8, 2025
1 parent 43e275d commit 859093d
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 18 deletions.
1 change: 1 addition & 0 deletions app-server/src/api/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod evaluations;
pub mod machine_manager;
pub mod metrics;
pub mod pipelines;
pub mod queues;
pub mod semantic_search;
pub mod traces;
134 changes: 134 additions & 0 deletions app-server/src/api/v1/queues.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::HashMap;

use actix_web::{post, web, HttpResponse};
use chrono::Utc;
use serde::Deserialize;
use serde_json::Value;
use uuid::Uuid;

use crate::{
cache::Cache,
db::{
project_api_keys::ProjectApiKey,
spans::{Span, SpanType},
trace::TraceType,
DB,
},
evaluations::utils::LabelingQueueEntry,
features::{is_feature_enabled, Feature},
routes::types::ResponseResult,
traces::span_attributes::ASSOCIATION_PROPERTIES_PREFIX,
};

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PushItem {
#[serde(default = "Uuid::new_v4")]
id: Uuid,
#[serde(default)]
name: String,
#[serde(default)]
attributes: HashMap<String, Value>,
#[serde(default)]
input: Option<Value>,
#[serde(default)]
output: Option<Value>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PushToQueueRequest {
items: Vec<PushItem>,
queue_name: String,
}

#[post("/queues/push")]
async fn push_to_queue(
project_api_key: ProjectApiKey,
req: web::Json<PushToQueueRequest>,
db: web::Data<DB>,
cache: web::Data<Cache>,
) -> ResponseResult {
let db = db.into_inner();
let req = req.into_inner();
let project_id = project_api_key.project_id;
let queue_name = req.queue_name;
let request_items = req.items;
let cache = cache.into_inner();

let Some(queue) =
crate::db::labeling_queues::get_labeling_queue_by_name(&db.pool, &queue_name, &project_id)
.await?
else {
return Ok(HttpResponse::NotFound().body(format!("Queue not found: {}", queue_name)));
};

let mut span_ids = Vec::with_capacity(request_items.len());
let num_spans = request_items.len();
crate::db::stats::add_spans_and_events_to_project_usage_stats(
&db.pool,
&project_id,
num_spans as i64,
0,
)
.await?;
if is_feature_enabled(Feature::UsageLimit) {
if let Ok(limits_exceeded) =
crate::traces::limits::update_workspace_limit_exceeded_by_project_id(
db.clone(),
cache.clone(),
project_id,
)
.await
{
if limits_exceeded.spans {
return Ok(HttpResponse::TooManyRequests().body("Workspace span limit exceeded"));
}
}
}

for request_item in request_items {
let mut attributes = request_item.attributes;
attributes.insert(
format!("{ASSOCIATION_PROPERTIES_PREFIX}.trace_type"),
// Temporary, in order not to show spans in the default trace view
serde_json::to_value(TraceType::EVENT).unwrap(),
);
let mut span = Span {
span_id: request_item.id,
trace_id: Uuid::new_v4(),
parent_span_id: None,
name: request_item.name,
start_time: Utc::now(),
end_time: Utc::now(),
attributes: serde_json::to_value(attributes).unwrap(),
span_type: SpanType::DEFAULT,
input: request_item.input,
output: request_item.output,
events: None,
labels: None,
};

let span_usage = crate::traces::utils::get_llm_usage_for_span(
&mut span.get_attributes(),
db.clone(),
cache.clone(),
)
.await;

crate::traces::utils::record_span_to_db(db.clone(), &span_usage, &project_id, &mut span)
.await?;
span_ids.push(span.span_id);
}

let queue_entries = span_ids
.iter()
.map(|span_id| LabelingQueueEntry {
span_id: span_id.clone(),
action: Value::Null,
})
.collect::<Vec<_>>();
crate::db::labeling_queues::push_to_labeling_queue(&db.pool, &queue.id, &queue_entries).await?;

Ok(HttpResponse::Ok().body("Items uploaded successfully"))
}
4 changes: 2 additions & 2 deletions app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,10 @@ fn main() -> anyhow::Result<()> {
.service(api::v1::evaluations::create_evaluation)
.service(api::v1::metrics::process_metrics)
.service(api::v1::semantic_search::semantic_search)
.service(api::v1::queues::push_to_queue)
.service(api::v1::machine_manager::start_machine)
.service(api::v1::machine_manager::terminate_machine)
.service(api::v1::machine_manager::execute_computer_action)
.app_data(PayloadConfig::new(10 * 1024 * 1024)),
.service(api::v1::machine_manager::execute_computer_action),
)
// Scopes with generic auth
.service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const removeQueueItemSchema = z.object({
}),
reasoning: z.string().optional().nullable()
})),
action: z.object({
action: z.null().or(z.object({
resultId: z.string().optional(),
datasetId: z.string().optional()
})
}))
});

// remove an item from the queue
Expand All @@ -61,16 +61,18 @@ export async function POST(request: Request, { params }: { params: { projectId:
labelSource: "MANUAL" as const,
}));

const insertedLabels = await db.insert(labels).values(newLabels).onConflictDoUpdate({
target: [labels.spanId, labels.classId, labels.userId],
set: {
value: sql`excluded.value`,
labelSource: sql`excluded.label_source`,
reasoning: sql`COALESCE(excluded.reasoning, labels.reasoning)`,
}
}).returning();
const insertedLabels = newLabels.length > 0
? await db.insert(labels).values(newLabels).onConflictDoUpdate({
target: [labels.spanId, labels.classId, labels.userId],
set: {
value: sql`excluded.value`,
labelSource: sql`excluded.label_source`,
reasoning: sql`COALESCE(excluded.reasoning, labels.reasoning)`,
}
}).returning()
: [];

if (action.resultId) {
if (action?.resultId) {
const resultId = action.resultId;
const userName = user.name ? ` (${user.name})` : '';

Expand Down Expand Up @@ -119,8 +121,7 @@ export async function POST(request: Request, { params }: { params: { projectId:
}
}

if (action.datasetId) {

if (action?.datasetId) {
const span = await db.query.spans.findFirst({
where: and(eq(spans.spanId, spanId), eq(spans.projectId, params.projectId))
});
Expand All @@ -135,7 +136,7 @@ export async function POST(request: Request, { params }: { params: { projectId:
metadata: {
spanId: span.spanId,
},
datasetId: action.datasetId,
datasetId: action?.datasetId,
}).returning();

await db.insert(datapointToSpan).values({
Expand Down
2 changes: 0 additions & 2 deletions frontend/app/api/projects/[projectId]/queues/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ export async function POST(
): Promise<Response> {
const projectId = params.projectId;



const body = await req.json();
const { name } = body;

Expand Down

0 comments on commit 859093d

Please sign in to comment.