Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store label values sent from association properties #198

Merged
merged 2 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 3 additions & 46 deletions app-server/src/db/labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum LabelType {
pub enum LabelSource {
MANUAL,
AUTO,
CODE,
}

#[derive(sqlx::Type, Serialize, Clone, PartialEq)]
Expand All @@ -34,7 +35,7 @@ pub struct LabelClass {
pub name: String,
pub project_id: Uuid,
pub label_type: LabelType,
pub value_map: Value, // Vec<Value>
pub value_map: Value, // HashMap<String, f64>
pub description: Option<String>,
pub evaluator_runnable_graph: Option<Value>,
}
Expand Down Expand Up @@ -111,56 +112,12 @@ pub async fn get_label_classes_by_project_id(
query.push_bind(label_class_ids);
query.push(")");
}
query.push(" ORDER BY created_at DESC");
let label_classes = query.build_query_as::<LabelClass>().fetch_all(pool).await?;

Ok(label_classes)
}

pub async fn create_label_class(
pool: &PgPool,
id: Uuid,
name: String,
project_id: Uuid,
label_type: &LabelType,
value_map: Vec<Value>,
description: Option<String>,
evaluator_runnable_graph: Option<Value>,
) -> Result<LabelClass> {
let label_class = sqlx::query_as::<_, LabelClass>(
"INSERT INTO label_classes (
id,
name,
project_id,
label_type,
value_map,
description,
evaluator_runnable_graph
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING
id,
created_at,
name,
project_id,
label_type,
value_map,
description,
evaluator_runnable_graph
",
)
.bind(id)
.bind(name)
.bind(project_id)
.bind(label_type)
.bind(serde_json::to_value(value_map).unwrap())
.bind(description)
.bind(evaluator_runnable_graph)
.fetch_one(pool)
.await?;

Ok(label_class)
}

pub async fn get_label_class(
pool: &PgPool,
project_id: Uuid,
Expand Down
18 changes: 18 additions & 0 deletions app-server/src/db/spans.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
Expand All @@ -19,6 +21,22 @@ pub enum SpanType {
EVALUATION,
}

impl FromStr for SpanType {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_uppercase().trim() {
"DEFAULT" | "SPAN" => Ok(SpanType::DEFAULT),
"LLM" => Ok(SpanType::LLM),
"PIPELINE" => Ok(SpanType::PIPELINE),
"EXECUTOR" => Ok(SpanType::EXECUTOR),
"EVALUATOR" => Ok(SpanType::EVALUATOR),
"EVALUATION" => Ok(SpanType::EVALUATION),
_ => Err(anyhow::anyhow!("Invalid span type: {}", s)),
}
}
}

#[derive(Deserialize, Serialize, Clone, Debug, Default, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Span {
Expand Down
3 changes: 3 additions & 0 deletions app-server/src/db/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ fn add_filters_to_traces_query(query: &mut QueryBuilder<Postgres>, filters: &Opt
} else if filter.filter_column == "trace_type" {
query.push_bind(filter_value_str);
query.push("::trace_type");
} else if filter.filter_column == "top_span_type" {
dinmukhamedm marked this conversation as resolved.
Show resolved Hide resolved
let span_type = filter_value_str.parse::<SpanType>().unwrap_or_default();
query.push_bind(span_type);
} else {
query.push_bind(filter_value_str);
}
Expand Down
17 changes: 15 additions & 2 deletions app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use crate::{
pipeline::runner::PipelineRunner,
semantic_search::SemanticSearch,
storage::Storage,
traces::{evaluators::run_evaluator, utils::record_span_to_db},
traces::{
evaluators::run_evaluator,
utils::{record_labels_to_db, record_span_to_db},
},
};

