Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ retry-policies = "0.4.0"
reqwest-retry = "0.7.0"
reqwest-middleware = "0.4.2"
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
regex-automata = "0.4.10"
3 changes: 2 additions & 1 deletion bin/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ path = "src/main.rs"
hive-router-query-planner = { path = "../../lib/query-planner", version = "2.0.2" }
hive-router-plan-executor = { path = "../../lib/executor", version = "6.0.0" }
hive-router-config = { path = "../../lib/router-config", version = "0.0.10" }
hive-console-sdk = "0.1.0"

tokio = { workspace = true }
futures = { workspace = true }
Expand All @@ -45,11 +46,11 @@ reqwest-retry = { workspace = true }
reqwest-middleware = { workspace = true }
vrl = { workspace = true }
serde_json = { workspace = true }
regex-automata = { workspace = true }

mimalloc = { version = "0.1.48", features = ["v3"] }
moka = { version = "0.12.10", features = ["future"] }
ulid = "1.2.1"
tokio-util = "0.7.16"
cookie = "0.18.1"
regex-automata = "0.4.10"
arc-swap = "1.7.1"
16 changes: 14 additions & 2 deletions bin/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
jwt::JwtAuthRuntime,
logger::configure_logging,
pipeline::graphql_request_handler,
pipeline::{graphql_request_handler, usage_reporting::init_hive_user_agent},
};

pub use crate::{schema_state::SchemaState, shared_state::RouterSharedState};
Expand Down Expand Up @@ -111,11 +111,23 @@ pub async fn configure_app_from_config(
false => None,
};

let hive_usage_agent = match router_config.usage_reporting.enabled {
true => Some(init_hive_user_agent(
bg_tasks_manager,
&router_config.usage_reporting,
)),
false => None,
};

let router_config_arc = Arc::new(router_config);
let schema_state =
SchemaState::new_from_config(bg_tasks_manager, router_config_arc.clone()).await?;
let schema_state_arc = Arc::new(schema_state);
let shared_state = Arc::new(RouterSharedState::new(router_config_arc, jwt_runtime)?);
let shared_state = Arc::new(RouterSharedState::new(
router_config_arc,
jwt_runtime,
hive_usage_agent,
)?);

Ok((shared_state, schema_state_arc))
}
Expand Down
18 changes: 17 additions & 1 deletion bin/router/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use hive_router_plan_executor::execution::{
client_request_details::{ClientRequestDetails, JwtRequestDetails, OperationDetails},
Expand Down Expand Up @@ -46,6 +46,7 @@ pub mod normalize;
pub mod parser;
pub mod progressive_override;
pub mod query_plan;
pub mod usage_reporting;
pub mod validation;

static GRAPHIQL_HTML: &str = include_str!("../../static/graphiql.html");
Expand Down Expand Up @@ -111,6 +112,7 @@ pub async fn execute_pipeline(
shared_state: &Arc<RouterSharedState>,
schema_state: &Arc<SchemaState>,
) -> Result<PlanExecutionOutput, PipelineError> {
let start = Instant::now();
perform_csrf_prevention(req, &shared_state.router_config.csrf)?;

let mut execution_request = get_execution_request(req, body_bytes).await?;
Expand Down Expand Up @@ -190,5 +192,19 @@ pub async fn execute_pipeline(
)
.await?;

if shared_state.router_config.usage_reporting.enabled {
if let Some(hive_usage_agent) = &shared_state.hive_usage_agent {
usage_reporting::collect_usage_report(
supergraph.supergraph_schema.clone(),
start.elapsed(),
req,
&client_request_details,
hive_usage_agent,
&shared_state.router_config.usage_reporting,
&execution_result,
);
}
}

Ok(execution_result)
}
105 changes: 105 additions & 0 deletions bin/router/src/pipeline/usage_reporting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use graphql_parser::schema::Document;
use hive_console_sdk::agent::{ExecutionReport, UsageAgent};
use hive_router_config::usage_reporting::UsageReportingConfig;
use hive_router_plan_executor::execution::{
client_request_details::ClientRequestDetails, plan::PlanExecutionOutput,
};
use ntex::web::HttpRequest;
use rand::Rng;
use tokio_util::sync::CancellationToken;

use crate::{
background_tasks::{BackgroundTask, BackgroundTasksManager},
consts::ROUTER_VERSION,
};

pub fn init_hive_user_agent(
bg_tasks_manager: &mut BackgroundTasksManager,
usage_config: &UsageReportingConfig,
) -> Arc<UsageAgent> {
let user_agent = format!("hive-router/{}", ROUTER_VERSION);
let hive_user_agent = hive_console_sdk::agent::UsageAgent::new(
usage_config.access_token.clone(),
usage_config.endpoint.clone(),
usage_config.target_id.clone(),
usage_config.buffer_size,
usage_config.connect_timeout,
usage_config.request_timeout,
usage_config.accept_invalid_certs,
usage_config.flush_interval,
user_agent,
);
let hive_user_agent_arc = Arc::new(hive_user_agent);
bg_tasks_manager.register_task(hive_user_agent_arc.clone());
hive_user_agent_arc
}

#[inline]
pub fn collect_usage_report(
schema: Arc<Document<'static, String>>,
duration: Duration,
req: &HttpRequest,
client_request_details: &ClientRequestDetails,
hive_usage_agent: &UsageAgent,
usage_config: &UsageReportingConfig,
execution_result: &PlanExecutionOutput,
) {
let mut rng = rand::rng();
let sampled = rng.random::<f64>() < usage_config.sample_rate.as_f64();
if !sampled {
return;
}
if client_request_details
.operation
.name
.is_some_and(|op_name| usage_config.exclude.contains(&op_name.to_string()))
{
return;
}
let client_name = get_header_value(req, &usage_config.client_name_header);
let client_version = get_header_value(req, &usage_config.client_version_header);
let timestamp = SystemTime::now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be as_millis instead of sec*1000

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 👍

.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let execution_report = ExecutionReport {
schema,
client_name: client_name.map(|s| s.to_owned()),
client_version: client_version.map(|s| s.to_owned()),
timestamp,
duration,
ok: execution_result.error_count == 0,
errors: execution_result.error_count,
operation_body: client_request_details.operation.query.to_owned(),
operation_name: client_request_details
.operation
.name
.map(|op_name| op_name.to_owned()),
persisted_document_hash: None,
};

if let Err(err) = hive_usage_agent.add_report(execution_report) {
tracing::error!("Failed to send usage report: {}", err);
}
}

fn get_header_value<'req>(req: &'req HttpRequest, header_name: &str) -> Option<&'req str> {
req.headers().get(header_name).and_then(|v| v.to_str().ok())
}

