Skip to content

Commit

Permalink
resolve #5200
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Mar 11, 2024
1 parent 3c03b3f commit 0c1b53a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
6 changes: 6 additions & 0 deletions graph/src/env/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ pub struct EnvVarsGraphQl {
/// header `X-GraphTraceQuery` set to this value will include a trace of
/// the SQL queries that were run.
pub query_trace_token: String,
/// Set by the env var `GRAPH_PARALLEL_BLOCK_CONSTRAINTS`
/// Whether to run top-level queries with different block constraints in parallel
pub parallel_block_constraints: bool,
}

// This does not print any values avoid accidentally leaking any sensitive env vars
Expand Down Expand Up @@ -138,6 +141,7 @@ impl From<InnerGraphQl> for EnvVarsGraphQl {
disable_bool_filters: x.disable_bool_filters.0,
disable_child_sorting: x.disable_child_sorting.0,
query_trace_token: x.query_trace_token,
parallel_block_constraints: x.parallel_block_constraints.0,
}
}
}
Expand Down Expand Up @@ -187,4 +191,6 @@ pub struct InnerGraphQl {
pub disable_child_sorting: EnvVarBoolean,
#[envconfig(from = "GRAPH_GRAPHQL_TRACE_TOKEN", default = "")]
query_trace_token: String,
#[envconfig(from = "GRAPH_PARALLEL_BLOCK_CONSTRAINTS", default = "false")]
pub parallel_block_constraints: EnvVarBoolean,
}
23 changes: 18 additions & 5 deletions graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::metrics::GraphQLMetrics;
use crate::prelude::{QueryExecutionOptions, StoreResolver, SubscriptionExecutionOptions};
use crate::query::execute_query;
use crate::subscription::execute_prepared_subscription;
use graph::prelude::MetricsRegistry;
use graph::prelude::{futures03::future, MetricsRegistry};
use graph::{
components::store::SubscriptionManager,
prelude::{
Expand Down Expand Up @@ -145,10 +145,11 @@ where
let by_block_constraint = query.block_constraint()?;
let mut max_block = 0;
let mut result: QueryResults = QueryResults::empty();
let mut query_res_futures: Vec<_> = vec![];

// Note: This will always iterate at least once.
let query_start = Instant::now();
for (bc, (selection_set, error_policy)) in by_block_constraint {
let query_start = Instant::now();
let resolver = StoreResolver::at_block(
&self.logger,
store.cheap_clone(),
Expand All @@ -162,7 +163,7 @@ where
)
.await?;
max_block = max_block.max(resolver.block_number());
let query_res = execute_query(
query_res_futures.push(execute_query(
query.clone(),
Some(selection_set),
resolver.block_ptr.as_ref().map(Into::into).clone(),
Expand All @@ -173,8 +174,20 @@ where
max_skip: max_skip.unwrap_or(ENV_VARS.graphql.max_skip),
trace,
},
)
.await;
));
}

let results: Vec<_> = if ENV_VARS.graphql.parallel_block_constraints {
future::join_all(query_res_futures).await
} else {
let mut results = vec![];
for query_res_future in query_res_futures {
results.push(query_res_future.await);
}
results
};

for query_res in results {
query_res.trace.finish(query_start.elapsed());
result.append(query_res);
}
Expand Down

0 comments on commit 0c1b53a

Please sign in to comment.