pub async fn process_queue_spans<T: Storage + ?Sized>(
Expand Down Expand Up @@ -136,7 +139,6 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(
}
// ignore the span if the limit is exceeded
Ok(limits_exceeded) => {
// TODO: do the same for events
if limits_exceeded.spans {
let _ = delivery
.ack(BasicAckOptions::default())
Expand Down Expand Up @@ -191,6 +193,17 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(
.map_err(|e| log::error!("Failed to ack RabbitMQ delivery: {:?}", e));
}

if let Err(e) =
record_labels_to_db(db.clone(), &span, &rabbitmq_span_message.project_id).await
{
log::error!(
"Failed to record labels to DB. span_id [{}], project_id [{}]: {:?}",
span.span_id,
rabbitmq_span_message.project_id,
e
);
}

let ch_span = CHSpan::from_db_span(&span, span_usage, rabbitmq_span_message.project_id);
// TODO: Queue batches and send them every 1-2 seconds
let insert_span_res = ch::spans::insert_span(clickhouse.clone(), &ch_span).await;
Expand Down
6 changes: 0 additions & 6 deletions app-server/src/traces/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use crate::{
#[derive(Clone)]
pub struct WorkspaceLimitsExceeded {
pub spans: bool,
// TODO: for now it does not make sense to count events within the Otel span on
// the MQ producer side, so events limit is not used.
pub _events: bool,
}

pub async fn get_workspace_limit_exceeded_by_project_id(
Expand All @@ -35,8 +32,6 @@ pub async fn get_workspace_limit_exceeded_by_project_id(
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
spans: workspace_stats.spans_this_month >= workspace_stats.spans_limit
&& is_free_tier,
_events: workspace_stats.events_this_month >= workspace_stats.events_limit
&& is_free_tier,
};
let _ = cache
.insert::<WorkspaceLimitsExceeded>(
Expand Down Expand Up @@ -71,7 +66,6 @@ pub async fn update_workspace_limit_exceeded_by_workspace_id(
let is_free_tier = workspace_stats.tier_name.to_lowercase().trim() == "free";
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
spans: workspace_stats.spans_this_month >= workspace_stats.spans_limit && is_free_tier,
_events: workspace_stats.events_this_month >= workspace_stats.events_limit && is_free_tier,
};
let _ = cache
.insert::<WorkspaceLimitsExceeded>(workspace_id.to_string(), &workspace_limits_exceeded)
Expand Down
14 changes: 14 additions & 0 deletions app-server/src/traces/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@ impl SpanAttributes {
.insert(SPAN_PATH.to_string(), Value::String(span_name.to_string()));
}
}

pub fn get_labels(&self) -> HashMap<String, Value> {
let mut res = HashMap::new();
let label_prefix = format!("{ASSOCIATION_PROPERTIES_PREFIX}label.");
for (key, value) in self.attributes.iter() {
if key.starts_with(&label_prefix) {
res.insert(
key.strip_prefix(&label_prefix).unwrap().to_string(),
value.clone(),
);
}
}
res
}
}

impl Span {
Expand Down
43 changes: 42 additions & 1 deletion app-server/src/traces/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use serde_json::Value;
use uuid::Uuid;
Expand All @@ -7,6 +7,7 @@ use crate::{
cache::Cache,
db::{
self,
labels::LabelSource,
spans::{Span, SpanType},
trace, DB,
},
Expand Down Expand Up @@ -128,3 +129,43 @@ pub async fn record_span_to_db(

Ok(())
}

pub async fn record_labels_to_db(
db: Arc<DB>,
span: &Span,
project_id: &Uuid,
) -> anyhow::Result<()> {
let project_labels =
db::labels::get_label_classes_by_project_id(&db.pool, *project_id, None).await?;

let labels = span.get_attributes().get_labels();

for (label_name, label_value_key) in labels {
let label_class = project_labels.iter().find(|l| l.name == label_name);
if let Some(label_class) = label_class {
let key = match label_value_key {
Value::String(s) => s.clone(),
v => v.to_string(),
};
let value_map =
serde_json::from_value::<HashMap<String, f64>>(label_class.value_map.clone())
.unwrap_or_default();
let label_value = value_map.get(&key).cloned();
if let Some(label_value) = label_value {
db::labels::update_span_label(
&db.pool,
span.span_id,
label_value,
None,
label_class.id,
LabelSource::CODE,
None,
None,
)
.await?;
}
}
}

Ok(())
}
5 changes: 5 additions & 0 deletions frontend/app/api/projects/[projectId]/spans/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ export async function GET(
filter.column = "(attributes ->> 'gen_ai.usage.output_cost')::float8";
} else if (filter.column === "cost") {
filter.column = "(attributes ->> 'gen_ai.usage.cost')::float8";
} else if (filter.column === "span_type") {
// cast to span_type
const uppercased = filter.value.toUpperCase().trim();
filter.value = uppercased === 'SPAN' ? "'DEFAULT'" : `'${uppercased}'`;
filter.castType = "span_type";
}
return filter;
});
Expand Down
25 changes: 0 additions & 25 deletions frontend/components/pipelines/template-select.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ export default function TemplateSelect({
}: TemplateSelectProps) {
const buildTemplates = templates.filter((t) => t.displayGroup === 'build');
const evalTemplates = templates.filter((t) => t.displayGroup === 'eval');
const semanticEventEvalTemplates = templates.filter(
(t) => t.displayGroup === 'semantic_event_eval'
);
return (
<div className={cn('flex flex-col space-y-4', className ?? '')}>
<Label>Build</Label>
Expand All @@ -45,28 +42,6 @@ export default function TemplateSelect({
))}
</div>

<Label>Semantic Event Evaluation</Label>
<div className="grid gap-4 grid-cols-1 md:grid-cols-2 lg:grid-cols-3 mt-2">
{semanticEventEvalTemplates.map((t) => (
<Card
className={cn(
'hover:bg-secondary p-1',
t.id === templateId ? 'bg-secondary' : ''
)}
key={t.id}
onClick={() => setTemplateId(t.id)}
>
<div className="p-4 space-y-1 cursor-pointer">
<h4 className="cursor-pointer font-semibold truncate max-w-50">
{' '}
{t.name}{' '}
</h4>
<p className="text-gray-600 text-[12px]">{t.description}</p>
</div>
</Card>
))}
</div>

<Label>Evaluations</Label>
<div className="grid gap-4 grid-cols-1 md:grid-cols-2 lg:grid-cols-3 mt-2">
{evalTemplates.map((t) => (
Expand Down
20 changes: 20 additions & 0 deletions frontend/lib/db/migrations/0005_misty_leper_queen.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
ALTER TYPE "public"."label_source" ADD VALUE 'CODE';--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "playgrounds" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"name" text NOT NULL,
"project_id" uuid NOT NULL
);
--> statement-breakpoint
DROP INDEX IF EXISTS "spans_parent_span_id_project_id_start_time_end_time_idx";--> statement-breakpoint
DROP INDEX IF EXISTS "spans_project_id_idx";--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "playgrounds" ADD CONSTRAINT "playgrounds_project_id_fkey" FOREIGN KEY ("project_id") REFERENCES "public"."projects"("id") ON DELETE cascade ON UPDATE cascade;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "api_keys_user_id_idx" ON "api_keys" USING btree ("user_id");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "spans_root_project_id_start_time_end_time_trace_id_idx" ON "spans" USING btree ("project_id","start_time","end_time","trace_id") WHERE (parent_span_id IS NULL);--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "traces_project_id_trace_type_start_time_end_time_idx" ON "traces" USING btree ("project_id","start_time","end_time") WHERE ((trace_type = 'DEFAULT'::trace_type) AND (start_time IS NOT NULL) AND (end_time IS NOT NULL));--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "spans_project_id_idx" ON "spans" USING hash ("project_id");
Loading