1- use std:: collections:: BTreeMap ;
21use std:: sync:: Arc ;
3- use std:: time:: Duration ;
42
5- use crate :: execution:: client_request_details:: ClientRequestDetails ;
63use crate :: executors:: common:: HttpExecutionResponse ;
74use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
8- use crate :: utils:: expression:: execute_expression_with_value;
95use dashmap:: DashMap ;
106use tokio:: sync:: OnceCell ;
117
@@ -21,8 +17,6 @@ use hyper_tls::HttpsConnector;
2117use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
2218use tokio:: sync:: Semaphore ;
2319use tracing:: debug;
24- use vrl:: compiler:: Program as VrlProgram ;
25- use vrl:: core:: Value as VrlValue ;
2620
2721use crate :: executors:: common:: HttpExecutionRequest ;
2822use crate :: executors:: error:: SubgraphExecutorError ;
@@ -33,12 +27,6 @@ use crate::utils::consts::COMMA;
3327use crate :: utils:: consts:: QUOTE ;
3428use crate :: { executors:: common:: SubgraphExecutor , json_writer:: write_and_escape_string} ;
3529
36- #[ derive( Debug ) ]
37- pub enum DurationOrProgram {
38- Duration ( Duration ) ,
39- Program ( Box < VrlProgram > ) ,
40- }
41-
4230#[ derive( Debug ) ]
4331pub struct HTTPSubgraphExecutor {
4432 pub subgraph_name : String ,
@@ -48,7 +36,6 @@ pub struct HTTPSubgraphExecutor {
4836 pub semaphore : Arc < Semaphore > ,
4937 pub dedupe_enabled : bool ,
5038 pub in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
51- pub timeout : DurationOrProgram ,
5239}
5340
5441const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
@@ -64,7 +51,6 @@ impl HTTPSubgraphExecutor {
6451 semaphore : Arc < Semaphore > ,
6552 dedupe_enabled : bool ,
6653 in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
67- timeout : DurationOrProgram ,
6854 ) -> Self {
6955 let mut header_map = HeaderMap :: new ( ) ;
7056 header_map. insert (
@@ -84,13 +70,12 @@ impl HTTPSubgraphExecutor {
8470 semaphore,
8571 dedupe_enabled,
8672 in_flight_requests,
87- timeout,
8873 }
8974 }
9075
9176 fn build_request_body (
9277 & self ,
93- execution_request : & HttpExecutionRequest < ' _ , ' _ > ,
78+ execution_request : & HttpExecutionRequest < ' _ > ,
9479 ) -> Result < Vec < u8 > , SubgraphExecutorError > {
9580 let mut body = Vec :: with_capacity ( 4096 ) ;
9681 body. put ( FIRST_QUOTE_STR ) ;
@@ -151,7 +136,6 @@ impl HTTPSubgraphExecutor {
151136 & self ,
152137 body : Vec < u8 > ,
153138 headers : HeaderMap ,
154- client_request : & ClientRequestDetails < ' _ , ' _ > ,
155139 ) -> Result < SharedResponse , SubgraphExecutorError > {
156140 let mut req = hyper:: Request :: builder ( )
157141 . method ( http:: Method :: POST )
@@ -166,75 +150,9 @@ impl HTTPSubgraphExecutor {
166150
167151 debug ! ( "making http request to {}" , self . endpoint. to_string( ) ) ;
168152
169- let timeout = match & self . timeout {
170- DurationOrProgram :: Duration ( dur) => * dur,
171- DurationOrProgram :: Program ( program) => {
172- let value =
173- VrlValue :: Object ( BTreeMap :: from ( [ ( "request" . into ( ) , client_request. into ( ) ) ] ) ) ;
174- let result = execute_expression_with_value ( program, value) . map_err ( |err| {
175- SubgraphExecutorError :: TimeoutExpressionResolution (
176- self . subgraph_name . to_string ( ) ,
177- err. to_string ( ) ,
178- )
179- } ) ?;
180- match result {
181- VrlValue :: Integer ( i) => {
182- if i < 0 {
183- return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
184- self . subgraph_name . to_string ( ) ,
185- "Timeout expression resolved to a negative integer" . to_string ( ) ,
186- ) ) ;
187- }
188- std:: time:: Duration :: from_millis ( i as u64 )
189- }
190- VrlValue :: Float ( f) => {
191- let f = f. into_inner ( ) ;
192- if f < 0.0 {
193- return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
194- self . subgraph_name . to_string ( ) ,
195- "Timeout expression resolved to a negative float" . to_string ( ) ,
196- ) ) ;
197- }
198- std:: time:: Duration :: from_millis ( f as u64 )
199- }
200- VrlValue :: Bytes ( b) => {
201- let s = std:: str:: from_utf8 ( & b) . map_err ( |e| {
202- SubgraphExecutorError :: TimeoutExpressionResolution (
203- self . subgraph_name . to_string ( ) ,
204- format ! ( "Failed to parse duration string from bytes: {}" , e) ,
205- )
206- } ) ?;
207- humantime:: parse_duration ( s) . map_err ( |e| {
208- SubgraphExecutorError :: TimeoutExpressionResolution (
209- self . subgraph_name . to_string ( ) ,
210- format ! ( "Failed to parse duration string '{}': {}" , s, e) ,
211- )
212- } ) ?
213- }
214- other => {
215- return Err ( SubgraphExecutorError :: TimeoutExpressionResolution (
216- self . subgraph_name . to_string ( ) ,
217- format ! (
218- "Timeout expression resolved to an unexpected type: {}. Expected a non-negative integer/float (ms) or a duration string." ,
219- other. kind( )
220- ) ,
221- ) ) ;
222- }
223- }
224- }
225- } ;
226-
227- let res = tokio:: time:: timeout ( timeout, self . http_client . request ( req) )
228- . await
229- . map_err ( |_| {
230- SubgraphExecutorError :: RequestTimeout (
231- self . endpoint . to_string ( ) ,
232- timeout. as_millis ( ) as u64 ,
233- )
234- } ) ?
235- . map_err ( |e| {
236- SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
237- } ) ?;
153+ let res = self . http_client . request ( req) . await . map_err ( |e| {
154+ SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
155+ } ) ?;
238156
239157 debug ! (
240158 "http request to {} completed, status: {}" ,
@@ -291,9 +209,9 @@ impl HTTPSubgraphExecutor {
291209#[ async_trait]
292210impl SubgraphExecutor for HTTPSubgraphExecutor {
293211 #[ tracing:: instrument( skip_all, fields( subgraph_name = %self . subgraph_name) ) ]
294- async fn execute < ' exec , ' req > (
212+ async fn execute < ' a > (
295213 & self ,
296- execution_request : HttpExecutionRequest < ' exec , ' req > ,
214+ execution_request : HttpExecutionRequest < ' a > ,
297215 ) -> HttpExecutionResponse {
298216 let body = match self . build_request_body ( & execution_request) {
299217 Ok ( body) => body,
@@ -315,10 +233,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
315233 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
316234 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
317235 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
318- return match self
319- . _send_request ( body, headers, execution_request. client_request )
320- . await
321- {
236+ return match self . _send_request ( body, headers) . await {
322237 Ok ( shared_response) => HttpExecutionResponse {
323238 body : shared_response. body ,
324239 headers : shared_response. headers ,
@@ -350,8 +265,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
350265 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
351266 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
352267 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
353- self . _send_request ( body, headers, execution_request. client_request )
354- . await
268+ self . _send_request ( body, headers) . await
355269 } ;
356270 // It's important to remove the entry from the map before returning the result.
357271 // This ensures that once the OnceCell is set, no future requests can join it.
0 commit comments