diff --git a/Cargo.lock b/Cargo.lock index b434baefb901..17ca017211a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-macros" version = "0.5.0" @@ -2695,7 +2716,7 @@ dependencies = [ "futures", "hyper 0.14.32", "hyper-rustls 0.24.2", - "hyper-timeout", + "hyper-timeout 0.4.1", "log", "pin-project", "rand 0.8.5", @@ -3271,6 +3292,11 @@ dependencies = [ "mockall", "nanoid", "once_cell", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry-stdout", + "opentelemetry_sdk", "rand 0.8.5", "regex", "reqwest 0.12.12", @@ -3289,6 +3315,7 @@ dependencies = [ "tokio-cron-scheduler", "tokio-stream", "tokio-util", + "tonic", "tracing", "tracing-subscriber", "url", @@ -3799,6 +3826,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -4391,7 +4431,7 @@ dependencies = [ "object_store", "permutation", "pin-project", - "prost", + "prost 0.12.6", "prost-build", "rand 0.8.5", "roaring", @@ -4450,7 +4490,7 @@ dependencies = [ "num_cpus", "object_store", "pin-project", - "prost", + "prost 0.12.6", "rand 0.8.5", "roaring", "serde_json", @@ -4484,7 +4524,7 @@ dependencies = [ "lance-core", "lazy_static", 
"log", - "prost", + "prost 0.12.6", "snafu", "tokio", ] @@ -4518,7 +4558,7 @@ dependencies = [ "log", "num-traits", "paste", - "prost", + "prost 0.12.6", "prost-build", "prost-types", "rand 0.8.5", @@ -4555,7 +4595,7 @@ dependencies = [ "log", "num-traits", "object_store", - "prost", + "prost 0.12.6", "prost-build", "prost-types", "roaring", @@ -4603,7 +4643,7 @@ dependencies = [ "moka", "num-traits", "object_store", - "prost", + "prost 0.12.6", "prost-build", "rand 0.8.5", "rayon", @@ -4649,7 +4689,7 @@ dependencies = [ "object_store", "path_abs", "pin-project", - "prost", + "prost 0.12.6", "prost-build", "rand 0.8.5", "shellexpand", @@ -4710,7 +4750,7 @@ dependencies = [ "lazy_static", "log", "object_store", - "prost", + "prost 0.12.6", "prost-build", "prost-types", "rand 0.8.5", @@ -5754,6 +5794,99 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +dependencies = [ + "async-trait", + "bytes", + "http 1.2.0", + "opentelemetry", + "reqwest 0.12.12", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +dependencies = [ + "http 1.2.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.5", + "reqwest 0.12.12", + "thiserror 2.0.12", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.30.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.5", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" + +[[package]] +name = "opentelemetry-stdout" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "447191061af41c3943e082ea359ab8b64ff27d6d34d30d327df309ddef1eef6f" +dependencies = [ + "chrono", + "opentelemetry", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.1", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -6192,7 +6325,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.6", +] + +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", ] [[package]] @@ -6209,7 +6352,7 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.6", "prost-types", "regex", "syn 2.0.99", @@ -6229,13 +6372,26 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = "prost-derive" +version = "0.13.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.99", +] + [[package]] name = "prost-types" version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" dependencies = [ - "prost", + "prost 0.12.6", ] [[package]] @@ -8335,6 +8491,35 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.8", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout 0.5.2", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.5", + "socket2 0.5.8", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -8359,9 +8544,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.7.1", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/crates/goose-cli/src/cli.rs b/crates/goose-cli/src/cli/main.rs similarity index 81% rename from crates/goose-cli/src/cli.rs rename to crates/goose-cli/src/cli/main.rs index 072613a722b8..8e0c3a08de47 100644 --- a/crates/goose-cli/src/cli.rs +++ b/crates/goose-cli/src/cli/main.rs @@ -1,15 +1,15 @@ use anyhow::Result; -use clap::{Args, Parser, Subcommand}; +use clap::{Parser, Subcommand}; use goose::config::{Config, ExtensionConfig}; +use super::{extract_identifier, parse_key_val, Identifier}; use 
crate::commands::bench::agent_generator; use crate::commands::configure::handle_configure; use crate::commands::info::handle_info; use crate::commands::mcp::run_server; use crate::commands::project::{handle_project_default, handle_projects_interactive}; use crate::commands::recipe::{handle_deeplink, handle_list, handle_validate}; -// Import the new handlers from commands::schedule use crate::commands::schedule::{ handle_schedule_add, handle_schedule_cron_help, handle_schedule_list, handle_schedule_remove, handle_schedule_run_now, handle_schedule_services_status, handle_schedule_services_stop, @@ -19,7 +19,6 @@ use crate::commands::session::{handle_session_list, handle_session_remove}; use crate::logging::setup_logging; use crate::recipes::extract_from_cli::extract_recipe_info_from_cli; use crate::recipes::recipe::{explain_recipe, render_recipe_as_yaml}; -use crate::session; use crate::session::{build_session, SessionBuilderConfig, SessionSettings}; use goose_bench::bench_config::BenchRunConfig; use goose_bench::runners::bench_runner::BenchRunner; @@ -29,6 +28,8 @@ use goose_bench::runners::model_runner::ModelRunner; use std::io::Read; use std::path::PathBuf; +use goose::telemetry::{CommandType, SessionType}; + #[derive(Parser)] #[command(author, version, display_name = "", about, long_about = None)] struct Cli { @@ -36,46 +37,6 @@ struct Cli { command: Option, } -#[derive(Args, Debug)] -#[group(required = false, multiple = false)] -struct Identifier { - #[arg( - short, - long, - value_name = "NAME", - help = "Name for the chat session (e.g., 'project-x')", - long_help = "Specify a name for your chat session. When used with --resume, will resume this specific session if it exists.", - alias = "id" - )] - name: Option, - - #[arg( - short, - long, - value_name = "PATH", - help = "Path for the chat session (e.g., './playground.jsonl')", - long_help = "Specify a path for your chat session. When used with --resume, will resume this specific session if it exists." 
- )] - path: Option, -} - -fn extract_identifier(identifier: Identifier) -> session::Identifier { - if let Some(name) = identifier.name { - session::Identifier::Name(name) - } else if let Some(path) = identifier.path { - session::Identifier::Path(path) - } else { - unreachable!() - } -} - -fn parse_key_val(s: &str) -> Result<(String, String), String> { - match s.split_once('=') { - Some((key, value)) => Ok((key.to_string(), value.to_string())), - None => Err(format!("invalid KEY=VALUE: {}", s)), - } -} - #[derive(Subcommand)] enum SessionCommand { #[command(about = "List all available sessions")] @@ -142,27 +103,21 @@ enum SchedulerCommand { List {}, #[command(about = "Remove a scheduled job by ID")] Remove { - #[arg(long, help = "ID of the job to remove")] // Changed from positional to named --id + #[arg(long, help = "ID of the job to remove")] id: String, }, - /// List sessions created by a specific schedule #[command(about = "List sessions created by a specific schedule")] Sessions { - /// ID of the schedule - #[arg(long, help = "ID of the schedule")] // Explicitly make it --id + #[arg(long, help = "ID of the schedule")] id: String, - /// Maximum number of sessions to return #[arg(long, help = "Maximum number of sessions to return")] limit: Option, }, - /// Run a scheduled job immediately #[command(about = "Run a scheduled job immediately")] RunNow { - /// ID of the schedule to run - #[arg(long, help = "ID of the schedule to run")] // Explicitly make it --id + #[arg(long, help = "ID of the schedule to run")] id: String, }, - /// Check status of Temporal services (temporal scheduler only) #[command(about = "Check status of Temporal services")] ServicesStatus {}, /// Stop Temporal services (temporal scheduler only) @@ -229,18 +184,14 @@ pub enum BenchCommand { #[derive(Subcommand)] enum RecipeCommand { - /// Validate a recipe file #[command(about = "Validate a recipe")] Validate { - /// Recipe name to get recipe file to validate #[arg(help = "recipe name to get 
recipe file or full path to the recipe file to validate")] recipe_name: String, }, - /// Generate a deeplink for a recipe file #[command(about = "Generate a deeplink for a recipe")] Deeplink { - /// Recipe name to get recipe file to generate deeplink #[arg( help = "recipe name to get recipe file or full path to the recipe file to generate deeplink" )] @@ -271,23 +222,18 @@ enum RecipeCommand { #[derive(Subcommand)] enum Command { - /// Configure Goose settings #[command(about = "Configure Goose settings")] Configure {}, - /// Display Goose configuration information #[command(about = "Display Goose information")] Info { - /// Show verbose information including current configuration #[arg(short, long, help = "Show verbose information including config.yaml")] verbose: bool, }, - /// Manage system prompts and behaviors #[command(about = "Run one of the mcp servers bundled with goose")] Mcp { name: String }, - /// Start or resume interactive chat sessions #[command( about = "Start or resume interactive chat sessions", visible_alias = "s" @@ -295,11 +241,9 @@ enum Command { Session { #[command(subcommand)] command: Option, - /// Identifier for the chat session #[command(flatten)] identifier: Option, - /// Resume a previous session #[arg( short, long, @@ -308,7 +252,6 @@ enum Command { )] resume: bool, - /// Show message history when resuming #[arg( long, help = "Show previous messages when resuming a session", @@ -316,7 +259,6 @@ enum Command { )] history: bool, - /// Enable debug output mode #[arg( long, help = "Enable debug output mode with full content and no truncation", @@ -324,7 +266,6 @@ enum Command { )] debug: bool, - /// Maximum number of consecutive identical tool calls allowed #[arg( long = "max-tool-repetitions", value_name = "NUMBER", @@ -333,7 +274,6 @@ enum Command { )] max_tool_repetitions: Option, - /// Maximum number of turns (iterations) allowed in a single response #[arg( long = "max-turns", value_name = "NUMBER", @@ -342,7 +282,6 @@ enum Command { 
)] max_turns: Option, - /// Add stdio extensions with environment variables and commands #[arg( long = "with-extension", value_name = "COMMAND", @@ -352,7 +291,6 @@ enum Command { )] extensions: Vec, - /// Add remote extensions with a URL #[arg( long = "with-remote-extension", value_name = "URL", @@ -362,7 +300,6 @@ enum Command { )] remote_extensions: Vec, - /// Add streamable HTTP extensions with a URL #[arg( long = "with-streamable-http-extension", value_name = "URL", @@ -372,7 +309,6 @@ enum Command { )] streamable_http_extensions: Vec, - /// Add builtin extensions by name #[arg( long = "with-builtin", value_name = "NAME", @@ -383,18 +319,14 @@ enum Command { builtins: Vec, }, - /// Open the last project directory #[command(about = "Open the last project directory", visible_alias = "p")] Project {}, - /// List recent project directories #[command(about = "List recent project directories", visible_alias = "ps")] Projects, - /// Execute commands from an instruction file #[command(about = "Execute commands from an instruction file or stdin")] Run { - /// Path to instruction file containing commands #[arg( short, long, @@ -405,7 +337,6 @@ enum Command { )] instructions: Option, - /// Input text containing commands #[arg( short = 't', long = "text", @@ -417,7 +348,6 @@ enum Command { )] input_text: Option, - /// Additional system prompt to customize agent behavior #[arg( long = "system", value_name = "TEXT", @@ -427,7 +357,6 @@ enum Command { )] system: Option, - /// Recipe name or full path to the recipe file #[arg( short = None, long = "recipe", @@ -449,7 +378,6 @@ enum Command { )] params: Vec<(String, String)>, - /// Continue in interactive mode after processing input #[arg( short = 's', long = "interactive", @@ -457,7 +385,6 @@ enum Command { )] interactive: bool, - /// Run without storing a session file #[arg( long = "no-session", help = "Run without storing a session file", @@ -466,21 +393,18 @@ enum Command { )] no_session: bool, - /// Show the recipe title, 
description, and parameters #[arg( long = "explain", help = "Show the recipe title, description, and parameters" )] explain: bool, - /// Print the rendered recipe instead of running it #[arg( long = "render-recipe", help = "Print the rendered recipe instead of running it." )] render_recipe: bool, - /// Maximum number of consecutive identical tool calls allowed #[arg( long = "max-tool-repetitions", value_name = "NUMBER", @@ -489,7 +413,6 @@ enum Command { )] max_tool_repetitions: Option, - /// Maximum number of turns (iterations) allowed in a single response #[arg( long = "max-turns", value_name = "NUMBER", @@ -498,11 +421,9 @@ enum Command { )] max_turns: Option, - /// Identifier for this run session #[command(flatten)] identifier: Option, - /// Resume a previous run #[arg( short, long, @@ -512,7 +433,6 @@ enum Command { )] resume: bool, - /// Enable debug output mode #[arg( long, help = "Enable debug output mode with full content and no truncation", @@ -520,7 +440,6 @@ enum Command { )] debug: bool, - /// Add stdio extensions with environment variables and commands #[arg( long = "with-extension", value_name = "COMMAND", @@ -530,7 +449,6 @@ enum Command { )] extensions: Vec, - /// Add remote extensions #[arg( long = "with-remote-extension", value_name = "URL", @@ -540,7 +458,6 @@ enum Command { )] remote_extensions: Vec, - /// Add streamable HTTP extensions #[arg( long = "with-streamable-http-extension", value_name = "URL", @@ -550,7 +467,6 @@ enum Command { )] streamable_http_extensions: Vec, - /// Add builtin extensions by name #[arg( long = "with-builtin", value_name = "NAME", @@ -560,7 +476,6 @@ enum Command { )] builtins: Vec, - /// Quiet mode - suppress non-response output #[arg( short = 'q', long = "quiet", @@ -568,7 +483,6 @@ enum Command { )] quiet: bool, - /// Scheduled job ID (used internally for scheduled executions) #[arg( long = "scheduled-job-id", value_name = "ID", @@ -578,7 +492,6 @@ enum Command { )] scheduled_job_id: Option, - /// Additional 
sub-recipe file paths #[arg( long = "sub-recipe", value_name = "RECIPE", @@ -588,7 +501,6 @@ enum Command { )] additional_sub_recipes: Vec, - /// Provider to use for this run (overrides environment variable) #[arg( long = "provider", value_name = "PROVIDER", @@ -597,7 +509,6 @@ enum Command { )] provider: Option, - /// Model to use for this run (overrides environment variable) #[arg( long = "model", value_name = "MODEL", @@ -607,24 +518,20 @@ enum Command { model: Option, }, - /// Recipe utilities for validation and deeplinking #[command(about = "Recipe utilities for validation and deeplinking")] Recipe { #[command(subcommand)] command: RecipeCommand, }, - /// Manage scheduled jobs #[command(about = "Manage scheduled jobs", visible_alias = "sched")] Schedule { #[command(subcommand)] command: SchedulerCommand, }, - /// Update the Goose CLI version #[command(about = "Update the goose CLI version")] Update { - /// Update to canary version #[arg( short, long, @@ -633,22 +540,18 @@ enum Command { )] canary: bool, - /// Enforce to re-configure Goose during update #[arg(short, long, help = "Enforce to re-configure goose during update")] reconfigure: bool, }, - /// Evaluate system configuration across a range of practical tasks #[command(about = "Evaluate system configuration across a range of practical tasks")] Bench { #[command(subcommand)] cmd: BenchCommand, }, - /// Start a web server with a chat interface #[command(about = "Experimental: Start a web server with a chat interface")] Web { - /// Port to run the web server on #[arg( short, long, @@ -657,7 +560,6 @@ enum Command { )] port: u16, - /// Host to bind the web server to #[arg( long, default_value = "127.0.0.1", @@ -665,14 +567,13 @@ enum Command { )] host: String, - /// Open browser automatically #[arg(long, help = "Open browser automatically when server starts")] open: bool, }, } #[derive(clap::ValueEnum, Clone, Debug)] -enum CliProviderVariant { +pub enum CliProviderVariant { OpenAi, Databricks, Ollama, @@ 
-703,7 +604,13 @@ pub async fn cli() -> Result<()> { match cli.command { Some(Command::Configure {}) => { - let _ = handle_configure().await; + goose::track_telemetry! { + command: ("configure", CommandType::Configure) => { + handle_configure() + .await + .map_err(|e| anyhow::anyhow!("Configure failed: {}", e)) + } + }?; return Ok(()); } Some(Command::Info { verbose }) => { @@ -711,6 +618,9 @@ pub async fn cli() -> Result<()> { return Ok(()); } Some(Command::Mcp { name }) => { + if let Err(e) = goose::telemetry::shutdown_global_telemetry().await { + eprintln!("⚠️ Failed to shutdown telemetry for MCP process: {}", e); + } let _ = run_server(&name).await; } Some(Command::Session { @@ -757,7 +667,6 @@ pub async fn cli() -> Result<()> { Ok(()) } None => { - // Run session command by default let mut session: crate::Session = build_session(SessionBuilderConfig { identifier: identifier.map(extract_identifier), resume, @@ -782,21 +691,30 @@ pub async fn cli() -> Result<()> { retry_config: None, }) .await; - setup_logging( - session - .session_file() - .as_ref() - .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()), - None, - )?; - // Render previous messages if resuming a session and history flag is set - if resume && history { - session.render_message_history(); - } + let session_id = session + .session_file() + .as_ref() + .and_then(|p| p.file_stem()) + .and_then(|s| s.to_str()) + .unwrap_or("unknown_session") + .to_string(); + + goose::track_telemetry! 
{ + session: (session_id.as_str(), SessionType::Interactive) => { + setup_logging( + Some(session_id.as_str()), + None, + )?; + + if resume && history { + session.render_message_history(); + } - let _ = session.interactive(None).await; + let result = session.interactive(None).await; + result.map(|r| (r, session)) + } + }?; Ok(()) } }; @@ -837,6 +755,10 @@ pub async fn cli() -> Result<()> { provider, model, }) => { + // Check if this is a recipe execution for telemetry tracking + let is_recipe_execution = recipe.is_some(); + let recipe_name_for_telemetry = recipe.clone().unwrap_or_default(); + let (input_config, recipe_info) = match (instructions, input_text, recipe) { (Some(file), _, _) if file == "-" => { let mut input = String::new(); @@ -886,8 +808,11 @@ pub async fn cli() -> Result<()> { } return Ok(()); } - let (input_config, recipe_info) = - extract_recipe_info_from_cli(recipe_name, params, additional_sub_recipes)?; + let (input_config, recipe_info) = extract_recipe_info_from_cli( + recipe_name.clone(), + params.clone(), + additional_sub_recipes, + )?; (input_config, Some(recipe_info)) } (None, None, None) => { @@ -896,51 +821,130 @@ pub async fn cli() -> Result<()> { } }; - let mut session = build_session(SessionBuilderConfig { - identifier: identifier.map(extract_identifier), - resume, - no_session, - extensions, - remote_extensions, - streamable_http_extensions, - builtins, - extensions_override: input_config.extensions_override, - additional_system_prompt: input_config.additional_system_prompt, - settings: recipe_info + if is_recipe_execution { + let recipe_version = match crate::recipes::recipe::load_recipe( + &recipe_name_for_telemetry, + params.clone(), + ) { + Ok(recipe) => recipe.version, + Err(_) => "unknown".to_string(), + }; + + goose::track_telemetry! 
{ + recipe: (&recipe_name_for_telemetry, &recipe_version, params) => { + let mut session = build_session(SessionBuilderConfig { + identifier: identifier.map(extract_identifier), + resume, + no_session, + extensions, + remote_extensions, + streamable_http_extensions, + builtins, + extensions_override: input_config.extensions_override, + additional_system_prompt: input_config.additional_system_prompt, + settings: recipe_info .as_ref() .and_then(|r| r.session_settings.clone()), - provider, - model, - debug, - max_tool_repetitions, - max_turns, - scheduled_job_id, - interactive, // Use the interactive flag from the Run command - quiet, - sub_recipes: recipe_info.as_ref().and_then(|r| r.sub_recipes.clone()), - final_output_response: recipe_info - .as_ref() - .and_then(|r| r.final_output_response.clone()), - retry_config: recipe_info.as_ref().and_then(|r| r.retry_config.clone()), - }) - .await; + provider, + model, + debug, + max_tool_repetitions, + max_turns, + scheduled_job_id, + interactive, + quiet, + sub_recipes: recipe_info.as_ref().and_then(|r| r.sub_recipes.clone()), + final_output_response: recipe_info.as_ref().and_then(|r| r.final_output_response.clone()), + retry_config: recipe_info.as_ref().and_then(|r| r.retry_config.clone()), + }) + .await; + + setup_logging( + session + .session_file() + .as_ref() + .and_then(|p| p.file_stem()) + .and_then(|s| s.to_str()), + None, + )?; + + let result = if interactive { + session.interactive(input_config.contents).await + } else if let Some(contents) = input_config.contents { + session.headless(contents).await + } else { + Err(anyhow::anyhow!( + "Error: no text provided for prompt in headless mode" + )) + }; + + result.map(|r| (r, session)) + } + }?; + } else { + let mut session = build_session(SessionBuilderConfig { + identifier: identifier.map(extract_identifier), + resume, + no_session, + extensions, + remote_extensions, + streamable_http_extensions, + builtins, + extensions_override: input_config.extensions_override, + 
additional_system_prompt: input_config.additional_system_prompt, + settings: recipe_info + .as_ref() + .and_then(|r| r.session_settings.clone()), + provider, + model, + debug, + max_tool_repetitions, + max_turns, + scheduled_job_id, + interactive, + quiet, + sub_recipes: recipe_info.as_ref().and_then(|r| r.sub_recipes.clone()), + final_output_response: recipe_info + .as_ref() + .and_then(|r| r.final_output_response.clone()), + retry_config: recipe_info.as_ref().and_then(|r| r.retry_config.clone()), + }) + .await; - setup_logging( - session + let session_id = session .session_file() .as_ref() .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()), - None, - )?; - - if interactive { - let _ = session.interactive(input_config.contents).await; - } else if let Some(contents) = input_config.contents { - let _ = session.headless(contents).await; - } else { - eprintln!("Error: no text provided for prompt in headless mode"); - std::process::exit(1); + .and_then(|s| s.to_str()) + .unwrap_or("unknown_run") + .to_string(); + + let session_type = if interactive { + SessionType::Interactive + } else { + SessionType::Headless + }; + + goose::track_telemetry! 
{ + session: (session_id.as_str(), session_type) => { + setup_logging( + Some(session_id.as_str()), + None, + )?; + + let result = if interactive { + session.interactive(input_config.contents).await + } else if let Some(contents) = input_config.contents { + session.headless(contents).await + } else { + Err(anyhow::anyhow!( + "Error: no text provided for prompt in headless mode" + )) + }; + + result.map(|r| (r, session)) + } + }?; } return Ok(()); diff --git a/crates/goose-cli/src/cli/mod.rs b/crates/goose-cli/src/cli/mod.rs new file mode 100644 index 000000000000..3318ba5f6ab8 --- /dev/null +++ b/crates/goose-cli/src/cli/mod.rs @@ -0,0 +1,9 @@ +pub mod main; +pub mod session; +pub mod telemetry; +pub mod utils; + +pub use main::{cli, BenchCommand, CliProviderVariant}; +pub use session::{extract_identifier, Identifier}; +pub use telemetry::{extract_tool_usage_from_session, log_if_telemetry_disabled}; +pub use utils::{parse_key_val, InputConfig}; diff --git a/crates/goose-cli/src/cli/session.rs b/crates/goose-cli/src/cli/session.rs new file mode 100644 index 000000000000..dccb96fc55fa --- /dev/null +++ b/crates/goose-cli/src/cli/session.rs @@ -0,0 +1,35 @@ +use crate::session; +use std::path::PathBuf; + +#[derive(clap::Args, Debug)] +#[group(required = false, multiple = false)] +pub struct Identifier { + #[arg( + short, + long, + value_name = "NAME", + help = "Name for the chat session (e.g., 'project-x')", + long_help = "Specify a name for your chat session. When used with --resume, will resume this specific session if it exists.", + alias = "id" + )] + pub name: Option, + + #[arg( + short, + long, + value_name = "PATH", + help = "Path for the chat session (e.g., './playground.jsonl')", + long_help = "Specify a path for your chat session. When used with --resume, will resume this specific session if it exists." 
+ )] + pub path: Option, +} + +pub fn extract_identifier(identifier: Identifier) -> session::Identifier { + if let Some(name) = identifier.name { + session::Identifier::Name(name) + } else if let Some(path) = identifier.path { + session::Identifier::Path(path) + } else { + unreachable!() + } +} diff --git a/crates/goose-cli/src/cli/telemetry.rs b/crates/goose-cli/src/cli/telemetry.rs new file mode 100644 index 000000000000..500f63b8e504 --- /dev/null +++ b/crates/goose-cli/src/cli/telemetry.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; + +use goose::message::MessageContent; +use goose::telemetry::ToolUsage; + +pub fn log_if_telemetry_disabled(tracking_type: &str) { + if goose::telemetry::global_telemetry().is_none() { + tracing::debug!( + "Telemetry is disabled or not initialized for {} tracking", + tracking_type + ); + } +} + +pub fn extract_tool_usage_from_session(session: &crate::Session) -> Vec { + let messages = session.message_history(); + let mut tool_usage_map: HashMap = HashMap::new(); + let mut tool_call_times: HashMap = HashMap::new(); + let mut tool_id_to_name: HashMap = HashMap::new(); + + for message in &messages { + for content in &message.content { + match content { + MessageContent::ToolRequest(tool_request) => { + if let Ok(tool_call) = &tool_request.tool_call { + let tool_name = &tool_call.name; + let tool_id = &tool_request.id; + + tool_id_to_name.insert(tool_id.clone(), tool_name.clone()); + tool_call_times.insert(tool_id.clone(), message.created); + + tool_usage_map + .entry(tool_name.clone()) + .or_insert_with(|| ToolUsage::new(tool_name)); + } + } + MessageContent::ToolResponse(tool_response) => { + let tool_id = &tool_response.id; + + if let Some(tool_name) = tool_id_to_name.get(tool_id) { + if let Some(entry) = tool_usage_map.get_mut(tool_name) { + let duration = if let Some(start_time) = tool_call_times.get(tool_id) { + let duration_ms = (message.created - start_time).max(0) as u64; + std::time::Duration::from_millis(duration_ms) + 
} else { + std::time::Duration::from_millis(0) + }; + + let success = tool_response.tool_result.is_ok(); + entry.add_call(duration, success); + } + } + } + _ => {} + } + } + } + + tool_usage_map.into_values().collect() +} diff --git a/crates/goose-cli/src/cli/utils.rs b/crates/goose-cli/src/cli/utils.rs new file mode 100644 index 000000000000..8c2fcc507946 --- /dev/null +++ b/crates/goose-cli/src/cli/utils.rs @@ -0,0 +1,16 @@ +use anyhow::Result; +use goose::config::ExtensionConfig; + +#[derive(Debug)] +pub struct InputConfig { + pub contents: Option, + pub extensions_override: Option>, + pub additional_system_prompt: Option, +} + +pub fn parse_key_val(s: &str) -> Result<(String, String), String> { + match s.split_once('=') { + Some((key, value)) => Ok((key.to_string(), value.to_string())), + None => Err(format!("invalid KEY=VALUE: {}", s)), + } +} diff --git a/crates/goose-cli/src/main.rs b/crates/goose-cli/src/main.rs index 278a32624212..2e8afe978477 100644 --- a/crates/goose-cli/src/main.rs +++ b/crates/goose-cli/src/main.rs @@ -3,5 +3,19 @@ use goose_cli::cli::cli; #[tokio::main] async fn main() -> Result<()> { - cli().await + let telemetry_init_result = goose::telemetry::init_global_telemetry().await; + + if let Err(e) = &telemetry_init_result { + eprintln!("⚠️ Telemetry initialization failed: {}", e); + eprintln!(" This may be due to configuration issues or connectivity problems."); + eprintln!(" The application will continue without telemetry."); + } + + let result = cli().await; + + if let Err(e) = goose::telemetry::shutdown_global_telemetry().await { + eprintln!("⚠️ Failed to shutdown telemetry: {}", e); + } + + result } diff --git a/crates/goose-cli/src/recipes/extract_from_cli.rs b/crates/goose-cli/src/recipes/extract_from_cli.rs index b3012550ed62..4c73a106a177 100644 --- a/crates/goose-cli/src/recipes/extract_from_cli.rs +++ b/crates/goose-cli/src/recipes/extract_from_cli.rs @@ -7,7 +7,7 @@ use crate::recipes::print_recipe::print_recipe_info; use 
crate::recipes::recipe::load_recipe; use crate::recipes::search_recipe::retrieve_recipe_file; use crate::{ - cli::{InputConfig, RecipeInfo}, + cli::main::{InputConfig, RecipeInfo}, session::SessionSettings, }; diff --git a/crates/goose-server/src/commands/agent.rs b/crates/goose-server/src/commands/agent.rs index 5fdfa89ae2ee..010463912f1c 100644 --- a/crates/goose-server/src/commands/agent.rs +++ b/crates/goose-server/src/commands/agent.rs @@ -7,6 +7,7 @@ use etcetera::{choose_app_strategy, AppStrategy}; use goose::agents::Agent; use goose::config::APP_STRATEGY; use goose::scheduler_factory::SchedulerFactory; +use goose::telemetry::{init_global_telemetry, shutdown_global_telemetry}; use tower_http::cors::{Any, CorsLayer}; use tracing::info; @@ -16,6 +17,11 @@ pub async fn run() -> Result<()> { // Initialize logging crate::logging::setup_logging(Some("goosed"))?; + // Initialize telemetry + if let Err(e) = init_global_telemetry().await { + tracing::warn!("Failed to initialize telemetry: {}", e); + } + let settings = configuration::Settings::new()?; // Initialize pricing cache on startup @@ -54,6 +60,14 @@ pub async fn run() -> Result<()> { let listener = tokio::net::TcpListener::bind(settings.socket_addr()).await?; info!("listening on {}", listener.local_addr()?); - axum::serve(listener, app).await?; + + let result = axum::serve(listener, app).await; + + // Shutdown telemetry + if let Err(e) = shutdown_global_telemetry().await { + tracing::warn!("Failed to shutdown telemetry: {}", e); + } + + result?; Ok(()) } diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index d3ac7208e0a0..d005f11de7f6 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -13,6 +13,10 @@ use goose::{ agents::{AgentEvent, SessionConfig}, message::{push_message, Message}, permission::permission_confirmation::PrincipalType, + telemetry::{ + global_telemetry, SessionExecution, SessionMetadataSupport, 
SessionResult, SessionType, + TelemetryExecution, + }, }; use goose::{ permission::{Permission, PermissionConfirmation}, @@ -29,7 +33,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Duration, + time::{Duration, Instant}, }; use tokio::sync::mpsc; use tokio::time::timeout; @@ -37,12 +41,106 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use utoipa::ToSchema; +fn create_session_execution( + session_id: &str, + session_type: &str, + recipe_name: Option<&str>, + recipe_version: Option<&str>, +) -> SessionExecution { + let mut session_execution = SessionExecution::new(session_id, SessionType::Interactive) + .with_metadata("execution_mode", "server") + .with_metadata("session_type", session_type) + .with_metadata("interface", "ui"); + + if let Some(recipe_name) = recipe_name { + session_execution = session_execution + .with_metadata("recipe_name", recipe_name) + .with_metadata("session_mode", "recipe"); + } + if let Some(recipe_version) = recipe_version { + session_execution = session_execution.with_metadata("recipe_version", recipe_version); + } + + if recipe_name.is_none() { + session_execution = session_execution.with_metadata("session_mode", "chat"); + } + + session_execution +} + +async fn track_failed_session( + session_execution: SessionExecution, + error_message: String, + start_time: Instant, + message_count: Option, + turn_count: Option, +) { + if let Some(manager) = global_telemetry() { + let mut failed_execution = session_execution + .with_result(SessionResult::Error(error_message)) + .with_duration(start_time.elapsed()); + + if let Some(count) = message_count { + failed_execution = failed_execution.with_message_count(count); + } + if let Some(count) = turn_count { + failed_execution = failed_execution.with_turn_count(count); + } + + let _ = manager.track_session_execution(failed_execution).await; + } else { + tracing::warn!( + "Telemetry is disabled or not initialized - failed to track session 
failure" + ); + } +} + +async fn track_successful_session( + session_execution: SessionExecution, + start_time: Instant, + message_count: u64, + turn_count: u64, +) { + if let Some(manager) = global_telemetry() { + let successful_execution = session_execution + .with_result(SessionResult::Success) + .with_message_count(message_count) + .with_turn_count(turn_count) + .with_duration(start_time.elapsed()); + let _ = manager.track_session_execution(successful_execution).await; + } +} + +async fn track_recipe_execution( + recipe_name: &str, + recipe_version: &str, + result: SessionResult, + start_time: Instant, + session_type: &str, +) { + if let Some(manager) = global_telemetry() { + let recipe_execution = manager + .recipe_execution(recipe_name, recipe_version) + .with_result(result) + .with_duration(start_time.elapsed()) + .with_metadata("interface", "ui") + .with_metadata("execution_mode", "server") + .with_metadata("session_type", session_type); + + let _ = manager + .track_recipe_execution(recipe_execution.build()) + .await; + } +} + #[derive(Debug, Deserialize, Serialize)] struct ChatRequest { messages: Vec, session_id: Option, session_working_dir: String, scheduled_job_id: Option, + recipe_name: Option, + recipe_version: Option, } pub struct SseResponse { @@ -136,8 +234,61 @@ async fn reply_handler( let task_tx = tx.clone(); std::mem::drop(tokio::spawn(async move { + let start_time = Instant::now(); + let mut session_execution = create_session_execution( + &session_id, + "streaming", + request.recipe_name.as_deref(), + request.recipe_version.as_deref(), + ); + let agent = match state.get_agent().await { - Ok(agent) => agent, + Ok(agent) => { + let provider = agent.provider().await; + match provider { + Ok(_) => agent, + Err(_) => { + let _ = stream_event( + MessageEvent::Error { + error: "No provider configured".to_string(), + }, + &tx, + ) + .await; + let _ = stream_event( + MessageEvent::Finish { + reason: "error".to_string(), + }, + &tx, + ) + .await; + + // 
Track failed session and recipe execution + track_failed_session( + session_execution.clone(), + "No provider configured".to_string(), + start_time, + None, + None, + ) + .await; + + if let (Some(recipe_name), Some(recipe_version)) = + (&request.recipe_name, &request.recipe_version) + { + track_recipe_execution( + recipe_name, + recipe_version, + SessionResult::Error("No provider configured".to_string()), + start_time, + "streaming", + ) + .await; + } + return; + } + } + } Err(_) => { let _ = stream_event( MessageEvent::Error { @@ -146,6 +297,16 @@ async fn reply_handler( &task_tx, ) .await; + + // Track failed session + track_failed_session( + session_execution.clone(), + "No agent configured".to_string(), + start_time, + None, + None, + ) + .await; return; } }; @@ -173,11 +334,24 @@ async fn reply_handler( &task_tx, ) .await; + + // Track failed session + track_failed_session( + session_execution.clone(), + e.to_string(), + start_time, + None, + None, + ) + .await; return; } }; let mut all_messages = messages.clone(); + let mut message_count = messages.len(); + let mut turn_count = 0; + let session_path = match session::get_path(session::Identifier::Name(session_id.clone())) { Ok(path) => path, Err(e) => { @@ -189,6 +363,16 @@ async fn reply_handler( &task_tx, ) .await; + + // Track failed session + track_failed_session( + session_execution.clone(), + format!("Failed to get session path: {}", e), + start_time, + None, + None, + ) + .await; return; } }; @@ -204,57 +388,72 @@ async fn reply_handler( match response { Ok(Some(Ok(AgentEvent::Message(message)))) => { push_message(&mut all_messages, message.clone()); - if let Err(e) = stream_event(MessageEvent::Message { message }, &tx).await { - tracing::error!("Error sending message through channel: {}", e); - let _ = stream_event( - MessageEvent::Error { - error: e.to_string(), - }, - &tx, - ).await; - break; - } - } - Ok(Some(Ok(AgentEvent::ModelChange { model, mode }))) => { - if let Err(e) = 
stream_event(MessageEvent::ModelChange { model, mode }, &tx).await { - tracing::error!("Error sending model change through channel: {}", e); - let _ = stream_event( - MessageEvent::Error { - error: e.to_string(), - }, - &tx, - ).await; - } - } - Ok(Some(Ok(AgentEvent::McpNotification((request_id, n))))) => { - if let Err(e) = stream_event(MessageEvent::Notification{ - request_id: request_id.clone(), - message: n, - }, &tx).await { - tracing::error!("Error sending message through channel: {}", e); - let _ = stream_event( - MessageEvent::Error { - error: e.to_string(), - }, - &tx, - ).await; - } - } + message_count += 1; + if message.role == Role::Assistant { + turn_count += 1; + } + if let Err(e) = stream_event(MessageEvent::Message { message }, &tx).await { + tracing::error!("Error sending message through channel: {}", e); + let _ = stream_event( + MessageEvent::Error { + error: e.to_string(), + }, + &tx, + ).await; + break; + } + } + Ok(Some(Ok(AgentEvent::ModelChange { model, mode }))) => { + session_execution = session_execution.with_metadata("model", &model); + + if let Err(e) = stream_event(MessageEvent::ModelChange { model, mode }, &tx).await { + tracing::error!("Error sending model change through channel: {}", e); + let _ = stream_event( + MessageEvent::Error { + error: e.to_string(), + }, + &tx, + ).await; + } + } + Ok(Some(Ok(AgentEvent::McpNotification((request_id, n))))) => { + if let Err(e) = stream_event(MessageEvent::Notification{ + request_id: request_id.clone(), + message: n, + }, &tx).await { + tracing::error!("Error sending message through channel: {}", e); + let _ = stream_event( + MessageEvent::Error { + error: e.to_string(), + }, + &tx, + ).await; + } + } - Ok(Some(Err(e))) => { - tracing::error!("Error processing message: {}", e); - let _ = stream_event( - MessageEvent::Error { - error: e.to_string(), - }, - &tx, - ).await; - break; - } - Ok(None) => { - break; - } - Err(_) => { + Ok(Some(Err(e))) => { + tracing::error!("Error processing 
message: {}", e); + let _ = stream_event( + MessageEvent::Error { + error: e.to_string(), + }, + &tx, + ).await; + + // Track failed session + track_failed_session( + session_execution.clone(), + e.to_string(), + start_time, + Some(message_count as u64), + Some(turn_count as u64), + ).await; + break; + } + Ok(None) => { + break; + } + Err(_) => { if tx.is_closed() { break; } @@ -290,7 +489,30 @@ async fn reply_handler( &task_tx, ) .await; + + // Track successful session and recipe execution + track_successful_session( + session_execution.clone(), + start_time, + message_count as u64, + turn_count as u64, + ) + .await; + + if let (Some(recipe_name), Some(recipe_version)) = + (&request.recipe_name, &request.recipe_version) + { + track_recipe_execution( + recipe_name, + recipe_version, + SessionResult::Success, + start_time, + "streaming", + ) + .await; + } })); + Ok(SseResponse::new(stream)) } @@ -462,6 +684,8 @@ mod tests { session_id: Some("test-session".to_string()), session_working_dir: "test-working-dir".to_string(), scheduled_job_id: None, + recipe_name: None, + recipe_version: None, }) .unwrap(), )) diff --git a/crates/goose/Cargo.toml b/crates/goose/Cargo.toml index ceb05db371f6..a277333a3116 100644 --- a/crates/goose/Cargo.toml +++ b/crates/goose/Cargo.toml @@ -90,6 +90,14 @@ tokio-util = "0.7.15" lancedb = "0.13" arrow = "52.2" +# OpenTelemetry telemetry +opentelemetry = "0.30" +opentelemetry-otlp = { version = "0.30", features = ["http-proto", "grpc-tonic"] } +opentelemetry-stdout = "0.30" +opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] } +opentelemetry-semantic-conventions = "0.30" +tonic = "0.13" + [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = ["wincred"] } diff --git a/crates/goose/src/lib.rs b/crates/goose/src/lib.rs index 63f8f0aab3b8..082b1ba13dd8 100644 --- a/crates/goose/src/lib.rs +++ b/crates/goose/src/lib.rs @@ -14,6 +14,7 @@ pub mod scheduler; pub mod scheduler_factory; pub mod 
scheduler_trait; pub mod session; +pub mod telemetry; pub mod temporal_scheduler; pub mod token_counter; pub mod tool_monitor; diff --git a/crates/goose/src/telemetry/config.rs b/crates/goose/src/telemetry/config.rs new file mode 100644 index 000000000000..ab4d7a91f3e7 --- /dev/null +++ b/crates/goose/src/telemetry/config.rs @@ -0,0 +1,225 @@ +use serde::{Deserialize, Serialize}; +use std::env; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum TelemetryProvider { + Otlp, + #[default] + Console, + File, +} + +impl std::str::FromStr for TelemetryProvider { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "otlp" => Ok(TelemetryProvider::Otlp), + "console" => Ok(TelemetryProvider::Console), + "file" => Ok(TelemetryProvider::File), + _ => Err(format!("Unknown telemetry provider: {}", s)), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum UsageType { + #[default] + Human, + Automation, + Ci, +} + +impl std::str::FromStr for UsageType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "human" => Ok(UsageType::Human), + "automation" => Ok(UsageType::Automation), + "ci" => Ok(UsageType::Ci), + _ => Err(format!("Unknown usage type: {}", s)), + } + } +} + +#[derive(Debug, Clone)] +pub struct TelemetryConfig { + pub enabled: bool, + pub provider: TelemetryProvider, + pub endpoint: Option, + pub api_key: Option, + pub usage_type: Option, + pub environment: Option, + pub service_name: String, + pub service_version: String, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + enabled: false, + provider: TelemetryProvider::default(), + endpoint: None, + api_key: None, + usage_type: None, + environment: None, + service_name: "goose".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + } + } +} + 
+impl TelemetryConfig { + pub fn from_env() -> Self { + let enabled = env::var("GOOSE_TELEMETRY_ENABLED") + .map(|v| v.to_lowercase() == "true" || v == "1") + .unwrap_or(false); + + let provider = env::var("GOOSE_TELEMETRY_PROVIDER") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or_default(); + + let endpoint = env::var("GOOSE_TELEMETRY_ENDPOINT") + .ok() + .or_else(|| env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()); + + let api_key = env::var("GOOSE_TELEMETRY_API_KEY").ok(); + + let usage_type = env::var("GOOSE_USAGE_TYPE") + .ok() + .and_then(|v| v.parse().ok()); + + let environment = env::var("GOOSE_ENVIRONMENT").ok(); + + let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or_else(|_| "goose".to_string()); + + Self { + enabled, + provider, + endpoint, + api_key, + usage_type, + environment, + service_name, + service_version: env!("CARGO_PKG_VERSION").to_string(), + } + } + + pub fn validate(&self) -> Result<(), String> { + if !self.enabled { + return Ok(()); + } + + match self.provider { + TelemetryProvider::Otlp => { + if self.endpoint.is_none() { + return Err("OTLP provider requires GOOSE_TELEMETRY_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT".to_string()); + } + } + TelemetryProvider::File => { + if self.endpoint.is_none() { + return Err( + "File provider requires GOOSE_TELEMETRY_ENDPOINT (file path)".to_string(), + ); + } + } + TelemetryProvider::Console => {} + } + + Ok(()) + } + + pub fn get_endpoint(&self) -> Option { + match self.provider { + TelemetryProvider::Otlp => self.endpoint.clone(), + TelemetryProvider::File => self.endpoint.clone(), + TelemetryProvider::Console => None, + } + } +} + +// Removed unused function - API key extraction is handled elsewhere + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + + #[test] + fn test_telemetry_provider_parsing() { + assert_eq!( + "otlp".parse::().unwrap(), + TelemetryProvider::Otlp + ); + assert_eq!( + "console".parse::().unwrap(), + TelemetryProvider::Console + ); + assert_eq!( + 
"file".parse::().unwrap(), + TelemetryProvider::File + ); + assert!("invalid".parse::().is_err()); + } + + #[test] + fn test_usage_type_parsing() { + assert_eq!("human".parse::().unwrap(), UsageType::Human); + assert_eq!( + "automation".parse::().unwrap(), + UsageType::Automation + ); + assert_eq!("ci".parse::().unwrap(), UsageType::Ci); + assert!("invalid".parse::().is_err()); + } + + #[test] + fn test_config_from_env() { + let config = TelemetryConfig::from_env(); + assert!(!config.enabled); + + env::set_var("GOOSE_TELEMETRY_ENABLED", "true"); + env::set_var("GOOSE_TELEMETRY_PROVIDER", "otlp"); + env::set_var("GOOSE_TELEMETRY_ENDPOINT", "http://localhost:4317"); + + let config = TelemetryConfig::from_env(); + assert!(config.enabled); + assert_eq!(config.provider, TelemetryProvider::Otlp); + assert_eq!(config.endpoint, Some("http://localhost:4317".to_string())); + + env::remove_var("GOOSE_TELEMETRY_ENABLED"); + env::remove_var("GOOSE_TELEMETRY_PROVIDER"); + env::remove_var("GOOSE_TELEMETRY_ENDPOINT"); + } + + #[test] + fn test_config_validation() { + let mut config = TelemetryConfig::default(); + + assert!(config.validate().is_ok()); + + config.enabled = true; + config.provider = TelemetryProvider::Otlp; + assert!(config.validate().is_err()); + + config.endpoint = Some("http://localhost:4317".to_string()); + assert!(config.validate().is_ok()); + + config.provider = TelemetryProvider::File; + config.endpoint = None; + assert!(config.validate().is_err()); + + config.endpoint = Some("/tmp/telemetry.log".to_string()); + assert!(config.validate().is_ok()); + + config.provider = TelemetryProvider::Console; + config.endpoint = None; + assert!(config.validate().is_ok()); + } + + // Removed test for extract_api_key_from_headers function +} diff --git a/crates/goose/src/telemetry/environment.rs b/crates/goose/src/telemetry/environment.rs new file mode 100644 index 000000000000..182f756e2c94 --- /dev/null +++ b/crates/goose/src/telemetry/environment.rs @@ -0,0 +1,152 @@ +use 
/// Describes the runtime environment as a comma-separated list of tags.
///
/// Tags appear in a fixed order: CI markers (`ci`, `github-actions`,
/// `jenkins`, `gitlab-ci`), containers (`docker`, `kubernetes`), cloud
/// runtimes (`aws-lambda`, `gcp`, `azure-functions`), editor/terminal
/// (`vscode`, then `iterm` or `terminal-app`), `scheduled` when `GOOSE_JOB_ID`
/// is set, and finally the compile-time OS and CPU architecture.
///
/// Marker variables are tested for presence only: any value counts, including
/// an empty one or one that is not valid Unicode.
///
/// Returns `None` when no tag applies, which requires a build target that is
/// none of the operating systems and architectures listed at the end of the
/// body.
pub fn detect_environment() -> Option<String> {
    // `(variable, tag)` pairs that contribute `tag` whenever `variable` exists.
    const CI_MARKERS: &[(&str, &str)] = &[
        ("CI", "ci"),
        ("GITHUB_ACTIONS", "github-actions"),
        ("JENKINS_URL", "jenkins"),
        ("GITLAB_CI", "gitlab-ci"),
    ];
    const PLATFORM_MARKERS: &[(&str, &str)] = &[
        ("KUBERNETES_SERVICE_HOST", "kubernetes"),
        ("AWS_LAMBDA_FUNCTION_NAME", "aws-lambda"),
        ("GOOGLE_CLOUD_PROJECT", "gcp"),
        ("AZURE_FUNCTIONS_ENVIRONMENT", "azure-functions"),
        ("VSCODE_INJECTION", "vscode"),
    ];

    // `var_os` also reports variables whose value is not valid Unicode, which
    // `var(..).is_ok()` would treat as absent.
    fn is_set(name: &str) -> bool {
        std::env::var_os(name).is_some()
    }

    let mut env_indicators: Vec<&str> = Vec::new();

    for &(variable, tag) in CI_MARKERS {
        if is_set(variable) {
            env_indicators.push(tag);
        }
    }

    // Docker is recognised by the variable or by the marker file.
    if is_set("DOCKER_CONTAINER") || Path::new("/.dockerenv").exists() {
        env_indicators.push("docker");
    }

    for &(variable, tag) in PLATFORM_MARKERS {
        if is_set(variable) {
            env_indicators.push(tag);
        }
    }

    // TERM_PROGRAM is compared by value, so read it once and branch on it.
    match std::env::var("TERM_PROGRAM").as_deref() {
        Ok("iTerm.app") => env_indicators.push("iterm"),
        Ok("Apple_Terminal") => env_indicators.push("terminal-app"),
        _ => {}
    }

    if is_set("GOOSE_JOB_ID") {
        env_indicators.push("scheduled");
    }

    #[cfg(target_os = "macos")]
    env_indicators.push("macos");
    #[cfg(target_os = "linux")]
    env_indicators.push("linux");
    #[cfg(target_os = "windows")]
    env_indicators.push("windows");

    #[cfg(target_arch = "x86_64")]
    env_indicators.push("x86_64");
    #[cfg(target_arch = "aarch64")]
    env_indicators.push("aarch64");
    #[cfg(target_arch = "arm")]
    env_indicators.push("arm");

    if env_indicators.is_empty() {
        None
    } else {
        Some(env_indicators.join(","))
    }
}
"JENKINS_URL", + "GITLAB_CI", + "DOCKER_CONTAINER", + "KUBERNETES_SERVICE_HOST", + "AWS_LAMBDA_FUNCTION_NAME", + "GOOGLE_CLOUD_PROJECT", + "AZURE_FUNCTIONS_ENVIRONMENT", + "VSCODE_INJECTION", + "TERM_PROGRAM", + "GOOSE_JOB_ID", + ]; + + let original_values: Vec<_> = vars_to_clear + .iter() + .map(|var| (var, env::var(var).ok())) + .collect(); + + for var in &vars_to_clear { + env::remove_var(var); + } + + let result = detect_environment(); + + for (var, original_value) in original_values { + if let Some(value) = original_value { + env::set_var(var, value); + } + } + + assert!(result.is_some()); + let env_string = result.unwrap(); + + #[cfg(target_os = "macos")] + assert!(env_string.contains("macos")); + #[cfg(target_os = "linux")] + assert!(env_string.contains("linux")); + #[cfg(target_os = "windows")] + assert!(env_string.contains("windows")); + + #[cfg(target_arch = "x86_64")] + assert!(env_string.contains("x86_64")); + #[cfg(target_arch = "aarch64")] + assert!(env_string.contains("aarch64")); + } + + #[test] + fn test_detect_environment_with_ci() { + env::set_var("CI", "true"); + let result = detect_environment(); + assert!(result.is_some()); + assert!(result.unwrap().contains("ci")); + env::remove_var("CI"); + } + + #[test] + fn test_detect_environment_with_github_actions() { + env::set_var("GITHUB_ACTIONS", "true"); + let result = detect_environment(); + assert!(result.is_some()); + assert!(result.unwrap().contains("github-actions")); + env::remove_var("GITHUB_ACTIONS"); + } + + #[test] + fn test_detect_environment_with_docker() { + env::set_var("DOCKER_CONTAINER", "true"); + let result = detect_environment(); + assert!(result.is_some()); + assert!(result.unwrap().contains("docker")); + env::remove_var("DOCKER_CONTAINER"); + } +} diff --git a/crates/goose/src/telemetry/events.rs b/crates/goose/src/telemetry/events.rs new file mode 100644 index 000000000000..e4b8abc5b6ef --- /dev/null +++ b/crates/goose/src/telemetry/events.rs @@ -0,0 +1,909 @@ +use 
crate::session::storage::SessionMetadata; +use crate::telemetry::config::UsageType; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +pub trait TelemetryExecution { + type ResultType: Clone; + + fn start_time(&self) -> u64; + fn end_time(&self) -> Option; + fn set_end_time(&mut self, time: u64); + fn duration_ms(&self) -> Option; + fn set_duration_ms(&mut self, duration: u64); + fn result(&self) -> Option<&Self::ResultType>; + fn set_result(&mut self, result: Self::ResultType); + fn user_id(&self) -> &str; + fn set_user_id(&mut self, user_id: String); + fn usage_type(&self) -> &UsageType; + fn set_usage_type(&mut self, usage_type: UsageType); + fn environment(&self) -> Option<&String>; + fn set_environment(&mut self, environment: String); + fn metadata(&self) -> &HashMap; + fn metadata_mut(&mut self) -> &mut HashMap; + fn error_details(&self) -> Option<&ErrorDetails>; + fn set_error_details(&mut self, error_details: ErrorDetails); + + fn with_result(mut self, result: Self::ResultType) -> Self + where + Self: Sized, + { + let end_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.set_end_time(end_time); + self.set_duration_ms((end_time - self.start_time()) * 1000); + self.set_result(result); + self + } + + fn with_duration(mut self, duration: Duration) -> Self + where + Self: Sized, + { + self.set_duration_ms(duration.as_millis() as u64); + self + } + + fn with_metadata(mut self, key: &str, value: &str) -> Self + where + Self: Sized, + { + self.metadata_mut() + .insert(key.to_string(), value.to_string()); + self + } + + fn with_environment(mut self, environment: &str) -> Self + where + Self: Sized, + { + self.set_environment(environment.to_string()); + self + } + + fn with_error_details(mut self, error_details: ErrorDetails) -> Self + where + Self: Sized, + { + self.set_error_details(error_details); + self + } + + fn complete(&mut self) { + if 
self.end_time().is_none() { + let end_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.set_end_time(end_time); + self.set_duration_ms((end_time - self.start_time()) * 1000); + } + } +} + +pub trait SessionMetadataSupport { + fn token_usage(&self) -> Option<&TokenUsage>; + fn set_token_usage(&mut self, token_usage: TokenUsage); + fn tool_usage(&self) -> &Vec; + fn tool_usage_mut(&mut self) -> &mut Vec; + fn message_count(&self) -> u64; + fn set_message_count(&mut self, count: u64); + fn turn_count(&self) -> u64; + fn set_turn_count(&mut self, count: u64); + + fn with_token_usage(mut self, token_usage: TokenUsage) -> Self + where + Self: Sized, + { + self.set_token_usage(token_usage); + self + } + + fn add_tool_usage(&mut self, tool_usage: ToolUsage) { + self.tool_usage_mut().push(tool_usage); + } + + fn with_message_count(mut self, count: u64) -> Self + where + Self: Sized, + { + self.set_message_count(count); + self + } + + fn with_turn_count(mut self, count: u64) -> Self + where + Self: Sized, + { + self.set_turn_count(count); + self + } + + fn with_session_metadata(mut self, session_metadata: &SessionMetadata) -> Self + where + Self: Sized + TelemetryExecution, + { + self.set_message_count(session_metadata.message_count as u64); + + self.metadata_mut().insert( + "working_dir".to_string(), + session_metadata.working_dir.to_string_lossy().to_string(), + ); + + if let Some(schedule_id) = &session_metadata.schedule_id { + self.metadata_mut() + .insert("schedule_id".to_string(), schedule_id.clone()); + } + + if let (Some(input), Some(output)) = ( + session_metadata.input_tokens, + session_metadata.output_tokens, + ) { + self.set_token_usage(TokenUsage::new(input as u64, output as u64)); + } + + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RecipeExecution { + pub recipe_name: String, + pub recipe_version: String, + pub start_time: u64, + pub end_time: Option, + pub duration_ms: Option, + 
pub result: Option, + pub user_id: String, + pub usage_type: UsageType, + pub environment: Option, + pub metadata: HashMap, + pub token_usage: Option, + pub tool_usage: Vec, + pub message_count: u64, + pub turn_count: u64, + pub error_details: Option, +} + +impl RecipeExecution { + pub fn new(recipe_name: &str, recipe_version: &str) -> Self { + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + Self { + recipe_name: recipe_name.to_string(), + recipe_version: recipe_version.to_string(), + start_time, + end_time: None, + duration_ms: None, + result: None, + user_id: String::new(), + usage_type: UsageType::Human, + environment: None, + metadata: HashMap::new(), + token_usage: None, + tool_usage: Vec::new(), + message_count: 0, + turn_count: 0, + error_details: None, + } + } +} + +impl TelemetryExecution for RecipeExecution { + type ResultType = SessionResult; + + fn start_time(&self) -> u64 { + self.start_time + } + fn end_time(&self) -> Option { + self.end_time + } + fn set_end_time(&mut self, time: u64) { + self.end_time = Some(time); + } + fn duration_ms(&self) -> Option { + self.duration_ms + } + fn set_duration_ms(&mut self, duration: u64) { + self.duration_ms = Some(duration); + } + fn result(&self) -> Option<&Self::ResultType> { + self.result.as_ref() + } + fn set_result(&mut self, result: Self::ResultType) { + self.result = Some(result); + } + fn user_id(&self) -> &str { + &self.user_id + } + fn set_user_id(&mut self, user_id: String) { + self.user_id = user_id; + } + fn usage_type(&self) -> &UsageType { + &self.usage_type + } + fn set_usage_type(&mut self, usage_type: UsageType) { + self.usage_type = usage_type; + } + fn environment(&self) -> Option<&String> { + self.environment.as_ref() + } + fn set_environment(&mut self, environment: String) { + self.environment = Some(environment); + } + fn metadata(&self) -> &HashMap { + &self.metadata + } + fn metadata_mut(&mut self) -> &mut HashMap { + &mut 
self.metadata + } + fn error_details(&self) -> Option<&ErrorDetails> { + self.error_details.as_ref() + } + fn set_error_details(&mut self, error_details: ErrorDetails) { + self.error_details = Some(error_details); + } +} + +impl SessionMetadataSupport for RecipeExecution { + fn token_usage(&self) -> Option<&TokenUsage> { + self.token_usage.as_ref() + } + fn set_token_usage(&mut self, token_usage: TokenUsage) { + self.token_usage = Some(token_usage); + } + fn tool_usage(&self) -> &Vec { + &self.tool_usage + } + fn tool_usage_mut(&mut self) -> &mut Vec { + &mut self.tool_usage + } + fn message_count(&self) -> u64 { + self.message_count + } + fn set_message_count(&mut self, count: u64) { + self.message_count = count; + } + fn turn_count(&self) -> u64 { + self.turn_count + } + fn set_turn_count(&mut self, count: u64) { + self.turn_count = count; + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TokenUsage { + pub input_tokens: u64, + pub output_tokens: u64, + pub total_tokens: u64, + pub estimated_cost: Option, + pub model: Option, + pub provider: Option, +} + +impl TokenUsage { + pub fn new(input_tokens: u64, output_tokens: u64) -> Self { + Self { + input_tokens, + output_tokens, + total_tokens: input_tokens + output_tokens, + estimated_cost: None, + model: None, + provider: None, + } + } + + pub fn with_cost(mut self, cost: f64) -> Self { + self.estimated_cost = Some(cost); + self + } + + pub fn with_model(mut self, model: &str, provider: &str) -> Self { + self.model = Some(model.to_string()); + self.provider = Some(provider.to_string()); + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ToolUsage { + pub tool_name: String, + pub call_count: u64, + pub total_duration_ms: u64, + pub avg_duration_ms: u64, + pub success_count: u64, + pub error_count: u64, +} + +impl ToolUsage { + pub fn new(tool_name: &str) -> Self { + Self { + tool_name: tool_name.to_string(), + call_count: 0, + total_duration_ms: 0, + avg_duration_ms: 0, 
+ success_count: 0, + error_count: 0, + } + } + + pub fn add_call(&mut self, duration: Duration, success: bool) { + self.call_count += 1; + let duration_ms = duration.as_millis() as u64; + self.total_duration_ms += duration_ms; + self.avg_duration_ms = self.total_duration_ms / self.call_count; + + if success { + self.success_count += 1; + } else { + self.error_count += 1; + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorDetails { + pub error_type: String, + pub error_message: String, + pub stack_trace: Option, + pub failing_tool: Option, + pub context: HashMap, +} + +impl ErrorDetails { + pub fn new(error_type: &str, error_message: &str) -> Self { + Self { + error_type: error_type.to_string(), + error_message: error_message.to_string(), + stack_trace: None, + failing_tool: None, + context: HashMap::new(), + } + } + + pub fn with_stack_trace(mut self, stack_trace: &str) -> Self { + self.stack_trace = Some(stack_trace.to_string()); + self + } + + pub fn with_failing_tool(mut self, tool_name: &str) -> Self { + self.failing_tool = Some(tool_name.to_string()); + self + } + + pub fn with_context(mut self, key: &str, value: &str) -> Self { + self.context.insert(key.to_string(), value.to_string()); + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "event_type", rename_all = "snake_case")] +pub enum TelemetryEvent { + RecipeExecution(RecipeExecution), + SessionExecution(SessionExecution), + CommandExecution(CommandExecution), + SystemMetrics(SystemMetrics), + UserSession(UserSession), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionExecution { + pub session_id: String, + pub session_type: SessionType, + pub start_time: u64, + pub end_time: Option, + pub duration_ms: Option, + pub result: Option, + pub user_id: String, + pub usage_type: UsageType, + pub environment: Option, + pub metadata: HashMap, + pub token_usage: Option, + pub tool_usage: Vec, + pub message_count: u64, + pub turn_count: 
u64, + pub error_details: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SessionType { + Interactive, + Headless, + Scheduled, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SessionResult { + Success, + Error(String), + Interrupted, + Cancelled, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandExecution { + pub command_name: String, + pub command_type: CommandType, + pub start_time: u64, + pub end_time: Option, + pub duration_ms: Option, + pub result: Option, + pub user_id: String, + pub usage_type: UsageType, + pub environment: Option, + pub metadata: HashMap, + pub arguments: HashMap, + pub error_details: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CommandType { + Configure, + Info, + SessionList, + SessionRemove, + SessionExport, + ProjectOpen, + ProjectsList, + RecipeValidate, + RecipeDeeplink, + ScheduleAdd, + ScheduleList, + ScheduleRemove, + ScheduleRunNow, + ScheduleSessions, + Update, + Bench, + Web, + Mcp, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CommandResult { + Success, + Error(String), + Cancelled, +} + +impl SessionExecution { + pub fn new(session_id: &str, session_type: SessionType) -> Self { + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + Self { + session_id: session_id.to_string(), + session_type, + start_time, + end_time: None, + duration_ms: None, + result: None, + user_id: String::new(), + usage_type: UsageType::Human, + environment: None, + metadata: HashMap::new(), + token_usage: None, + tool_usage: Vec::new(), + message_count: 0, + turn_count: 0, + error_details: None, + } + } +} + +impl TelemetryExecution for SessionExecution { + type ResultType = SessionResult; + + 
fn start_time(&self) -> u64 { + self.start_time + } + fn end_time(&self) -> Option { + self.end_time + } + fn set_end_time(&mut self, time: u64) { + self.end_time = Some(time); + } + fn duration_ms(&self) -> Option { + self.duration_ms + } + fn set_duration_ms(&mut self, duration: u64) { + self.duration_ms = Some(duration); + } + fn result(&self) -> Option<&Self::ResultType> { + self.result.as_ref() + } + fn set_result(&mut self, result: Self::ResultType) { + self.result = Some(result); + } + fn user_id(&self) -> &str { + &self.user_id + } + fn set_user_id(&mut self, user_id: String) { + self.user_id = user_id; + } + fn usage_type(&self) -> &UsageType { + &self.usage_type + } + fn set_usage_type(&mut self, usage_type: UsageType) { + self.usage_type = usage_type; + } + fn environment(&self) -> Option<&String> { + self.environment.as_ref() + } + fn set_environment(&mut self, environment: String) { + self.environment = Some(environment); + } + fn metadata(&self) -> &HashMap { + &self.metadata + } + fn metadata_mut(&mut self) -> &mut HashMap { + &mut self.metadata + } + fn error_details(&self) -> Option<&ErrorDetails> { + self.error_details.as_ref() + } + fn set_error_details(&mut self, error_details: ErrorDetails) { + self.error_details = Some(error_details); + } +} + +impl SessionMetadataSupport for SessionExecution { + fn token_usage(&self) -> Option<&TokenUsage> { + self.token_usage.as_ref() + } + fn set_token_usage(&mut self, token_usage: TokenUsage) { + self.token_usage = Some(token_usage); + } + fn tool_usage(&self) -> &Vec { + &self.tool_usage + } + fn tool_usage_mut(&mut self) -> &mut Vec { + &mut self.tool_usage + } + fn message_count(&self) -> u64 { + self.message_count + } + fn set_message_count(&mut self, count: u64) { + self.message_count = count; + } + fn turn_count(&self) -> u64 { + self.turn_count + } + fn set_turn_count(&mut self, count: u64) { + self.turn_count = count; + } +} + +impl CommandExecution { + pub fn new(command_name: &str, command_type: 
CommandType) -> Self { + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + Self { + command_name: command_name.to_string(), + command_type, + start_time, + end_time: None, + duration_ms: None, + result: None, + user_id: String::new(), + usage_type: UsageType::Human, + environment: None, + metadata: HashMap::new(), + arguments: HashMap::new(), + error_details: None, + } + } + + pub fn with_argument(mut self, key: &str, value: &str) -> Self { + self.arguments.insert(key.to_string(), value.to_string()); + self + } +} + +impl TelemetryExecution for CommandExecution { + type ResultType = CommandResult; + + fn start_time(&self) -> u64 { + self.start_time + } + fn end_time(&self) -> Option { + self.end_time + } + fn set_end_time(&mut self, time: u64) { + self.end_time = Some(time); + } + fn duration_ms(&self) -> Option { + self.duration_ms + } + fn set_duration_ms(&mut self, duration: u64) { + self.duration_ms = Some(duration); + } + fn result(&self) -> Option<&Self::ResultType> { + self.result.as_ref() + } + fn set_result(&mut self, result: Self::ResultType) { + self.result = Some(result); + } + fn user_id(&self) -> &str { + &self.user_id + } + fn set_user_id(&mut self, user_id: String) { + self.user_id = user_id; + } + fn usage_type(&self) -> &UsageType { + &self.usage_type + } + fn set_usage_type(&mut self, usage_type: UsageType) { + self.usage_type = usage_type; + } + fn environment(&self) -> Option<&String> { + self.environment.as_ref() + } + fn set_environment(&mut self, environment: String) { + self.environment = Some(environment); + } + fn metadata(&self) -> &HashMap { + &self.metadata + } + fn metadata_mut(&mut self) -> &mut HashMap { + &mut self.metadata + } + fn error_details(&self) -> Option<&ErrorDetails> { + self.error_details.as_ref() + } + fn set_error_details(&mut self, error_details: ErrorDetails) { + self.error_details = Some(error_details); + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] 
+pub struct SystemMetrics { + pub timestamp: u64, + pub memory_usage: Option, + pub cpu_usage: Option, + pub active_sessions: u64, + pub load_average: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserSession { + pub session_id: String, + pub user_id: String, + pub start_time: u64, + pub end_time: Option, + pub duration_seconds: Option, + pub usage_type: UsageType, + pub recipe_count: u64, + pub total_tokens: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_recipe_execution_creation() { + let execution = RecipeExecution::new("test-recipe", "1.0.0"); + + assert_eq!(execution.recipe_name, "test-recipe"); + assert_eq!(execution.recipe_version, "1.0.0"); + assert!(execution.start_time > 0); + assert!(execution.end_time.is_none()); + assert!(execution.result.is_none()); + } + + #[test] + fn test_recipe_execution_completion() { + let execution = + RecipeExecution::new("test-recipe", "1.0.0").with_result(SessionResult::Success); + + assert!(execution.end_time.is_some()); + assert!(execution.duration_ms.is_some()); + assert_eq!(execution.result, Some(SessionResult::Success)); + } + + #[test] + fn test_recipe_execution_metadata() { + let execution = RecipeExecution::new("test-recipe", "1.0.0") + .with_metadata("key1", "value1") + .with_metadata("key2", "value2"); + + assert_eq!(execution.metadata.get("key1"), Some(&"value1".to_string())); + assert_eq!(execution.metadata.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_recipe_execution_environment() { + let execution = + RecipeExecution::new("test-recipe", "1.0.0").with_environment("macos,x86_64,iterm"); + + assert_eq!( + execution.environment, + Some("macos,x86_64,iterm".to_string()) + ); + } + + #[test] + fn test_token_usage() { + let token_usage = TokenUsage::new(100, 50) + .with_cost(0.01) + .with_model("gpt-4", "openai"); + + assert_eq!(token_usage.input_tokens, 100); + assert_eq!(token_usage.output_tokens, 50); + 
assert_eq!(token_usage.total_tokens, 150); + assert_eq!(token_usage.estimated_cost, Some(0.01)); + assert_eq!(token_usage.model, Some("gpt-4".to_string())); + assert_eq!(token_usage.provider, Some("openai".to_string())); + } + + #[test] + fn test_tool_usage() { + let mut tool_usage = ToolUsage::new("test-tool"); + + tool_usage.add_call(Duration::from_millis(100), true); + tool_usage.add_call(Duration::from_millis(200), false); + + assert_eq!(tool_usage.call_count, 2); + assert_eq!(tool_usage.total_duration_ms, 300); + assert_eq!(tool_usage.avg_duration_ms, 150); + assert_eq!(tool_usage.success_count, 1); + assert_eq!(tool_usage.error_count, 1); + } + + #[test] + fn test_error_details() { + let error_details = ErrorDetails::new("validation_error", "Invalid input") + .with_stack_trace("stack trace here") + .with_failing_tool("input-validator") + .with_context("input_type", "json"); + + assert_eq!(error_details.error_type, "validation_error"); + assert_eq!(error_details.error_message, "Invalid input"); + assert_eq!( + error_details.stack_trace, + Some("stack trace here".to_string()) + ); + assert_eq!( + error_details.failing_tool, + Some("input-validator".to_string()) + ); + assert_eq!( + error_details.context.get("input_type"), + Some(&"json".to_string()) + ); + } + + #[test] + fn test_telemetry_event_serialization() { + let execution = RecipeExecution::new("test-recipe", "1.0.0"); + let event = TelemetryEvent::RecipeExecution(execution); + + let json = serde_json::to_string(&event).unwrap(); + let deserialized: TelemetryEvent = serde_json::from_str(&json).unwrap(); + + match deserialized { + TelemetryEvent::RecipeExecution(exec) => { + assert_eq!(exec.recipe_name, "test-recipe"); + assert_eq!(exec.recipe_version, "1.0.0"); + } + _ => panic!("Expected RecipeExecution event"), + } + } + + #[test] + fn test_telemetry_execution_trait() { + use crate::telemetry::config::UsageType; + + let mut execution = RecipeExecution::new("test-recipe", "1.0.0"); + + // Test trait 
methods + assert!(execution.start_time() > 0); + assert!(execution.end_time().is_none()); + assert_eq!(execution.user_id(), ""); + assert_eq!(execution.usage_type(), &UsageType::Human); + + // Test builder methods from trait + let execution = execution + .with_metadata("key1", "value1") + .with_environment("test-env") + .with_result(SessionResult::Success); + + assert_eq!( + execution.metadata().get("key1"), + Some(&"value1".to_string()) + ); + assert_eq!(execution.environment(), Some(&"test-env".to_string())); + assert_eq!(execution.result(), Some(&SessionResult::Success)); + assert!(execution.end_time().is_some()); + } + + #[test] + fn test_session_metadata_support_trait() { + let mut execution = RecipeExecution::new("test-recipe", "1.0.0"); + + // Test session metadata support methods + assert_eq!(execution.message_count(), 0); + assert_eq!(execution.turn_count(), 0); + assert!(execution.token_usage().is_none()); + assert!(execution.tool_usage().is_empty()); + + // Test builder methods from trait + let token_usage = TokenUsage::new(100, 50); + let execution = execution + .with_message_count(10) + .with_turn_count(5) + .with_token_usage(token_usage); + + assert_eq!(execution.message_count(), 10); + assert_eq!(execution.turn_count(), 5); + assert!(execution.token_usage().is_some()); + assert_eq!(execution.token_usage().unwrap().total_tokens, 150); + } + + #[test] + fn test_command_execution_trait() { + let mut execution = CommandExecution::new("test-command", CommandType::Configure); + + // Test trait methods + assert!(execution.start_time() > 0); + assert!(execution.end_time().is_none()); + assert_eq!(execution.user_id(), ""); + + // Test builder methods from trait + let execution = execution + .with_metadata("key1", "value1") + .with_environment("test-env") + .with_result(CommandResult::Success); + + assert_eq!( + execution.metadata().get("key1"), + Some(&"value1".to_string()) + ); + assert_eq!(execution.environment(), Some(&"test-env".to_string())); + 
assert_eq!(execution.result(), Some(&CommandResult::Success)); + assert!(execution.end_time().is_some()); + } + + #[test] + fn test_session_execution_trait() { + let mut execution = SessionExecution::new("test-session", SessionType::Interactive); + + // Test trait methods + assert!(execution.start_time() > 0); + assert!(execution.end_time().is_none()); + assert_eq!(execution.user_id(), ""); + + // Test builder methods from trait + let execution = execution + .with_metadata("key1", "value1") + .with_environment("test-env") + .with_result(SessionResult::Success); + + assert_eq!( + execution.metadata().get("key1"), + Some(&"value1".to_string()) + ); + assert_eq!(execution.environment(), Some(&"test-env".to_string())); + assert_eq!(execution.result(), Some(&SessionResult::Success)); + assert!(execution.end_time().is_some()); + } +} diff --git a/crates/goose/src/telemetry/macros.rs b/crates/goose/src/telemetry/macros.rs new file mode 100644 index 000000000000..6e7e250e304b --- /dev/null +++ b/crates/goose/src/telemetry/macros.rs @@ -0,0 +1,173 @@ +#[macro_export] +macro_rules! 
track_telemetry { + (session: ($session_id:expr, $session_type:expr) => $body:block) => {{ + use goose::telemetry::{TelemetryExecution, SessionMetadataSupport}; + + let start_time = std::time::Instant::now(); + + let telemetry_execution = goose::telemetry::global_telemetry().map(|_manager| { + goose::telemetry::SessionExecution::new($session_id, $session_type) + .with_metadata("interface", "cli") + }); + + if goose::telemetry::global_telemetry().is_none() { + tracing::debug!("Telemetry is disabled or not initialized for session tracking"); + } + + let result = async move $body.await; + + if let Some(mut execution) = telemetry_execution { + let duration = start_time.elapsed(); + + match &result { + Ok((_, session)) => { + let messages = session.message_history(); + if let Ok(session_metadata) = session.get_metadata() { + execution = execution.with_session_metadata(&session_metadata); + } + + let tool_usage = goose::telemetry::extract_tool_usage_from_messages(&messages); + for tool in tool_usage { + execution.add_tool_usage(tool); + } + + if let Some(env) = goose::telemetry::detect_environment() { + execution = execution.with_environment(&env); + } + + execution = execution + .with_message_count(messages.len() as u64) + .with_turn_count(messages.len() as u64) + .with_result(goose::telemetry::SessionResult::Success) + .with_duration(duration); + } + Err(e) => { + execution = execution + .with_result(goose::telemetry::SessionResult::Error(e.to_string())) + .with_duration(duration); + } + } + + if let Some(manager) = goose::telemetry::global_telemetry() { + if let Err(e) = manager.track_session_execution(execution).await { + tracing::warn!("Failed to track session execution: {}", e); + } + } + } + + result.map(|(result, _)| result) + }}; + + (command: ($command_name:expr, $command_type:expr) => $body:block) => {{ + use goose::telemetry::TelemetryExecution; + + let start_time = std::time::Instant::now(); + + let telemetry_execution = goose::telemetry::global_telemetry() 
+ .map(|_manager| goose::telemetry::CommandExecution::new($command_name, $command_type)); + + if goose::telemetry::global_telemetry().is_none() { + tracing::debug!("Telemetry is disabled or not initialized for command tracking"); + } + + let result = async move $body.await; + + if let Some(mut execution) = telemetry_execution { + let duration = start_time.elapsed(); + + match &result { + Ok(_) => { + execution = execution + .with_result(goose::telemetry::CommandResult::Success) + .with_duration(duration); + } + Err(e) => { + execution = execution + .with_result(goose::telemetry::CommandResult::Error(e.to_string())) + .with_duration(duration); + } + } + + if let Some(env) = goose::telemetry::detect_environment() { + execution = execution.with_environment(&env); + } + + if let Some(manager) = goose::telemetry::global_telemetry() { + if let Err(e) = manager.track_command_execution(execution).await { + tracing::warn!("Failed to track command execution: {}", e); + } + } + } + + result + }}; + + (recipe: ($recipe_name:expr, $recipe_version:expr, $params:expr) => $body:block) => {{ + use goose::telemetry::SessionMetadataSupport; + + let start_time = std::time::Instant::now(); + + let telemetry_execution = goose::telemetry::global_telemetry() + .map(|manager| manager.recipe_execution($recipe_name, $recipe_version)); + + if goose::telemetry::global_telemetry().is_none() { + tracing::debug!("Telemetry is disabled or not initialized for recipe tracking"); + } + + let result = async move $body.await; + + if let Some(execution_builder) = telemetry_execution { + let duration = start_time.elapsed(); + let execution = match &result { + Ok((_, session)) => { + let mut builder = execution_builder + .with_result(goose::telemetry::SessionResult::Success) + .with_duration(duration); + + let messages = session.message_history(); + if let Ok(session_metadata) = session.get_metadata() { + builder = builder.with_session_metadata(&session_metadata); + } + + let tool_usage = 
goose::telemetry::extract_tool_usage_from_messages(&messages); + for tool in tool_usage { + builder = builder.add_tool_usage(tool); + } + + for (key, value) in &$params { + builder = builder.with_metadata(key, value); + } + + if let Some(env) = goose::telemetry::detect_environment() { + builder = builder.with_environment(&env); + } + + builder = builder + .with_message_count(messages.len() as u64) + .with_turn_count(messages.len() as u64); + + builder.build() + } + Err(e) => { + let mut builder = execution_builder + .with_result(goose::telemetry::SessionResult::Error(e.to_string())) + .with_duration(duration); + + for (key, value) in &$params { + builder = builder.with_metadata(key, value); + } + + builder.build() + } + }; + + if let Some(manager) = goose::telemetry::global_telemetry() { + if let Err(e) = manager.track_recipe_execution(execution).await { + tracing::warn!("Failed to track recipe execution: {}", e); + } + } + } + + result.map(|(result, _)| result) + }}; +} diff --git a/crates/goose/src/telemetry/manager.rs b/crates/goose/src/telemetry/manager.rs new file mode 100644 index 000000000000..c1f0e1429f82 --- /dev/null +++ b/crates/goose/src/telemetry/manager.rs @@ -0,0 +1,591 @@ +use crate::telemetry::{ + config::TelemetryConfig, + events::{ + CommandExecution, RecipeExecution, SessionExecution, SessionMetadataSupport, + TelemetryEvent, TelemetryExecution, + }, + providers::{create_backend, TelemetryBackend}, + user::UserIdentity, +}; +use std::sync::Arc; +use tokio::sync::Mutex; + +pub struct TelemetryManager { + config: TelemetryConfig, + backend: Option>>>, + user_identity: Option, + enabled: bool, +} + +impl TelemetryManager { + pub async fn new() -> Result> { + let config = TelemetryConfig::from_env(); + + if let Err(e) = config.validate() { + tracing::warn!("Telemetry configuration invalid: {}", e); + return Ok(Self { + config, + backend: None, + user_identity: None, + enabled: false, + }); + } + + let mut manager = Self { + config: config.clone(), + 
backend: None, + user_identity: None, + enabled: config.enabled, + }; + + if manager.enabled { + match manager.initialize().await { + Ok(()) => { + eprintln!( + "🔧 Telemetry initialized successfully with provider: {:?}", + config.provider + ); + tracing::info!( + "Telemetry initialized successfully with provider: {:?}", + config.provider + ); + } + Err(e) => { + eprintln!( + "❌ Failed to initialize telemetry: {}. Continuing without telemetry.", + e + ); + tracing::warn!( + "Failed to initialize telemetry: {}. Continuing without telemetry.", + e + ); + manager.enabled = false; + } + } + } else { + eprintln!("⚠️ Telemetry disabled (GOOSE_TELEMETRY_ENABLED not set to true)"); + tracing::debug!("Telemetry disabled"); + } + + Ok(manager) + } + + async fn initialize(&mut self) -> Result<(), Box> { + self.user_identity = Some(UserIdentity::load_or_create().await?); + + let mut backend = create_backend(&self.config); + backend.initialize(&self.config).await?; + self.backend = Some(Arc::new(Mutex::new(backend))); + + Ok(()) + } + + pub async fn track_recipe_execution( + &self, + mut execution: RecipeExecution, + ) -> Result<(), Box> { + if !self.enabled { + return Ok(()); + } + + if let Some(user_identity) = &self.user_identity { + execution.user_id = user_identity.user_id.clone(); + execution.usage_type = user_identity.usage_type.clone(); + } + + if let Some(environment) = &self.config.environment { + execution.environment = Some(environment.clone()); + } + + execution.complete(); + + if let Some(backend) = &self.backend { + let backend_guard = backend.lock().await; + let event = TelemetryEvent::RecipeExecution(execution.clone()); + + if let Err(e) = backend_guard.send_event(&event).await { + tracing::warn!("Failed to send telemetry event: {}", e); + } + } + + Ok(()) + } + + pub async fn track_recipe_executions( + &self, + executions: Vec, + ) -> Result<(), Box> { + if !self.enabled { + return Ok(()); + } + + for execution in executions { + 
self.track_recipe_execution(execution).await?; + } + + Ok(()) + } + + pub async fn track_session_execution( + &self, + mut execution: SessionExecution, + ) -> Result<(), Box> { + if !self.enabled { + return Ok(()); + } + + if let Some(user_identity) = &self.user_identity { + execution.user_id = user_identity.user_id.clone(); + execution.usage_type = user_identity.usage_type.clone(); + } + + if let Some(environment) = &self.config.environment { + execution.environment = Some(environment.clone()); + } + + execution.complete(); + + if let Some(backend) = &self.backend { + let backend_guard = backend.lock().await; + let event = TelemetryEvent::SessionExecution(execution); + + if let Err(e) = backend_guard.send_event(&event).await { + tracing::warn!("Failed to send telemetry event: {}", e); + } + } + + Ok(()) + } + + pub async fn track_command_execution( + &self, + mut execution: CommandExecution, + ) -> Result<(), Box> { + if !self.enabled { + return Ok(()); + } + + if let Some(user_identity) = &self.user_identity { + execution.user_id = user_identity.user_id.clone(); + execution.usage_type = user_identity.usage_type.clone(); + } + + if let Some(environment) = &self.config.environment { + execution.environment = Some(environment.clone()); + } + + execution.complete(); + + if let Some(backend) = &self.backend { + let backend_guard = backend.lock().await; + let event = TelemetryEvent::CommandExecution(execution); + + if let Err(e) = backend_guard.send_event(&event).await { + tracing::warn!("Failed to send telemetry event: {}", e); + } + } + + Ok(()) + } + + pub fn get_user_identity(&self) -> Option<&UserIdentity> { + self.user_identity.as_ref() + } + + pub fn is_enabled(&self) -> bool { + self.enabled + } + + pub fn get_config(&self) -> &TelemetryConfig { + &self.config + } + + pub async fn shutdown(&self) -> Result<(), Box> { + if let Some(backend) = &self.backend { + let backend_guard = backend.lock().await; + if let Err(e) = backend_guard.shutdown().await { + 
tracing::warn!("Error during telemetry shutdown: {}", e); + } else { + tracing::debug!("Telemetry shutdown successfully"); + } + } + + Ok(()) + } + + pub fn recipe_execution(&self, name: &str, version: &str) -> RecipeExecutionBuilder { + RecipeExecutionBuilder::new(name, version) + } +} + +pub struct RecipeExecutionBuilder { + execution: RecipeExecution, +} + +impl RecipeExecutionBuilder { + fn new(name: &str, version: &str) -> Self { + Self { + execution: RecipeExecution::new(name, version), + } + } + + pub fn with_metadata(mut self, key: &str, value: &str) -> Self { + self.execution = self.execution.with_metadata(key, value); + self + } + + pub fn with_token_usage(mut self, token_usage: crate::telemetry::events::TokenUsage) -> Self { + self.execution = self.execution.with_token_usage(token_usage); + self + } + + pub fn add_tool_usage(mut self, tool_usage: crate::telemetry::events::ToolUsage) -> Self { + self.execution.add_tool_usage(tool_usage); + self + } + + pub fn with_error_details( + mut self, + error_details: crate::telemetry::events::ErrorDetails, + ) -> Self { + self.execution = self.execution.with_error_details(error_details); + self + } + + pub fn with_result(mut self, result: crate::telemetry::events::SessionResult) -> Self { + self.execution = self.execution.with_result(result); + self + } + + pub fn with_duration(mut self, duration: std::time::Duration) -> Self { + self.execution = self.execution.with_duration(duration); + self + } + + pub fn with_environment(mut self, environment: &str) -> Self { + self.execution = self.execution.with_environment(environment); + self + } + + pub fn with_message_count(mut self, count: u64) -> Self { + self.execution = self.execution.with_message_count(count); + self + } + + pub fn with_turn_count(mut self, count: u64) -> Self { + self.execution = self.execution.with_turn_count(count); + self + } + + pub fn with_session_metadata( + mut self, + session_metadata: &crate::session::storage::SessionMetadata, + ) -> Self { 
+ self.execution = self.execution.with_session_metadata(session_metadata); + self + } + + pub fn build(self) -> RecipeExecution { + self.execution + } + + pub async fn track( + self, + manager: &TelemetryManager, + ) -> Result<(), Box> { + manager.track_recipe_execution(self.execution).await + } +} + +static GLOBAL_TELEMETRY: once_cell::sync::OnceCell> = + once_cell::sync::OnceCell::new(); + +pub async fn init_global_telemetry() -> Result<(), Box> { + let manager = TelemetryManager::new().await?; + GLOBAL_TELEMETRY + .set(Arc::new(manager)) + .map_err(|_| "Global telemetry already initialized")?; + Ok(()) +} + +pub fn global_telemetry() -> Option> { + GLOBAL_TELEMETRY.get().cloned() +} + +pub async fn shutdown_global_telemetry() -> Result<(), Box> { + if let Some(manager) = GLOBAL_TELEMETRY.get() { + manager.shutdown().await?; + } + Ok(()) +} + +#[macro_export] +macro_rules! track_recipe { + ($name:expr, $version:expr) => {{ + $crate::telemetry::RecipeExecutionBuilder::new($name, $version) + }}; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::telemetry::{ + config::{TelemetryConfig, TelemetryProvider}, + events::{SessionResult, TokenUsage}, + providers::TelemetryBackend, + }; + use std::env; + use std::sync::Arc; + use tokio::sync::Mutex; + + // Mock backend for fast testing + struct MockTelemetryBackend { + events: Arc>>, + } + + impl MockTelemetryBackend { + fn new() -> Self { + Self { + events: Arc::new(Mutex::new(Vec::new())), + } + } + + async fn get_events(&self) -> Vec { + self.events.lock().await.clone() + } + } + + #[async_trait::async_trait] + impl TelemetryBackend for MockTelemetryBackend { + async fn initialize( + &mut self, + _config: &TelemetryConfig, + ) -> Result<(), Box> { + Ok(()) + } + + async fn send_event( + &self, + event: &TelemetryEvent, + ) -> Result<(), Box> { + self.events.lock().await.push(event.clone()); + Ok(()) + } + + async fn shutdown(&self) -> Result<(), Box> { + Ok(()) + } + } + + // Fast unit tests + #[test] + fn 
test_telemetry_manager_disabled_sync() { + let config = TelemetryConfig { + enabled: false, + provider: TelemetryProvider::Console, + endpoint: None, + api_key: None, + usage_type: None, + environment: None, + service_name: "goose".to_string(), + service_version: "1.0.0".to_string(), + }; + + let manager = TelemetryManager { + config, + backend: None, + user_identity: None, + enabled: false, + }; + + assert!(!manager.is_enabled()); + assert!(manager.get_user_identity().is_none()); + } + + #[test] + fn test_recipe_execution_builder_sync() { + let builder = RecipeExecutionBuilder::new("test-recipe", "1.0.0"); + let execution = builder + .with_metadata("key1", "value1") + .with_token_usage(TokenUsage::new(100, 50)) + .with_result(SessionResult::Success) + .build(); + + assert_eq!(execution.recipe_name, "test-recipe"); + assert_eq!(execution.recipe_version, "1.0.0"); + assert_eq!(execution.metadata.get("key1"), Some(&"value1".to_string())); + assert!(execution.token_usage.is_some()); + assert_eq!(execution.result, Some(SessionResult::Success)); + } + + #[tokio::test] + async fn test_telemetry_manager_with_mock_backend() { + let mock_backend = MockTelemetryBackend::new(); + let config = TelemetryConfig { + enabled: true, + provider: TelemetryProvider::Console, + endpoint: None, + api_key: None, + usage_type: None, + environment: Some("test".to_string()), + service_name: "goose".to_string(), + service_version: "1.0.0".to_string(), + }; + + let manager = TelemetryManager { + config, + backend: Some(Arc::new(Mutex::new(Box::new(mock_backend)))), + user_identity: Some(crate::telemetry::user::UserIdentity { + user_id: "test-user".to_string(), + usage_type: crate::telemetry::config::UsageType::Human, + first_seen: chrono::Utc::now(), + last_seen: chrono::Utc::now(), + }), + enabled: true, + }; + + let execution = + RecipeExecution::new("test-recipe", "1.0.0").with_result(SessionResult::Success); + + let result = manager.track_recipe_execution(execution).await; + 
assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_batch_tracking_with_mock() { + let mock_backend = MockTelemetryBackend::new(); + let events_ref = mock_backend.events.clone(); + + let config = TelemetryConfig { + enabled: true, + provider: TelemetryProvider::Console, + endpoint: None, + api_key: None, + usage_type: None, + environment: None, + service_name: "goose".to_string(), + service_version: "1.0.0".to_string(), + }; + + let manager = TelemetryManager { + config, + backend: Some(Arc::new(Mutex::new(Box::new(mock_backend)))), + user_identity: None, + enabled: true, + }; + + let executions = vec![ + RecipeExecution::new("recipe1", "1.0.0").with_result(SessionResult::Success), + RecipeExecution::new("recipe2", "1.0.0") + .with_result(SessionResult::Error("test error".to_string())), + RecipeExecution::new("recipe3", "1.0.0").with_result(SessionResult::Cancelled), + ]; + + let result = manager.track_recipe_executions(executions).await; + assert!(result.is_ok()); + + let events = events_ref.lock().await; + assert_eq!(events.len(), 3); + } + + // Integration tests (slower, marked with ignore for optional running) + #[tokio::test] + #[ignore = "slow integration test"] + async fn test_telemetry_manager_disabled_integration() { + env::remove_var("GOOSE_TELEMETRY_ENABLED"); + env::remove_var("GOOSE_TELEMETRY_API_KEY"); + env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT"); + + let manager = TelemetryManager::new().await.unwrap(); + assert!(!manager.is_enabled()); + + let execution = RecipeExecution::new("test", "1.0.0"); + let result = manager.track_recipe_execution(execution).await; + assert!(result.is_ok()); + } + + #[tokio::test] + #[ignore = "slow integration test"] + async fn test_telemetry_manager_console_integration() { + env::set_var("GOOSE_TELEMETRY_ENABLED", "true"); + env::set_var("GOOSE_TELEMETRY_PROVIDER", "console"); + + let manager = TelemetryManager::new().await.unwrap(); + assert!(manager.is_enabled()); + 
assert_eq!(manager.get_config().provider, TelemetryProvider::Console); + + let execution = RecipeExecution::new("test-recipe", "1.0.0") + .with_result(SessionResult::Success) + .with_token_usage(TokenUsage::new(100, 50)); + + let result = manager.track_recipe_execution(execution).await; + assert!(result.is_ok()); + + manager.shutdown().await.unwrap(); + env::remove_var("GOOSE_TELEMETRY_ENABLED"); + env::remove_var("GOOSE_TELEMETRY_PROVIDER"); + } + + #[tokio::test] + #[ignore = "slow integration test"] + async fn test_recipe_execution_builder_integration() { + env::set_var("GOOSE_TELEMETRY_ENABLED", "true"); + env::set_var("GOOSE_TELEMETRY_PROVIDER", "console"); + + let manager = TelemetryManager::new().await.unwrap(); + + let execution = manager + .recipe_execution("test-recipe", "1.0.0") + .with_metadata("key1", "value1") + .with_token_usage(TokenUsage::new(100, 50)) + .with_result(SessionResult::Success) + .build(); + + assert_eq!(execution.recipe_name, "test-recipe"); + assert_eq!(execution.recipe_version, "1.0.0"); + assert_eq!(execution.metadata.get("key1"), Some(&"value1".to_string())); + assert!(execution.token_usage.is_some()); + assert_eq!(execution.result, Some(SessionResult::Success)); + + let result = manager.track_recipe_execution(execution).await; + assert!(result.is_ok()); + + manager.shutdown().await.unwrap(); + env::remove_var("GOOSE_TELEMETRY_ENABLED"); + env::remove_var("GOOSE_TELEMETRY_PROVIDER"); + } + + #[tokio::test] + #[ignore = "slow integration test"] + async fn test_global_telemetry_integration() { + init_global_telemetry().await.unwrap(); + + let manager = global_telemetry().unwrap(); + assert!(!manager.is_enabled()); + + shutdown_global_telemetry().await.unwrap(); + } + + #[tokio::test] + #[ignore = "slow integration test"] + async fn test_batch_tracking_integration() { + env::set_var("GOOSE_TELEMETRY_ENABLED", "true"); + env::set_var("GOOSE_TELEMETRY_PROVIDER", "console"); + + let manager = TelemetryManager::new().await.unwrap(); + + 
let executions = vec![ + RecipeExecution::new("recipe1", "1.0.0").with_result(SessionResult::Success), + RecipeExecution::new("recipe2", "1.0.0") + .with_result(SessionResult::Error("test error".to_string())), + RecipeExecution::new("recipe3", "1.0.0").with_result(SessionResult::Cancelled), + ]; + + let result = manager.track_recipe_executions(executions).await; + assert!(result.is_ok()); + + manager.shutdown().await.unwrap(); + env::remove_var("GOOSE_TELEMETRY_ENABLED"); + env::remove_var("GOOSE_TELEMETRY_PROVIDER"); + } +} diff --git a/crates/goose/src/telemetry/mod.rs b/crates/goose/src/telemetry/mod.rs new file mode 100644 index 000000000000..3c838c16275a --- /dev/null +++ b/crates/goose/src/telemetry/mod.rs @@ -0,0 +1,24 @@ +mod config; +mod environment; +mod events; +mod macros; +mod manager; +mod providers; +mod user; +mod utils; + +pub use { + config::{TelemetryConfig, TelemetryProvider, UsageType}, + environment::detect_environment, + events::{ + CommandExecution, CommandResult, CommandType, ErrorDetails, RecipeExecution, + SessionExecution, SessionMetadataSupport, SessionResult, SessionType, TelemetryEvent, + TelemetryExecution, TokenUsage, ToolUsage, + }, + manager::{ + global_telemetry, init_global_telemetry, shutdown_global_telemetry, RecipeExecutionBuilder, + TelemetryManager, + }, + user::{detect_usage_type, UserIdentity}, + utils::extract_tool_usage_from_messages, +}; diff --git a/crates/goose/src/telemetry/providers/console.rs b/crates/goose/src/telemetry/providers/console.rs new file mode 100644 index 000000000000..8bccc492cdc7 --- /dev/null +++ b/crates/goose/src/telemetry/providers/console.rs @@ -0,0 +1,284 @@ +use crate::telemetry::{ + config::TelemetryConfig, + events::{RecipeExecution, TelemetryEvent}, +}; +use opentelemetry::{ + metrics::{Counter, Histogram, MeterProvider}, + trace::{Span, Tracer, TracerProvider as OtelTracerProvider}, + KeyValue, +}; +use opentelemetry_sdk::{ + metrics::{PeriodicReader, SdkMeterProvider}, + 
trace::SdkTracerProvider, + Resource, +}; +use opentelemetry_semantic_conventions as semconv; +use std::time::Duration; + +pub struct ConsoleProvider { + tracer_provider: Option, + meter_provider: Option, + recipe_counter: Option>, + recipe_duration: Option>, + token_counter: Option>, + tool_counter: Option>, + initialized: bool, +} + +impl ConsoleProvider { + pub fn new() -> Self { + Self { + tracer_provider: None, + meter_provider: None, + recipe_counter: None, + recipe_duration: None, + token_counter: None, + tool_counter: None, + initialized: false, + } + } + + async fn init_metrics(&mut self) -> Result<(), Box> { + if let Some(meter_provider) = &self.meter_provider { + let meter = meter_provider.meter("goose"); + + self.recipe_counter = Some( + meter + .u64_counter("goose.recipe.executions") + .with_description("Number of recipe executions") + .build(), + ); + + self.recipe_duration = Some( + meter + .f64_histogram("goose.recipe.duration") + .with_description("Recipe execution duration in seconds") + .build(), + ); + + self.token_counter = Some( + meter + .u64_counter("goose.tokens.used") + .with_description("Number of tokens used") + .build(), + ); + + self.tool_counter = Some( + meter + .u64_counter("goose.tool.calls") + .with_description("Number of tool calls") + .build(), + ); + } + + Ok(()) + } + + fn record_recipe_execution(&self, execution: &RecipeExecution) { + let attributes = vec![ + KeyValue::new("recipe.name", execution.recipe_name.clone()), + KeyValue::new("recipe.version", execution.recipe_version.clone()), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new( + "result", + format!( + "{:?}", + execution + .result + .as_ref() + .unwrap_or(&crate::telemetry::events::SessionResult::Success) + ), + ), + ]; + + if let Some(counter) = &self.recipe_counter { + counter.add(1, &attributes); + } + + if let (Some(histogram), Some(duration_ms)) = (&self.recipe_duration, execution.duration_ms) + { + 
histogram.record(duration_ms as f64 / 1000.0, &attributes); + } + + if let (Some(counter), Some(token_usage)) = (&self.token_counter, &execution.token_usage) { + let token_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "input"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.input_tokens, &token_attributes); + + let output_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "output"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.output_tokens, &output_attributes); + } + + if let Some(counter) = &self.tool_counter { + for tool_usage in &execution.tool_usage { + let tool_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "success"), + ], + ] + .concat(); + + counter.add(tool_usage.success_count, &tool_attributes); + + if tool_usage.error_count > 0 { + let error_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "error"), + ], + ] + .concat(); + + counter.add(tool_usage.error_count, &error_attributes); + } + } + } + } + + fn create_recipe_span(&self, execution: &RecipeExecution) { + if let Some(tracer_provider) = &self.tracer_provider { + let tracer = tracer_provider.tracer("goose"); + let mut span = tracer + .span_builder(format!("recipe.{}", execution.recipe_name)) + .with_attributes(vec![ + KeyValue::new("recipe.name", execution.recipe_name.clone()), + KeyValue::new("recipe.version", execution.recipe_version.clone()), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new("user.id", execution.user_id.clone()), + 
]) + .start(&tracer); + + if let Some(result) = &execution.result { + span.set_attribute(KeyValue::new("result", format!("{:?}", result))); + } + + if let Some(duration_ms) = execution.duration_ms { + span.set_attribute(KeyValue::new("duration.ms", duration_ms as i64)); + } + + if let Some(error_details) = &execution.error_details { + span.set_attribute(KeyValue::new( + "error.type", + error_details.error_type.clone(), + )); + span.set_attribute(KeyValue::new( + "error.message", + error_details.error_message.clone(), + )); + span.record_error(&std::io::Error::other(error_details.error_message.clone())); + } + + span.end(); + } + } +} + +#[async_trait::async_trait] +impl super::TelemetryBackend for ConsoleProvider { + async fn initialize( + &mut self, + config: &TelemetryConfig, + ) -> Result<(), Box> { + if self.initialized { + return Ok(()); + } + + let resource = Resource::builder() + .with_attributes(vec![ + KeyValue::new(semconv::resource::SERVICE_NAME, config.service_name.clone()), + KeyValue::new( + semconv::resource::SERVICE_VERSION, + config.service_version.clone(), + ), + KeyValue::new( + "goose.usage_type", + format!( + "{:?}", + config + .usage_type + .as_ref() + .unwrap_or(&crate::telemetry::config::UsageType::Human) + ), + ), + ]) + .build(); + + let tracer_provider = SdkTracerProvider::builder() + .with_resource(resource.clone()) + .with_simple_exporter(opentelemetry_stdout::SpanExporter::default()) + .build(); + + self.tracer_provider = Some(tracer_provider); + + let meter_provider = SdkMeterProvider::builder() + .with_resource(resource) + .with_reader( + PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()) + .with_interval(Duration::from_secs(30)) + .build(), + ) + .build(); + + self.meter_provider = Some(meter_provider); + + self.init_metrics().await?; + self.initialized = true; + + tracing::info!("Console telemetry provider initialized successfully"); + Ok(()) + } + + async fn send_event( + &self, + event: &TelemetryEvent, + ) 
-> Result<(), Box> { + if !self.initialized { + return Err("Console provider not initialized".into()); + } + + match event { + TelemetryEvent::RecipeExecution(execution) => { + self.record_recipe_execution(execution); + self.create_recipe_span(execution); + } + TelemetryEvent::SessionExecution(_execution) => { + println!("📊 Session execution telemetry event received"); + } + TelemetryEvent::CommandExecution(_execution) => { + println!("📊 Command execution telemetry event received"); + } + TelemetryEvent::SystemMetrics(_metrics) => {} + TelemetryEvent::UserSession(_session) => {} + } + + Ok(()) + } + + async fn shutdown(&self) -> Result<(), Box> { + if self.initialized { + // In OpenTelemetry 0.30, shutdown is handled by dropping the provider + tracing::info!("Console telemetry provider shutdown successfully"); + } + Ok(()) + } +} diff --git a/crates/goose/src/telemetry/providers/file.rs b/crates/goose/src/telemetry/providers/file.rs new file mode 100644 index 000000000000..6d50580f4716 --- /dev/null +++ b/crates/goose/src/telemetry/providers/file.rs @@ -0,0 +1,137 @@ +use crate::telemetry::{config::TelemetryConfig, events::TelemetryEvent}; +use serde_json; +use std::fs::OpenOptions; +use std::io::Write; +use std::path::Path; +use std::sync::Mutex; + +/// File-based telemetry provider that writes JSON events to a file. 
mostly for debugging +pub struct FileProvider { + file_path: Option, + file_handle: Option>, + initialized: bool, +} + +impl FileProvider { + pub fn new() -> Self { + Self { + file_path: None, + file_handle: None, + initialized: false, + } + } + + fn write_event_to_file( + &self, + event: &TelemetryEvent, + ) -> Result<(), Box> { + if let Some(file_handle) = &self.file_handle { + let mut file = file_handle + .lock() + .map_err(|e| format!("Failed to lock file: {}", e))?; + + let event_record = serde_json::json!({ + "timestamp": chrono::Utc::now().to_rfc3339(), + "event_type": match event { + TelemetryEvent::RecipeExecution(_) => "recipe_execution", + TelemetryEvent::SessionExecution(_) => "session_execution", + TelemetryEvent::CommandExecution(_) => "command_execution", + TelemetryEvent::SystemMetrics(_) => "system_metrics", + TelemetryEvent::UserSession(_) => "user_session", + }, + "data": event + }); + + writeln!(file, "{}", serde_json::to_string(&event_record)?)?; + file.flush()?; + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl super::TelemetryBackend for FileProvider { + async fn initialize( + &mut self, + config: &TelemetryConfig, + ) -> Result<(), Box> { + if self.initialized { + return Ok(()); + } + + let file_path = config + .get_endpoint() + .ok_or("File provider requires a file path in GOOSE_TELEMETRY_ENDPOINT")?; + + if let Some(parent) = Path::new(&file_path).parent() { + std::fs::create_dir_all(parent)?; + } + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&file_path)?; + + self.file_path = Some(file_path.clone()); + self.file_handle = Some(Mutex::new(file)); + self.initialized = true; + + let init_record = serde_json::json!({ + "timestamp": chrono::Utc::now().to_rfc3339(), + "event_type": "telemetry_init", + "data": { + "service_name": config.service_name, + "service_version": config.service_version, + "provider": "file", + "file_path": file_path + } + }); + + if let Some(file_handle) = &self.file_handle { + let mut 
file = file_handle + .lock() + .map_err(|e| format!("Failed to lock file: {}", e))?; + writeln!(file, "{}", serde_json::to_string(&init_record)?)?; + file.flush()?; + } + + tracing::info!("File telemetry provider initialized: {}", file_path); + Ok(()) + } + + async fn send_event( + &self, + event: &TelemetryEvent, + ) -> Result<(), Box> { + if !self.initialized { + return Err("File provider not initialized".into()); + } + + self.write_event_to_file(event)?; + Ok(()) + } + + async fn shutdown(&self) -> Result<(), Box> { + if self.initialized { + let shutdown_record = serde_json::json!({ + "timestamp": chrono::Utc::now().to_rfc3339(), + "event_type": "telemetry_shutdown", + "data": {} + }); + + if let Some(file_handle) = &self.file_handle { + let mut file = file_handle + .lock() + .map_err(|e| format!("Failed to lock file: {}", e))?; + writeln!(file, "{}", serde_json::to_string(&shutdown_record)?)?; + file.flush()?; + } + + if let Some(file_path) = &self.file_path { + tracing::info!("File telemetry provider shutdown: {}", file_path); + } + } + Ok(()) + } +} diff --git a/crates/goose/src/telemetry/providers/mod.rs b/crates/goose/src/telemetry/providers/mod.rs new file mode 100644 index 000000000000..d27e94ca28af --- /dev/null +++ b/crates/goose/src/telemetry/providers/mod.rs @@ -0,0 +1,31 @@ +use crate::telemetry::{ + config::{TelemetryConfig, TelemetryProvider}, + events::TelemetryEvent, +}; + +pub mod console; +pub mod file; +pub mod otlp; + +#[async_trait::async_trait] +pub trait TelemetryBackend: Send + Sync { + async fn initialize( + &mut self, + config: &TelemetryConfig, + ) -> Result<(), Box>; + + async fn send_event( + &self, + event: &TelemetryEvent, + ) -> Result<(), Box>; + + async fn shutdown(&self) -> Result<(), Box>; +} + +pub fn create_backend(config: &TelemetryConfig) -> Box { + match config.provider { + TelemetryProvider::Console => Box::new(console::ConsoleProvider::new()), + TelemetryProvider::File => Box::new(file::FileProvider::new()), + 
TelemetryProvider::Otlp => Box::new(otlp::OtlpProvider::new()), + } +} diff --git a/crates/goose/src/telemetry/providers/otlp.rs b/crates/goose/src/telemetry/providers/otlp.rs new file mode 100644 index 000000000000..08c3d6119814 --- /dev/null +++ b/crates/goose/src/telemetry/providers/otlp.rs @@ -0,0 +1,715 @@ +use crate::telemetry::{ + config::TelemetryConfig, + events::{CommandExecution, RecipeExecution, SessionExecution, TelemetryEvent}, +}; +use opentelemetry::{ + metrics::{Counter, Histogram, MeterProvider}, + trace::{Span, Tracer, TracerProvider as OtelTracerProvider}, + KeyValue, +}; +use opentelemetry_sdk::{ + metrics::{PeriodicReader, SdkMeterProvider}, + trace::SdkTracerProvider, + Resource, +}; +use opentelemetry_semantic_conventions as semconv; +use std::time::Duration; + +pub struct OtlpProvider { + tracer_provider: Option, + meter_provider: Option, + recipe_counter: Option>, + recipe_duration: Option>, + session_counter: Option>, + session_duration: Option>, + command_counter: Option>, + command_duration: Option>, + token_counter: Option>, + tool_counter: Option>, + initialized: bool, +} + +impl OtlpProvider { + pub fn new() -> Self { + Self { + tracer_provider: None, + meter_provider: None, + recipe_counter: None, + recipe_duration: None, + session_counter: None, + session_duration: None, + command_counter: None, + command_duration: None, + token_counter: None, + tool_counter: None, + initialized: false, + } + } + + async fn init_metrics(&mut self) -> Result<(), Box> { + if let Some(meter_provider) = &self.meter_provider { + let meter = meter_provider.meter("goose"); + + self.recipe_counter = Some( + meter + .u64_counter("goose.recipe.executions") + .with_description("Number of recipe executions") + .build(), + ); + + self.recipe_duration = Some( + meter + .f64_histogram("goose.recipe.duration") + .with_description("Recipe execution duration in seconds") + .build(), + ); + + self.session_counter = Some( + meter + 
.u64_counter("goose.session.executions") + .with_description("Number of session executions") + .build(), + ); + + self.session_duration = Some( + meter + .f64_histogram("goose.session.duration") + .with_description("Session execution duration in seconds") + .build(), + ); + + self.command_counter = Some( + meter + .u64_counter("goose.command.executions") + .with_description("Number of command executions") + .build(), + ); + + self.command_duration = Some( + meter + .f64_histogram("goose.command.duration") + .with_description("Command execution duration in seconds") + .build(), + ); + + self.token_counter = Some( + meter + .u64_counter("goose.tokens.used") + .with_description("Number of tokens used") + .build(), + ); + + self.tool_counter = Some( + meter + .u64_counter("goose.tool.calls") + .with_description("Number of tool calls") + .build(), + ); + } + + Ok(()) + } + + fn record_recipe_execution(&self, execution: &RecipeExecution) { + let attributes = vec![ + KeyValue::new("recipe.name", execution.recipe_name.clone()), + KeyValue::new("recipe.version", execution.recipe_version.clone()), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new( + "result", + format!( + "{:?}", + execution + .result + .as_ref() + .unwrap_or(&crate::telemetry::events::SessionResult::Success) + ), + ), + ]; + + if let Some(counter) = &self.recipe_counter { + counter.add(1, &attributes); + } + + if let (Some(histogram), Some(duration_ms)) = (&self.recipe_duration, execution.duration_ms) + { + histogram.record(duration_ms as f64 / 1000.0, &attributes); + } + + if let (Some(counter), Some(token_usage)) = (&self.token_counter, &execution.token_usage) { + let token_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "input"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.input_tokens, 
&token_attributes); + + let output_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "output"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.output_tokens, &output_attributes); + } + + if let Some(counter) = &self.tool_counter { + for tool_usage in &execution.tool_usage { + let tool_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "success"), + ], + ] + .concat(); + + counter.add(tool_usage.success_count, &tool_attributes); + + if tool_usage.error_count > 0 { + let error_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "error"), + ], + ] + .concat(); + + counter.add(tool_usage.error_count, &error_attributes); + } + } + } + } + + fn create_recipe_span(&self, execution: &RecipeExecution) { + if let Some(tracer_provider) = &self.tracer_provider { + let tracer = tracer_provider.tracer("goose"); + let mut span = tracer + .span_builder(format!("recipe.{}", execution.recipe_name)) + .with_attributes(vec![ + KeyValue::new("recipe.name", execution.recipe_name.clone()), + KeyValue::new("recipe.version", execution.recipe_version.clone()), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new("user.id", execution.user_id.clone()), + ]) + .start(&tracer); + + if let Some(result) = &execution.result { + span.set_attribute(KeyValue::new("result", format!("{:?}", result))); + } + + if let Some(duration_ms) = execution.duration_ms { + span.set_attribute(KeyValue::new("duration.ms", duration_ms as i64)); + } + + if let Some(error_details) = &execution.error_details { + span.set_attribute(KeyValue::new( + "error.type", + error_details.error_type.clone(), + )); + span.set_attribute(KeyValue::new( + 
"error.message", + error_details.error_message.clone(), + )); + span.record_error(&std::io::Error::other(error_details.error_message.clone())); + } + + span.end(); + } + } + + fn record_session_execution(&self, execution: &SessionExecution) { + let attributes = vec![ + KeyValue::new("session.type", format!("{:?}", execution.session_type)), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new( + "result", + format!( + "{:?}", + execution + .result + .as_ref() + .unwrap_or(&crate::telemetry::events::SessionResult::Success) + ), + ), + KeyValue::new("execution.type", "session"), + ]; + + if let Some(counter) = &self.session_counter { + counter.add(1, &attributes); + } + + if let (Some(histogram), Some(duration_ms)) = + (&self.session_duration, execution.duration_ms) + { + histogram.record(duration_ms as f64 / 1000.0, &attributes); + } + + if let (Some(counter), Some(token_usage)) = (&self.token_counter, &execution.token_usage) { + let token_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "input"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.input_tokens, &token_attributes); + + let output_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("token.type", "output"), + KeyValue::new("model", token_usage.model.clone().unwrap_or_default()), + KeyValue::new("provider", token_usage.provider.clone().unwrap_or_default()), + ], + ] + .concat(); + + counter.add(token_usage.output_tokens, &output_attributes); + } + + if let Some(counter) = &self.tool_counter { + for tool_usage in &execution.tool_usage { + let tool_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "success"), + ], + ] + .concat(); + + counter.add(tool_usage.success_count, &tool_attributes); + + if 
tool_usage.error_count > 0 { + let error_attributes = [ + attributes.clone(), + vec![ + KeyValue::new("tool.name", tool_usage.tool_name.clone()), + KeyValue::new("tool.result", "error"), + ], + ] + .concat(); + + counter.add(tool_usage.error_count, &error_attributes); + } + } + } + } + + fn create_session_span(&self, execution: &SessionExecution) { + if let Some(tracer_provider) = &self.tracer_provider { + let tracer = tracer_provider.tracer("goose"); + let mut span = tracer + .span_builder(format!("session.{}", execution.session_id)) + .with_attributes(vec![ + KeyValue::new("session.id", execution.session_id.clone()), + KeyValue::new("session.type", format!("{:?}", execution.session_type)), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new("user.id", execution.user_id.clone()), + KeyValue::new("message.count", execution.message_count as i64), + KeyValue::new("turn.count", execution.turn_count as i64), + ]) + .start(&tracer); + + if let Some(result) = &execution.result { + span.set_attribute(KeyValue::new("result", format!("{:?}", result))); + } + + if let Some(duration_ms) = execution.duration_ms { + span.set_attribute(KeyValue::new("duration.ms", duration_ms as i64)); + } + + if let Some(error_details) = &execution.error_details { + span.set_attribute(KeyValue::new( + "error.type", + error_details.error_type.clone(), + )); + span.set_attribute(KeyValue::new( + "error.message", + error_details.error_message.clone(), + )); + span.record_error(&std::io::Error::other(error_details.error_message.clone())); + } + + span.end(); + } + } + + fn record_command_execution(&self, execution: &CommandExecution) { + let attributes = vec![ + KeyValue::new("command.name", execution.command_name.clone()), + KeyValue::new("command.type", format!("{:?}", execution.command_type)), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new( + "result", + format!( + "{:?}", + execution + .result + .as_ref() + 
.unwrap_or(&crate::telemetry::events::CommandResult::Success) + ), + ), + KeyValue::new("execution.type", "command"), + ]; + + if let Some(counter) = &self.command_counter { + counter.add(1, &attributes); + } + + if let (Some(histogram), Some(duration_ms)) = + (&self.command_duration, execution.duration_ms) + { + histogram.record(duration_ms as f64 / 1000.0, &attributes); + } + } + + fn create_command_span(&self, execution: &CommandExecution) { + if let Some(tracer_provider) = &self.tracer_provider { + let tracer = tracer_provider.tracer("goose"); + let mut span = tracer + .span_builder(format!("command.{}", execution.command_name)) + .with_attributes(vec![ + KeyValue::new("command.name", execution.command_name.clone()), + KeyValue::new("command.type", format!("{:?}", execution.command_type)), + KeyValue::new("usage.type", format!("{:?}", execution.usage_type)), + KeyValue::new("user.id", execution.user_id.clone()), + ]) + .start(&tracer); + + if let Some(result) = &execution.result { + span.set_attribute(KeyValue::new("result", format!("{:?}", result))); + } + + if let Some(duration_ms) = execution.duration_ms { + span.set_attribute(KeyValue::new("duration.ms", duration_ms as i64)); + } + + if let Some(error_details) = &execution.error_details { + span.set_attribute(KeyValue::new( + "error.type", + error_details.error_type.clone(), + )); + span.set_attribute(KeyValue::new( + "error.message", + error_details.error_message.clone(), + )); + span.record_error(&std::io::Error::other(error_details.error_message.clone())); + } + + span.end(); + } + } +} + +#[async_trait::async_trait] +impl super::TelemetryBackend for OtlpProvider { + async fn initialize( + &mut self, + config: &TelemetryConfig, + ) -> Result<(), Box> { + if self.initialized { + return Ok(()); + } + + let resource = Resource::builder() + .with_attributes(vec![ + KeyValue::new(semconv::resource::SERVICE_NAME, config.service_name.clone()), + KeyValue::new( + semconv::resource::SERVICE_VERSION, + 
config.service_version.clone(), + ), + KeyValue::new( + "goose.usage_type", + format!( + "{:?}", + config + .usage_type + .as_ref() + .unwrap_or(&crate::telemetry::config::UsageType::Human) + ), + ), + ]) + .build(); + + use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; + + let endpoint = config.get_endpoint().ok_or( + "OTLP provider requires GOOSE_TELEMETRY_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT", + )?; + + // Check if HTTP protocol is explicitly requested + let use_http = std::env::var("GOOSE_TELEMETRY_PROTOCOL") + .map(|p| p.to_lowercase() == "http") + .unwrap_or(false); + + let otlp_trace_exporter = if use_http { + eprintln!("🌐 OTLP: Using HTTP/protobuf protocol"); + let mut builder = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_endpoint(format!("{}/v1/traces", endpoint.trim_end_matches('/'))); + + if let Some(api_key) = &config.api_key { + use std::collections::HashMap; + + let mut headers = HashMap::new(); + + let header_name = std::env::var("GOOSE_TELEMETRY_AUTH_HEADER") + .unwrap_or_else(|_| "x-api-key".to_string()); + + headers.insert(header_name, api_key.clone()); + + builder = builder.with_headers(headers); + tracing::info!("OTLP provider configured with API key authentication"); + } + + // Use default HTTP client for protobuf format + eprintln!( + "🔧 OTLP: Trace exporter endpoint: {}/v1/traces", + endpoint.trim_end_matches('/') + ); + + builder.build()? 
+ } else { + eprintln!("🚀 OTLP: Using gRPC protocol (default)"); + let mut builder = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(endpoint.clone()); + + if let Some(api_key) = &config.api_key { + use tonic::metadata::{MetadataMap, MetadataValue}; + + let mut metadata = MetadataMap::new(); + + let header_name = std::env::var("GOOSE_TELEMETRY_AUTH_HEADER") + .unwrap_or_else(|_| "x-api-key".to_string()); + + let metadata_value = MetadataValue::try_from(api_key.as_str()) + .map_err(|e| format!("Invalid API key format: {}", e))?; + + match header_name.as_str() { + "x-api-key" => metadata.insert("x-api-key", metadata_value), + "authorization" => metadata.insert("authorization", metadata_value), + "x-honeycomb-team" => metadata.insert("x-honeycomb-team", metadata_value), + "x-otlp-api-key" => metadata.insert("x-otlp-api-key", metadata_value), + _ => { + let key = tonic::metadata::MetadataKey::from_bytes(header_name.as_bytes()) + .map_err(|e| format!("Invalid header name '{}': {}", header_name, e))?; + metadata.insert(key, metadata_value) + } + }; + + builder = builder.with_metadata(metadata); + tracing::info!("OTLP provider configured with API key authentication"); + } + + builder.build()? 
+ }; + + let tracer_provider = SdkTracerProvider::builder() + .with_resource(resource.clone()) + .with_batch_exporter(otlp_trace_exporter) + .build(); + + self.tracer_provider = Some(tracer_provider); + + let otlp_metrics_exporter = if use_http { + let mut builder = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_endpoint(format!("{}/v1/metrics", endpoint.trim_end_matches('/'))); + + if let Some(api_key) = &config.api_key { + use std::collections::HashMap; + + let mut headers = HashMap::new(); + + let header_name = std::env::var("GOOSE_TELEMETRY_AUTH_HEADER") + .unwrap_or_else(|_| "x-api-key".to_string()); + + headers.insert(header_name, api_key.clone()); + + builder = builder.with_headers(headers); + } + + // Use default HTTP client for protobuf format + eprintln!( + "🔧 OTLP: Metrics exporter endpoint: {}/v1/metrics", + endpoint.trim_end_matches('/') + ); + + builder.build()? + } else { + let mut builder = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint.clone()); + + if let Some(api_key) = &config.api_key { + use tonic::metadata::{MetadataMap, MetadataValue}; + + let mut metadata = MetadataMap::new(); + + let header_name = std::env::var("GOOSE_TELEMETRY_AUTH_HEADER") + .unwrap_or_else(|_| "x-api-key".to_string()); + + let metadata_value = MetadataValue::try_from(api_key.as_str()) + .map_err(|e| format!("Invalid API key format: {}", e))?; + + match header_name.as_str() { + "x-api-key" => metadata.insert("x-api-key", metadata_value), + "authorization" => metadata.insert("authorization", metadata_value), + "x-honeycomb-team" => metadata.insert("x-honeycomb-team", metadata_value), + "x-otlp-api-key" => metadata.insert("x-otlp-api-key", metadata_value), + _ => { + let key = tonic::metadata::MetadataKey::from_bytes(header_name.as_bytes()) + .map_err(|e| format!("Invalid header name '{}': {}", header_name, e))?; + metadata.insert(key, metadata_value) + } + }; + + builder = builder.with_metadata(metadata); + } + 
+ builder.build()? + }; + + let meter_provider = SdkMeterProvider::builder() + .with_resource(resource) + .with_reader( + PeriodicReader::builder(otlp_metrics_exporter) + .with_interval(Duration::from_secs(1)) // Very short interval for immediate export + .build(), + ) + .build(); + + self.meter_provider = Some(meter_provider); + + self.init_metrics().await?; + self.initialized = true; + + let auth_status = if config.api_key.is_some() { + "with authentication" + } else { + "without authentication" + }; + eprintln!( + "🔧 OTLP telemetry provider initialized with endpoint: {} ({})", + endpoint, auth_status + ); + tracing::info!( + "OTLP telemetry provider initialized with endpoint: {} ({})", + endpoint, + auth_status + ); + Ok(()) + } + + async fn send_event( + &self, + event: &TelemetryEvent, + ) -> Result<(), Box> { + if !self.initialized { + return Err("OTLP provider not initialized".into()); + } + + match event { + TelemetryEvent::RecipeExecution(execution) => { + eprintln!( + "📊 OTLP: Recording recipe execution: {}", + execution.recipe_name + ); + self.record_recipe_execution(execution); + self.create_recipe_span(execution); + eprintln!("✅ OTLP: Recipe span created and recorded"); + } + TelemetryEvent::SessionExecution(execution) => { + eprintln!( + "📊 OTLP: Recording session execution: {}", + execution.session_id + ); + self.record_session_execution(execution); + self.create_session_span(execution); + eprintln!("✅ OTLP: Session span created and recorded"); + } + TelemetryEvent::CommandExecution(execution) => { + eprintln!( + "📊 OTLP: Recording command execution: {}", + execution.command_name + ); + self.record_command_execution(execution); + self.create_command_span(execution); + eprintln!("✅ OTLP: Command span created and recorded"); + } + TelemetryEvent::SystemMetrics(_metrics) => { + // System metrics could be implemented in the future + } + TelemetryEvent::UserSession(_session) => { + // User session metrics could be implemented in the future + } + } + + // 
Let the standard OTLP library handle the export + eprintln!("✅ OTLP: Event recorded successfully (will be exported periodically by PeriodicReader)"); + + Ok(()) + } + + async fn shutdown(&self) -> Result<(), Box> { + if self.initialized { + eprintln!("🛑 OTLP: Shutting down telemetry provider..."); + + // Use proper async context for shutdown flush + if let Ok(handle) = tokio::runtime::Handle::try_current() { + eprintln!("✅ OTLP: Found active Tokio runtime for shutdown flush"); + + if let Some(tracer_provider) = &self.tracer_provider { + eprintln!("🔄 OTLP: Shutdown flushing traces with runtime context..."); + let tracer_provider = tracer_provider.clone(); + let result = handle + .spawn(async move { tracer_provider.force_flush() }) + .await; + + match result { + Ok(Ok(_)) => eprintln!("✅ OTLP: Shutdown traces flush successful"), + Ok(Err(e)) => eprintln!("❌ OTLP: Shutdown traces flush error: {:?}", e), + Err(e) => eprintln!("❌ OTLP: Shutdown traces spawn error: {:?}", e), + } + } + + if let Some(meter_provider) = &self.meter_provider { + eprintln!("🔄 OTLP: Shutdown flushing metrics with runtime context..."); + let meter_provider = meter_provider.clone(); + let result = handle + .spawn(async move { meter_provider.force_flush() }) + .await; + + match result { + Ok(Ok(_)) => eprintln!("✅ OTLP: Shutdown metrics flush successful"), + Ok(Err(e)) => eprintln!("❌ OTLP: Shutdown metrics flush error: {:?}", e), + Err(e) => eprintln!("❌ OTLP: Shutdown metrics spawn error: {:?}", e), + } + } + } else { + eprintln!("⚠️ OTLP: No active Tokio runtime for shutdown flush"); + eprintln!(" (This is expected during shutdown - data should have been exported periodically)"); + } + + eprintln!("✅ OTLP: Shutdown complete"); + tracing::info!("OTLP telemetry provider shutdown successfully"); + } + Ok(()) + } +} diff --git a/crates/goose/src/telemetry/user.rs b/crates/goose/src/telemetry/user.rs new file mode 100644 index 000000000000..1704e60d2936 --- /dev/null +++ 
b/crates/goose/src/telemetry/user.rs @@ -0,0 +1,256 @@ +use crate::telemetry::config::UsageType; +use serde::{Deserialize, Serialize}; +use std::env; +use std::fs; +use std::path::PathBuf; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserIdentity { + pub user_id: String, + pub usage_type: UsageType, + pub first_seen: chrono::DateTime, + pub last_seen: chrono::DateTime, +} + +impl UserIdentity { + pub async fn load_or_create() -> Result> { + let identity_path = get_identity_file_path()?; + + if identity_path.exists() { + match fs::read_to_string(&identity_path) { + Ok(content) => { + if let Ok(mut identity) = serde_json::from_str::(&content) { + identity.last_seen = chrono::Utc::now(); + identity.usage_type = detect_usage_type(); + + if let Err(e) = identity.save().await { + tracing::warn!("Failed to update user identity: {}", e); + } + + return Ok(identity); + } + } + Err(e) => { + tracing::warn!("Failed to read user identity file: {}", e); + } + } + } + + let identity = Self::new(); + if let Err(e) = identity.save().await { + tracing::warn!("Failed to save new user identity: {}", e); + } + + Ok(identity) + } + + fn new() -> Self { + let now = chrono::Utc::now(); + Self { + user_id: Uuid::new_v4().to_string(), + usage_type: detect_usage_type(), + first_seen: now, + last_seen: now, + } + } + + async fn save(&self) -> Result<(), Box> { + let identity_path = get_identity_file_path()?; + + if let Some(parent) = identity_path.parent() { + fs::create_dir_all(parent)?; + } + + let content = serde_json::to_string_pretty(self)?; + fs::write(&identity_path, content)?; + + Ok(()) + } +} + +pub fn detect_usage_type() -> UsageType { + if let Ok(usage_type) = env::var("GOOSE_USAGE_TYPE") { + if let Ok(parsed) = usage_type.parse::() { + return parsed; + } + } + + if is_ci_environment() { + return UsageType::Ci; + } + + if is_automation_environment() { + return UsageType::Automation; + } + + UsageType::Human +} + +fn is_ci_environment() -> bool { 
/// Returns `true` when any well-known CI marker variable is set.
///
/// Only presence is checked; the value is not interpreted, so `CI=false`
/// still counts as a CI environment.
fn is_ci_environment() -> bool {
    const CI_VARS: [&str; 13] = [
        "CI",
        "CONTINUOUS_INTEGRATION",
        "BUILD_NUMBER",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "JENKINS_URL",
        "TRAVIS",
        "CIRCLECI",
        "BUILDKITE",
        "DRONE",
        "TEAMCITY_VERSION",
        "TF_BUILD",
        "CODEBUILD_BUILD_ID",
    ];

    any_env_var_set(&CI_VARS)
}

/// Returns `true` when any variable that marks unattended execution is set.
///
/// Only presence is checked; the value is not interpreted.
fn is_automation_environment() -> bool {
    const AUTOMATION_VARS: [&str; 8] = [
        "AWS_LAMBDA_FUNCTION_NAME",
        "KUBERNETES_SERVICE_HOST",
        "DOCKER_CONTAINER",
        "SERVERLESS",
        "FUNCTION_NAME",
        "AZURE_FUNCTIONS_ENVIRONMENT",
        "GOOSE_AUTOMATION",
        "GOOSE_JOB_ID", // Temporal service scheduled jobs
    ];

    any_env_var_set(&AUTOMATION_VARS)
}

/// Returns `true` if at least one of `names` is present in the environment.
///
/// Uses `env::var_os` so that a variable whose value is not valid Unicode is
/// still recognised as set (`env::var` would report it as an error).
fn any_env_var_set(names: &[&str]) -> bool {
    names.iter().any(|name| env::var_os(name).is_some())
}
/// `is_ci_environment` is false with no CI marker set and true once one is set.
///
/// Every variable the function inspects is cleared first so the negative
/// assertion does not depend on where the test suite runs (e.g. GitLab CI or
/// Jenkins set markers other than `CI`/`GITHUB_ACTIONS`).
// NOTE(review): this list mirrors the one inside `is_ci_environment`; keep the
// two in sync. Tests that mutate the process environment can still interfere
// with each other when run in parallel.
#[test]
fn test_detect_ci_environment() {
    let ci_vars = [
        "CI",
        "CONTINUOUS_INTEGRATION",
        "BUILD_NUMBER",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "JENKINS_URL",
        "TRAVIS",
        "CIRCLECI",
        "BUILDKITE",
        "DRONE",
        "TEAMCITY_VERSION",
        "TF_BUILD",
        "CODEBUILD_BUILD_ID",
    ];
    for var in &ci_vars {
        env::remove_var(var);
    }
    assert!(!is_ci_environment());

    env::set_var("CI", "true");
    assert!(is_ci_environment());
    env::remove_var("CI");

    env::set_var("GITHUB_ACTIONS", "true");
    assert!(is_ci_environment());
    env::remove_var("GITHUB_ACTIONS");
}

/// `is_automation_environment` is false with no marker set and true for each
/// of the Lambda, Kubernetes and scheduled-job markers.
///
/// All inspected variables are cleared first; otherwise the negative assertion
/// fails when the suite itself runs inside a container or serverless runner.
// NOTE(review): this list mirrors the one inside `is_automation_environment`;
// keep the two in sync.
#[test]
fn test_detect_automation_environment() {
    let automation_vars = [
        "AWS_LAMBDA_FUNCTION_NAME",
        "KUBERNETES_SERVICE_HOST",
        "DOCKER_CONTAINER",
        "SERVERLESS",
        "FUNCTION_NAME",
        "AZURE_FUNCTIONS_ENVIRONMENT",
        "GOOSE_AUTOMATION",
        "GOOSE_JOB_ID",
    ];
    for var in &automation_vars {
        env::remove_var(var);
    }
    assert!(!is_automation_environment());

    env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test-function");
    assert!(is_automation_environment());
    env::remove_var("AWS_LAMBDA_FUNCTION_NAME");

    env::set_var("KUBERNETES_SERVICE_HOST", "10.0.0.1");
    assert!(is_automation_environment());
    env::remove_var("KUBERNETES_SERVICE_HOST");

    // Test temporal service detection
    env::set_var("GOOSE_JOB_ID", "scheduled-job-123");
    assert!(is_automation_environment());
    env::remove_var("GOOSE_JOB_ID");
}

/// A new identity has a parseable UUID and timestamps that are not in the
/// future and are ordered `first_seen <= last_seen`.
#[test]
fn test_user_identity_creation() {
    let identity = UserIdentity::new();

    assert!(Uuid::parse_str(&identity.user_id).is_ok());

    let now = chrono::Utc::now();
    assert!(identity.first_seen <= now);
    assert!(identity.last_seen <= now);
    assert!(identity.first_seen <= identity.last_seen);
}
loaded_identity.usage_type); + } +} diff --git a/crates/goose/src/telemetry/utils.rs b/crates/goose/src/telemetry/utils.rs new file mode 100644 index 000000000000..4662a49d8742 --- /dev/null +++ b/crates/goose/src/telemetry/utils.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; + +use crate::message::{Message, MessageContent}; +use crate::telemetry::ToolUsage; + +pub fn extract_tool_usage_from_messages(messages: &[Message]) -> Vec { + let mut tool_usage_map: HashMap = HashMap::new(); + let mut tool_call_times: HashMap = HashMap::new(); + let mut tool_id_to_name: HashMap = HashMap::new(); + + for message in messages { + for content in &message.content { + match content { + MessageContent::ToolRequest(tool_request) => { + if let Ok(tool_call) = &tool_request.tool_call { + let tool_name = &tool_call.name; + let tool_id = &tool_request.id; + + tool_id_to_name.insert(tool_id.clone(), tool_name.clone()); + tool_call_times.insert(tool_id.clone(), message.created); + + tool_usage_map + .entry(tool_name.clone()) + .or_insert_with(|| ToolUsage::new(tool_name)); + } + } + MessageContent::ToolResponse(tool_response) => { + let tool_id = &tool_response.id; + + if let Some(tool_name) = tool_id_to_name.get(tool_id) { + if let Some(entry) = tool_usage_map.get_mut(tool_name) { + let duration = if let Some(start_time) = tool_call_times.get(tool_id) { + let duration_ms = (message.created - start_time).max(0) as u64; + std::time::Duration::from_millis(duration_ms) + } else { + std::time::Duration::from_millis(0) + }; + + let success = tool_response.tool_result.is_ok(); + entry.add_call(duration, success); + } + } + } + _ => {} + } + } + } + + tool_usage_map.into_values().collect() +}