Skip to content

Commit 21ccac2

Browse files
committed
Timeout
1 parent 9cd4959 commit 21ccac2

File tree

5 files changed

+141
-125
lines changed

5 files changed

+141
-125
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/src/execution/plan.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,6 @@ impl<'exec, 'req> Executor<'exec, 'req> {
708708
representations,
709709
headers: headers_map,
710710
extensions: None,
711-
client_request: self.client_request,
712711
};
713712

714713
if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
@@ -723,7 +722,7 @@ impl<'exec, 'req> Executor<'exec, 'req> {
723722
subgraph_name: node.service_name.clone(),
724723
response: self
725724
.executors
726-
.execute(&node.service_name, subgraph_request)
725+
.execute(&node.service_name, subgraph_request, self.client_request)
727726
.await
728727
.into(),
729728
}))

lib/executor/src/executors/common.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@ use bytes::Bytes;
55
use http::HeaderMap;
66
use sonic_rs::Value;
77

8-
use crate::execution::client_request_details::ClientRequestDetails;
9-
108
#[async_trait]
119
pub trait SubgraphExecutor {
12-
async fn execute<'exec, 'req>(
10+
async fn execute<'a>(
1311
&self,
14-
execution_request: HttpExecutionRequest<'exec, 'req>,
12+
execution_request: HttpExecutionRequest<'a>,
1513
) -> HttpExecutionResponse;
1614

1715
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
@@ -28,19 +26,18 @@ pub type SubgraphExecutorBoxedArc = Arc<Box<SubgraphExecutorType>>;
2826

2927
pub type SubgraphRequestExtensions = HashMap<String, Value>;
3028

31-
pub struct HttpExecutionRequest<'exec, 'req> {
32-
pub query: &'exec str,
29+
pub struct HttpExecutionRequest<'a> {
30+
pub query: &'a str,
3331
pub dedupe: bool,
34-
pub operation_name: Option<&'exec str>,
32+
pub operation_name: Option<&'a str>,
3533
// TODO: variables could be stringified before even executing the request
36-
pub variables: Option<HashMap<&'exec str, &'exec sonic_rs::Value>>,
34+
pub variables: Option<HashMap<&'a str, &'a sonic_rs::Value>>,
3735
pub headers: HeaderMap,
3836
pub representations: Option<Vec<u8>>,
3937
pub extensions: Option<SubgraphRequestExtensions>,
40-
pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
4138
}
4239

43-
impl HttpExecutionRequest<'_, '_> {
40+
impl HttpExecutionRequest<'_> {
4441
pub fn add_request_extensions_field(&mut self, key: String, value: Value) {
4542
self.extensions
4643
.get_or_insert_with(HashMap::new)

lib/executor/src/executors/http.rs

Lines changed: 8 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
use std::collections::BTreeMap;
21
use std::sync::Arc;
3-
use std::time::Duration;
42

5-
use crate::execution::client_request_details::ClientRequestDetails;
63
use crate::executors::common::HttpExecutionResponse;
74
use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse};
8-
use crate::utils::expression::execute_expression_with_value;
95
use dashmap::DashMap;
106
use tokio::sync::OnceCell;
117

@@ -21,8 +17,6 @@ use hyper_tls::HttpsConnector;
2117
use hyper_util::client::legacy::{connect::HttpConnector, Client};
2218
use tokio::sync::Semaphore;
2319
use tracing::debug;
24-
use vrl::compiler::Program as VrlProgram;
25-
use vrl::core::Value as VrlValue;
2620

2721
use crate::executors::common::HttpExecutionRequest;
2822
use crate::executors::error::SubgraphExecutorError;
@@ -33,12 +27,6 @@ use crate::utils::consts::COMMA;
3327
use crate::utils::consts::QUOTE;
3428
use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string};
3529

36-
#[derive(Debug)]
37-
pub enum DurationOrProgram {
38-
Duration(Duration),
39-
Program(Box<VrlProgram>),
40-
}
41-
4230
#[derive(Debug)]
4331
pub struct HTTPSubgraphExecutor {
4432
pub subgraph_name: String,
@@ -48,7 +36,6 @@ pub struct HTTPSubgraphExecutor {
4836
pub semaphore: Arc<Semaphore>,
4937
pub dedupe_enabled: bool,
5038
pub in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
51-
pub timeout: DurationOrProgram,
5239
}
5340

5441
const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{";
@@ -64,7 +51,6 @@ impl HTTPSubgraphExecutor {
6451
semaphore: Arc<Semaphore>,
6552
dedupe_enabled: bool,
6653
in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
67-
timeout: DurationOrProgram,
6854
) -> Self {
6955
let mut header_map = HeaderMap::new();
7056
header_map.insert(
@@ -84,13 +70,12 @@ impl HTTPSubgraphExecutor {
8470
semaphore,
8571
dedupe_enabled,
8672
in_flight_requests,
87-
timeout,
8873
}
8974
}
9075

9176
fn build_request_body(
9277
&self,
93-
execution_request: &HttpExecutionRequest<'_, '_>,
78+
execution_request: &HttpExecutionRequest<'_>,
9479
) -> Result<Vec<u8>, SubgraphExecutorError> {
9580
let mut body = Vec::with_capacity(4096);
9681
body.put(FIRST_QUOTE_STR);
@@ -151,7 +136,6 @@ impl HTTPSubgraphExecutor {
151136
&self,
152137
body: Vec<u8>,
153138
headers: HeaderMap,
154-
client_request: &ClientRequestDetails<'_, '_>,
155139
) -> Result<SharedResponse, SubgraphExecutorError> {
156140
let mut req = hyper::Request::builder()
157141
.method(http::Method::POST)
@@ -166,75 +150,9 @@ impl HTTPSubgraphExecutor {
166150

167151
debug!("making http request to {}", self.endpoint.to_string());
168152

169-
let timeout = match &self.timeout {
170-
DurationOrProgram::Duration(dur) => *dur,
171-
DurationOrProgram::Program(program) => {
172-
let value =
173-
VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())]));
174-
let result = execute_expression_with_value(program, value).map_err(|err| {
175-
SubgraphExecutorError::TimeoutExpressionResolution(
176-
self.subgraph_name.to_string(),
177-
err.to_string(),
178-
)
179-
})?;
180-
match result {
181-
VrlValue::Integer(i) => {
182-
if i < 0 {
183-
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
184-
self.subgraph_name.to_string(),
185-
"Timeout expression resolved to a negative integer".to_string(),
186-
));
187-
}
188-
std::time::Duration::from_millis(i as u64)
189-
}
190-
VrlValue::Float(f) => {
191-
let f = f.into_inner();
192-
if f < 0.0 {
193-
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
194-
self.subgraph_name.to_string(),
195-
"Timeout expression resolved to a negative float".to_string(),
196-
));
197-
}
198-
std::time::Duration::from_millis(f as u64)
199-
}
200-
VrlValue::Bytes(b) => {
201-
let s = std::str::from_utf8(&b).map_err(|e| {
202-
SubgraphExecutorError::TimeoutExpressionResolution(
203-
self.subgraph_name.to_string(),
204-
format!("Failed to parse duration string from bytes: {}", e),
205-
)
206-
})?;
207-
humantime::parse_duration(s).map_err(|e| {
208-
SubgraphExecutorError::TimeoutExpressionResolution(
209-
self.subgraph_name.to_string(),
210-
format!("Failed to parse duration string '{}': {}", s, e),
211-
)
212-
})?
213-
}
214-
other => {
215-
return Err(SubgraphExecutorError::TimeoutExpressionResolution(
216-
self.subgraph_name.to_string(),
217-
format!(
218-
"Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string.",
219-
other.kind()
220-
),
221-
));
222-
}
223-
}
224-
}
225-
};
226-
227-
let res = tokio::time::timeout(timeout, self.http_client.request(req))
228-
.await
229-
.map_err(|_| {
230-
SubgraphExecutorError::RequestTimeout(
231-
self.endpoint.to_string(),
232-
timeout.as_millis() as u64,
233-
)
234-
})?
235-
.map_err(|e| {
236-
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
237-
})?;
153+
let res = self.http_client.request(req).await.map_err(|e| {
154+
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
155+
})?;
238156