#[async_trait]
impl BackgroundTask for UsageAgent {
fn id(&self) -> &str {
"hive_console_usage_report_task"
}

async fn run(&self, token: CancellationToken) {
self.start_flush_interval(Some(token)).await
}
}
3 changes: 3 additions & 0 deletions bin/router/src/schema_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use arc_swap::{ArcSwap, Guard};
use async_trait::async_trait;
use graphql_parser::schema::Document;
use graphql_tools::validation::utils::ValidationError;
use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
use hive_router_plan_executor::{
Expand Down Expand Up @@ -39,6 +40,7 @@ pub struct SupergraphData {
pub metadata: SchemaMetadata,
pub planner: Planner,
pub subgraph_executor_map: SubgraphExecutorMap,
pub supergraph_schema: Arc<Document<'static, String>>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -124,6 +126,7 @@ impl SchemaState {
)?;

Ok(SupergraphData {
supergraph_schema: Arc::new(parsed_supergraph_sdl),
metadata,
planner,
subgraph_executor_map,
Expand Down
6 changes: 6 additions & 0 deletions bin/router/src/shared_state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use graphql_tools::validation::validate::ValidationPlan;
use hive_console_sdk::agent::UsageAgent;
use hive_router_config::HiveRouterConfig;
use hive_router_plan_executor::headers::{
compile::compile_headers_plan, errors::HeaderRuleCompileError, plan::HeaderRulesPlan,
Expand All @@ -18,12 +19,14 @@ pub struct RouterSharedState {
pub override_labels_evaluator: OverrideLabelsEvaluator,
pub cors_runtime: Option<Cors>,
pub jwt_auth_runtime: Option<JwtAuthRuntime>,
pub hive_usage_agent: Option<Arc<UsageAgent>>,
}

impl RouterSharedState {
pub fn new(
router_config: Arc<HiveRouterConfig>,
jwt_auth_runtime: Option<JwtAuthRuntime>,
hive_usage_agent: Option<Arc<UsageAgent>>,
) -> Result<Self, SharedStateError> {
Ok(Self {
validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
Expand All @@ -36,6 +39,7 @@ impl RouterSharedState {
)
.map_err(Box::new)?,
jwt_auth_runtime,
hive_usage_agent,
})
}
}
Expand All @@ -48,4 +52,6 @@ pub enum SharedStateError {
CORSConfig(#[from] Box<CORSConfigError>),
#[error("invalid override labels config: {0}")]
OverrideLabelsCompile(#[from] Box<OverrideLabelsCompileError>),
#[error("error creating hive usage agent: {0}")]
UsageAgent(#[from] Box<hive_console_sdk::agent::AgentError>),
}
Loading
Loading