
Commit 7c73c86

feat(router): Hive Console Usage Reporting
1 parent 71ab1a6 commit 7c73c86

11 files changed, 283 additions and 17 deletions.


Cargo.lock

Lines changed: 53 additions & 8 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 4 additions & 2 deletions
@@ -26,8 +26,10 @@ lto = true
 codegen-units = 1
 
 [workspace.dependencies]
-graphql-tools = "0.4.0"
-graphql-parser = "0.4.1"
+graphql-parser = { version = "0.5.0", package = "graphql-parser-hive-fork" }
+graphql-tools = { version = "0.4.0", features = [
+  "graphql_parser_fork",
+], default-features = false }
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.142"
 sonic-rs = "0.5.3"

bin/router/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ path = "src/main.rs"
 hive-router-query-planner = { path = "../../lib/query-planner", version = "2.0.0" }
 hive-router-plan-executor = { path = "../../lib/executor", version = "4.0.0" }
 hive-router-config = { path = "../../lib/router-config", version = "0.0.7" }
+hive-console-sdk = { path = "../../../graphql-hive/packages/libraries/sdk-rs" }
 
 tokio = { workspace = true }
 futures = { workspace = true }

bin/router/src/lib.rs

Lines changed: 7 additions & 2 deletions
@@ -8,7 +8,7 @@ mod schema_state;
 mod shared_state;
 mod supergraph;
 
-use std::sync::Arc;
+use std::{sync::Arc};
 
 use crate::{
     background_tasks::BackgroundTasksManager,
@@ -110,12 +110,17 @@ pub async fn configure_app_from_config(
         Some(jwt_config) => Some(JwtAuthRuntime::init(bg_tasks_manager, jwt_config).await?),
         None => None,
     };
+    let usage_agent = pipeline::usage::from_config(&router_config).map(Arc::new);
+
+    if let Some(usage_agent) = usage_agent.clone() {
+        bg_tasks_manager.register_task(usage_agent);
+    }
 
     let router_config_arc = Arc::new(router_config);
     let schema_state =
         SchemaState::new_from_config(bg_tasks_manager, router_config_arc.clone()).await?;
     let schema_state_arc = Arc::new(schema_state);
-    let shared_state = Arc::new(RouterSharedState::new(router_config_arc, jwt_runtime)?);
+    let shared_state = Arc::new(RouterSharedState::new(router_config_arc, jwt_runtime, usage_agent)?);
 
     Ok((shared_state, schema_state_arc))
 }

bin/router/src/pipeline/mod.rs

Lines changed: 21 additions & 1 deletion
@@ -1,4 +1,4 @@
-use std::{borrow::Cow, sync::Arc};
+use std::{borrow::Cow, sync::Arc, time::Instant};
 
 use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
 use hive_router_query_planner::utils::cancellation::CancellationToken;
@@ -40,6 +40,7 @@ pub mod normalize;
 pub mod parser;
 pub mod progressive_override;
 pub mod query_plan;
+pub mod usage;
 pub mod validation;
 
 static GRAPHIQL_HTML: &str = include_str!("../../static/graphiql.html");
@@ -100,6 +101,7 @@ pub async fn execute_pipeline(
     shared_state: &Arc<RouterSharedState>,
     schema_state: &Arc<SchemaState>,
 ) -> Result<PlanExecutionOutput, PipelineError> {
+    let start = Instant::now();
     perform_csrf_prevention(req, &shared_state.router_config.csrf)?;
 
     let execution_request = get_execution_request(req, body_bytes).await?;
@@ -117,6 +119,7 @@
     )
     .await?;
     let query = Cow::Owned(execution_request.query.clone());
+    let query_clone: Cow<'_, str> = Cow::Owned(execution_request.query.clone());
     let variable_payload =
         coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;
 
@@ -144,5 +147,22 @@
     )
     .await?;
 
+    let operation_name = normalize_payload.operation_for_plan.name.as_ref();
+
+    shared_state.usage_agent.as_ref().and_then(|usage_agent| {
+        shared_state.router_config.usage.as_ref().map(|usage_config| {
+            usage::send_usage_report(
+                supergraph.schema.clone(),
+                start,
+                req,
+                operation_name,
+                &query_clone,
+                usage_agent,
+                usage_config,
+                &execution_result,
+            )
+        })
+    });
+
     Ok(execution_result)
 }
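
Note: the reporting call in the last hunk is wired through an Option::and_then/Option::map chain purely for its side effect: send_usage_report runs only when both a usage agent and a usage config section are present, and the resulting Option is discarded. A minimal, standalone sketch of that pattern (not from the commit; the string values and the report function are placeholders standing in for shared_state.usage_agent and shared_state.router_config.usage):

fn report(agent: &str, config: &str) {
    println!("would report via {agent} using {config}");
}

fn main() {
    // Placeholders standing in for `shared_state.usage_agent` and
    // `shared_state.router_config.usage`; both must be Some for a report.
    let usage_agent: Option<&str> = Some("agent");
    let usage_config: Option<&str> = Some("config");

    // Combinator form, as in the hunk above; the resulting Option<()> is
    // discarded (bound to `_` here to avoid the unused-`Option` warning).
    let _ = usage_agent.and_then(|agent| usage_config.map(|config| report(agent, config)));

    // Equivalent `if let` form.
    if let (Some(agent), Some(config)) = (usage_agent, usage_config) {
        report(agent, config);
    }
}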

bin/router/src/pipeline/usage.rs

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+use std::{sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}};
+
+use async_trait::async_trait;
+use graphql_parser::schema::Document;
+use hive_console_sdk::agent::{ExecutionReport, UsageAgent};
+use hive_router_config::{HiveRouterConfig, usage::UsageConfig};
+use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
+use ntex::web::HttpRequest;
+use rand::Rng;
+use tokio_util::sync::CancellationToken;
+
+use crate::background_tasks::BackgroundTask;
+
+pub fn from_config(
+    router_config: &HiveRouterConfig,
+) -> Option<UsageAgent> {
+    router_config.usage.as_ref().map(|usage_config| {
+        let flush_interval = Duration::from_secs(usage_config.flush_interval);
+        hive_console_sdk::agent::UsageAgent::new(
+            usage_config.token.clone(),
+            usage_config.endpoint.clone(),
+            usage_config.buffer_size,
+            usage_config.connect_timeout,
+            usage_config.request_timeout,
+            usage_config.accept_invalid_certs,
+            flush_interval,
+            "hive-router".to_string(),
+        )
+    })
+}
+
+pub fn send_usage_report(
+    schema: Arc<Document<'static, String>>,
+    start: Instant,
+    req: &HttpRequest,
+    operation_name: Option<&String>,
+    operation_body: &str,
+    usage_agent: &UsageAgent,
+    usage_config: &UsageConfig,
+    execution_result: &PlanExecutionOutput,
+) {
+    let mut rng = rand::rng();
+    let sampled = rng.random::<f64>() < usage_config.sample_rate;
+    if !sampled {
+        return;
+    }
+    if operation_name.is_some_and(|op_name| usage_config.exclude.contains(op_name)) {
+        return;
+    }
+    let client_name = get_header_value(req, &usage_config.client_name_header);
+    let client_version = get_header_value(req, &usage_config.client_version_header);
+    let timestamp = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_secs()
+        * 1000;
+    let duration = start.elapsed();
+    let execution_report = ExecutionReport {
+        schema,
+        client_name,
+        client_version,
+        timestamp,
+        duration,
+        ok: execution_result.error_count == 0,
+        errors: execution_result.error_count,
+        operation_body: operation_body.to_owned(),
+        operation_name: operation_name.map(|op_name| op_name.to_owned()),
+        persisted_document_hash: None,
+    };
+    usage_agent
+        .add_report(execution_report)
+        .unwrap_or_else(|err| tracing::error!("Failed to send usage report: {}", err));
+}
+
+fn get_header_value(req: &HttpRequest, header_name: &str) -> Option<String> {
+    req.headers()
+        .get(header_name)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.to_string())
+}
+
+#[async_trait]
+impl BackgroundTask for UsageAgent {
+    fn id(&self) -> &str {
+        "usage_report_flush_interval"
+    }
+
+    async fn run(&self, token: CancellationToken) {
+        self.start_flush_interval(Some(token)).await
+    }
+}
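
Note on the sampling guard at the top of send_usage_report: with the rand 0.9 API used here, rng.random::<f64>() is uniform in [0, 1), so each executed operation is reported independently with probability sample_rate. A standalone sketch of that behaviour (not part of the commit; the 0.1 rate and the simulation are illustrative, not router defaults):

use rand::Rng;

fn main() {
    let sample_rate = 0.1; // illustrative value, not a router default
    let mut rng = rand::rng();
    let total = 100_000;
    // Same check as in `send_usage_report`: report iff random::<f64>() < sample_rate.
    let reported = (0..total).filter(|_| rng.random::<f64>() < sample_rate).count();
    println!(
        "reported {reported} of {total} simulated requests (~{:.1}%)",
        100.0 * reported as f64 / total as f64
    );
}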

bin/router/src/schema_state.rs

Lines changed: 5 additions & 2 deletions
@@ -1,6 +1,7 @@
 use arc_swap::{ArcSwap, Guard};
 use async_trait::async_trait;
-use graphql_tools::validation::utils::ValidationError;
+use graphql_parser::schema::Document;
+use graphql_tools::{validation::utils::ValidationError};
 use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
 use hive_router_plan_executor::{
     executors::error::SubgraphExecutorError,
@@ -21,7 +22,7 @@ use tracing::{debug, error, trace};
 
 use crate::{
     background_tasks::{BackgroundTask, BackgroundTasksManager},
-    pipeline::normalize::GraphQLNormalizationPayload,
+    pipeline::{normalize::GraphQLNormalizationPayload},
     supergraph::{
         base::{LoadSupergraphError, ReloadSupergraphResult, SupergraphLoader},
         resolve_from_config,
@@ -39,6 +40,7 @@ pub struct SupergraphData {
     pub metadata: SchemaMetadata,
     pub planner: Planner,
     pub subgraph_executor_map: SubgraphExecutorMap,
+    pub schema: Arc<Document<'static, String>>,
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -124,6 +126,7 @@ impl SchemaState {
         )?;
 
         Ok(SupergraphData {
+            schema: Arc::new(parsed_supergraph_sdl),
            metadata,
            planner,
            subgraph_executor_map,
