Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix realtime and manual spans #221

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
865 changes: 571 additions & 294 deletions app-server/Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
futures = "0.3"
rayon = "1"
enum_dispatch = "0.3.12"
reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json", "stream", "multipart"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "stream", "multipart"] }
serde = "1.0"
serde_json = "1.0.105"
log = "0.4.20"
Expand All @@ -35,11 +35,10 @@ actix-multipart = "0.6.1"
actix-web-httpauth = "0.8.1"
rand = "0.8.5"
itertools = "0.11.0"
unicode-segmentation = "1.10.1"
chrono = { version = "0.4.31", features = ["serde"] }
moka = { version = "0.12.1", features = ["sync", "future"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "json", "chrono", "bigdecimal"] }
thiserror = "1.0.56"
thiserror = "2"
json_value_merge = "2.0.0"
serde-jsonlines = "0.5.0"
regex = "1.10.3"
Expand All @@ -55,7 +54,6 @@ handlebars_misc_helpers = { version = "0.16.3", features = ["json"] }
aws-sdk-bedrockruntime = "1.37.0"
aws-config = "1.5.5"
aws-credential-types = "1.2.0"
backoff = { version = "0.4.0", features = ["tokio"] }
lmnr-baml = { git = "https://github.com/lmnr-ai/lmnr-baml", branch = "rust" }
lapin = "2.5.0"
bytes = "1.7.1"
Expand Down
11 changes: 2 additions & 9 deletions app-server/src/db/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl FromStr for SpanType {
#[derive(Deserialize, Serialize, Clone, Debug, Default, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Span {
pub version: String,
pub span_id: Uuid,
pub trace_id: Uuid,
pub parent_span_id: Option<Uuid>,
Expand Down Expand Up @@ -78,8 +77,7 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
};
sqlx::query(
"INSERT INTO spans
(version,
span_id,
(span_id,
trace_id,
parent_span_id,
start_time,
Expand All @@ -106,10 +104,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
$10,
$11,
$12,
$13,
$14)
$13)
ON CONFLICT (span_id, project_id) DO UPDATE SET
version = EXCLUDED.version,
trace_id = EXCLUDED.trace_id,
parent_span_id = EXCLUDED.parent_span_id,
start_time = EXCLUDED.start_time,
Expand All @@ -123,7 +119,6 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
output_preview = EXCLUDED.output_preview
",
)
.bind(&span.version)
.bind(&span.span_id)
.bind(&span.trace_id)
.bind(&span.parent_span_id as &Option<Uuid>)
Expand Down Expand Up @@ -196,7 +191,6 @@ pub async fn get_trace_spans(
spans.span_id,
spans.start_time,
spans.end_time,
spans.version,
spans.trace_id,
spans.input,
spans.output,
Expand Down Expand Up @@ -245,7 +239,6 @@ pub async fn get_span(pool: &PgPool, id: Uuid, project_id: Uuid) -> Result<Span>
span_id,
start_time,
end_time,
version,
trace_id,
parent_span_id,
name,
Expand Down
72 changes: 15 additions & 57 deletions app-server/src/db/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use super::{
utils::add_date_range_to_query,
};

pub const DEFAULT_VERSION: &str = "0.1.0";

/// Helper struct to pass current trace info, if exists, if pipeline is called from remote trace context
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -45,12 +43,6 @@ pub struct Trace {
start_time: Option<DateTime<Utc>>,
#[serde(default)]
end_time: Option<DateTime<Utc>>,
// Laminar trace format's version
version: String,
// Laminar customers' release version
release: Option<String>,
// User id of Laminar customers' user
user_id: Option<String>,
session_id: Option<String>,
metadata: Option<Value>,
input_token_count: i64,
Expand All @@ -59,7 +51,6 @@ pub struct Trace {
input_cost: f64,
output_cost: f64,
cost: f64,
success: bool,
project_id: Uuid,
}

Expand All @@ -69,12 +60,6 @@ pub struct TraceWithTopSpan {
id: Uuid,
start_time: DateTime<Utc>,
end_time: Option<DateTime<Utc>>,
// Laminar trace format's version
version: String,
// Laminar customers' release version
release: Option<String>,
// User id of Laminar customers' user
user_id: Option<String>,
session_id: Option<String>,
metadata: Option<Value>,
input_token_count: i64,
Expand All @@ -83,7 +68,6 @@ pub struct TraceWithTopSpan {
input_cost: f64,
output_cost: f64,
cost: f64,
success: bool,
project_id: Uuid,

top_span_input_preview: Option<String>,
Expand Down Expand Up @@ -114,13 +98,11 @@ pub async fn update_trace_attributes(
input_cost,
output_cost,
cost,
success,
start_time,
end_time,
version,
session_id,
user_id,
trace_type
trace_type,
metadata
)
VALUES (
$1,
Expand All @@ -131,13 +113,11 @@ pub async fn update_trace_attributes(
COALESCE($6, 0::float8),
COALESCE($7, 0::float8),
COALESCE($8, 0::float8),
COALESCE($9, true),
$9,
$10,
$11,
$12,
$13,
$14,
COALESCE($15, 'DEFAULT'::trace_type)
COALESCE($12, 'DEFAULT'::trace_type),
$13
)
ON CONFLICT(id) DO
UPDATE
Expand All @@ -148,12 +128,11 @@ pub async fn update_trace_attributes(
input_cost = traces.input_cost + COALESCE($6, 0),
output_cost = traces.output_cost + COALESCE($7, 0),
cost = traces.cost + COALESCE($8, 0),
success = CASE WHEN $9 IS NULL THEN traces.success ELSE $9 END,
start_time = CASE WHEN traces.start_time IS NULL OR traces.start_time > $10 THEN $10 ELSE traces.start_time END,
end_time = CASE WHEN traces.end_time IS NULL OR traces.end_time < $11 THEN $11 ELSE traces.end_time END,
session_id = CASE WHEN traces.session_id IS NULL THEN $13 ELSE traces.session_id END,
user_id = CASE WHEN traces.user_id IS NULL THEN $14 ELSE traces.user_id END,
trace_type = CASE WHEN $15 IS NULL THEN traces.trace_type ELSE COALESCE($15, 'DEFAULT'::trace_type) END
start_time = CASE WHEN traces.start_time IS NULL OR traces.start_time > $9 THEN $9 ELSE traces.start_time END,
end_time = CASE WHEN traces.end_time IS NULL OR traces.end_time < $10 THEN $10 ELSE traces.end_time END,
session_id = CASE WHEN traces.session_id IS NULL THEN $11 ELSE traces.session_id END,
trace_type = CASE WHEN $12 IS NULL THEN traces.trace_type ELSE COALESCE($12, 'DEFAULT'::trace_type) END,
metadata = COALESCE($13, traces.metadata)
"
)
.bind(attributes.id)
Expand All @@ -164,13 +143,11 @@ pub async fn update_trace_attributes(
.bind(attributes.input_cost)
.bind(attributes.output_cost)
.bind(attributes.cost)
.bind(attributes.success)
.bind(attributes.start_time)
.bind(attributes.end_time)
.bind(DEFAULT_VERSION)
.bind(&attributes.session_id)
.bind(&attributes.user_id)
.bind(&attributes.trace_type)
.bind(&serde_json::to_value(&attributes.metadata).unwrap())
.execute(pool)
.await?;
Ok(())
Expand All @@ -188,9 +165,6 @@ fn add_traces_info_expression(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -200,15 +174,13 @@ fn add_traces_info_expression(
input_cost,
output_cost,
cost,
success,
trace_type,
top_level_spans.input_preview top_span_input_preview,
top_level_spans.output_preview top_span_output_preview,
top_level_spans.path top_span_path,
top_level_spans.name top_span_name,
top_level_spans.span_type top_span_type,
EXTRACT(EPOCH FROM (end_time - start_time)) as latency,
CASE WHEN success = true THEN 'Success' ELSE 'Failed' END status
EXTRACT(EPOCH FROM (end_time - start_time)) as latency
FROM traces
JOIN (
SELECT
Expand Down Expand Up @@ -409,9 +381,6 @@ pub async fn get_traces(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -421,13 +390,11 @@ pub async fn get_traces(
input_cost,
output_cost,
cost,
success,
top_span_input_preview,
top_span_output_preview,
top_span_name,
top_span_type,
top_span_path,
status
top_span_path
FROM traces_info ",
);
if let Some(search) = text_search_filter {
Expand Down Expand Up @@ -466,9 +433,6 @@ pub async fn count_traces(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -478,10 +442,8 @@ pub async fn count_traces(
input_cost,
output_cost,
cost,
success,
trace_type,
EXTRACT(EPOCH FROM (end_time - start_time)) as latency,
CASE WHEN success = true THEN 'Success' ELSE 'Failed' END status
EXTRACT(EPOCH FROM (end_time - start_time)) as latency
FROM traces
WHERE start_time IS NOT NULL AND end_time IS NOT NULL AND trace_type = 'DEFAULT')",
);
Expand Down Expand Up @@ -516,9 +478,6 @@ pub async fn get_single_trace(pool: &PgPool, id: Uuid) -> Result<Trace> {
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -527,8 +486,7 @@ pub async fn get_single_trace(pool: &PgPool, id: Uuid) -> Result<Trace> {
total_token_count,
input_cost,
output_cost,
cost,
success
cost
FROM traces
WHERE id = $1
AND start_time IS NOT NULL AND end_time IS NOT NULL",
Expand Down
14 changes: 8 additions & 6 deletions app-server/src/traces/attributes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde_json::Value;
use uuid::Uuid;

use crate::db::trace::TraceType;
Expand All @@ -16,10 +19,9 @@ pub struct TraceAttributes {
pub output_cost: Option<f64>,
/// Total costis not calculated on this struct and must be set manually
pub cost: Option<f64>,
pub success: Option<bool>,
pub session_id: Option<String>,
pub user_id: Option<String>,
pub trace_type: Option<TraceType>,
pub metadata: Option<HashMap<String, Value>>,
}

impl TraceAttributes {
Expand Down Expand Up @@ -69,11 +71,11 @@ impl TraceAttributes {
self.session_id = session_id;
}

pub fn update_user_id(&mut self, user_id: Option<String>) {
self.user_id = user_id;
}

pub fn update_trace_type(&mut self, trace_type: Option<TraceType>) {
self.trace_type = trace_type;
}

pub fn set_metadata(&mut self, metadata: Option<HashMap<String, Value>>) {
self.metadata = metadata;
}
}
Loading