239157
debug!(
240158
"http request to {} completed, status: {}",
@@ -291,9 +209,9 @@ impl HTTPSubgraphExecutor {
291209
#[async_trait]
292210
impl SubgraphExecutor for HTTPSubgraphExecutor {
293211
#[tracing::instrument(skip_all, fields(subgraph_name = %self.subgraph_name))]
294-
async fn execute<'exec, 'req>(
212+
async fn execute<'a>(
295213
&self,
296-
execution_request: HttpExecutionRequest<'exec, 'req>,
214+
execution_request: HttpExecutionRequest<'a>,
297215
) -> HttpExecutionResponse {
298216
let body = match self.build_request_body(&execution_request) {
299217
Ok(body) => body,
@@ -315,10 +233,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
315233
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
316234
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
317235
let _permit = self.semaphore.acquire().await.unwrap();
318-
return match self
319-
._send_request(body, headers, execution_request.client_request)
320-
.await
321-
{
236+
return match self._send_request(body, headers).await {
322237
Ok(shared_response) => HttpExecutionResponse {
323238
body: shared_response.body,
324239
headers: shared_response.headers,
@@ -350,8 +265,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
350265
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
351266
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
352267
let _permit = self.semaphore.acquire().await.unwrap();
353-
self._send_request(body, headers, execution_request.client_request)
354-
.await
268+
self._send_request(body, headers).await
355269
};
356270
// It's important to remove the entry from the map before returning the result.
357271
// This ensures that once the OnceCell is set, no future requests can join it.

0 commit comments

Comments
 (0)