Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace old actix-web with axum #302

Merged
merged 3 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,678 changes: 926 additions & 752 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 12 additions & 6 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ name = "graph-gateway"
version = "12.0.1"

[dependencies]
actix-cors = "=0.6.0-beta.4"
actix-http = { version = "=3.0.0-beta.14", default-features = false }
actix-web = { version = "=4.0.0-beta.13", default-features = false }
actix-router = "=0.5.0-beta.4"
axum = { version = "0.6.15", default-features = false, features = [
"json",
"tokio",
] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
futures-util = "0.3"
Expand All @@ -23,7 +23,7 @@ parking_lot = "0.12.0"
prelude = { path = "../prelude" }
primitive-types = "0.12"
prometheus = { version = "0.13", default-features = false }
rdkafka = { version = "0.29", features = [ "gssapi-vendored", "ssl-vendored" ] }
rdkafka = { version = "0.29", features = ["gssapi-vendored", "ssl-vendored"] }
secp256k1 = { version = "0.24", default-features = false }
semver = "1.0"
serde = "1.0"
Expand All @@ -36,7 +36,13 @@ trust-dns-resolver = { version = "0.22", features = ["dns-over-rustls"] }
uuid = { version = "1.0", default-features = false, features = ["v4"] }
prost = "0.11.8"
thiserror = "1.0.40"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json", "parking_lot"] }
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["cors"] }
tracing-subscriber = { version = "0.3.16", features = [
"env-filter",
"json",
"parking_lot",
] }

[dev-dependencies]
regex = "1.5"
32 changes: 16 additions & 16 deletions graph-gateway/src/block_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ pub fn block_constraints<'c>(context: &'c Context<'c>) -> Option<BTreeSet<BlockC
let defaults = query
.variable_definitions
.iter()
.filter(|d| !vars.0.contains_key(d.name))
.filter_map(|d| Some((d.name, d.default_value.as_ref()?.to_graphql())))
.collect::<BTreeMap<&str, StaticValue>>();
.filter(|d| !vars.0.contains_key(&d.name))
.filter_map(|d| Some((d.name.clone(), d.default_value.as_ref()?.to_graphql())))
.collect::<BTreeMap<String, StaticValue>>();
(&query.selection_set, defaults)
}
OperationDefinition::Query(_)
Expand Down Expand Up @@ -88,9 +88,9 @@ pub fn make_query_deterministic(
let defaults = query
.variable_definitions
.iter()
.filter(|d| !vars.0.contains_key(d.name))
.filter_map(|d| Some((d.name, d.default_value.as_ref()?.to_graphql())))
.collect::<BTreeMap<&str, StaticValue>>();
.filter(|d| !vars.0.contains_key(&d.name))
.filter_map(|d| Some((d.name.clone(), d.default_value.as_ref()?.to_graphql())))
.collect::<BTreeMap<String, StaticValue>>();
(&mut query.selection_set, defaults)
}
OperationDefinition::Query(_)
Expand Down Expand Up @@ -123,7 +123,7 @@ pub fn make_query_deterministic(
None => {
selection_field
.arguments
.push(("block", deterministic_block(&latest.hash)));
.push(("block".to_string(), deterministic_block(&latest.hash)));
}
};
}
Expand All @@ -146,21 +146,21 @@ pub fn make_query_deterministic(
.ok()
}

fn deterministic_block<'c>(hash: &Bytes32) -> Value<'c, &'c str> {
fn deterministic_block<'c>(hash: &Bytes32) -> Value<'c, String> {
Value::Object(BTreeMap::from_iter([(
"hash",
"hash".to_string(),
Value::String(hash.to_string()),
)]))
}

fn field_constraint<'c>(
fn field_constraint(
vars: &QueryVariables,
defaults: &BTreeMap<&str, StaticValue>,
field: &Value<'c, &'c str>,
defaults: &BTreeMap<String, StaticValue>,
field: &Value<'_, String>,
) -> Option<BlockConstraint> {
match field {
Value::Object(fields) => parse_constraint(vars, defaults, fields),
Value::Variable(name) => match vars.get(*name)? {
Value::Variable(name) => match vars.get(name)? {
Value::Object(fields) => parse_constraint(vars, defaults, fields),
_ => None,
},
Expand All @@ -170,7 +170,7 @@ fn field_constraint<'c>(

fn parse_constraint<'c, T: Text<'c>>(
vars: &QueryVariables,
defaults: &BTreeMap<&str, StaticValue>,
defaults: &BTreeMap<String, StaticValue>,
fields: &BTreeMap<T::Value, Value<'c, T>>,
) -> Option<BlockConstraint> {
let field = fields.iter().at_most_one().ok()?;
Expand All @@ -190,7 +190,7 @@ fn parse_constraint<'c, T: Text<'c>>(
fn parse_hash<'c, T: Text<'c>>(
hash: &Value<'c, T>,
variables: &QueryVariables,
defaults: &BTreeMap<&str, StaticValue>,
defaults: &BTreeMap<String, StaticValue>,
) -> Option<Bytes32> {
match hash {
Value::String(hash) => hash.parse().ok(),
Expand All @@ -208,7 +208,7 @@ fn parse_hash<'c, T: Text<'c>>(
fn parse_number<'c, T: Text<'c>>(
number: &Value<'c, T>,
variables: &QueryVariables,
defaults: &BTreeMap<&str, StaticValue>,
defaults: &BTreeMap<String, StaticValue>,
) -> Option<u64> {
let n = match number {
Value::Int(n) => n,
Expand Down
1 change: 0 additions & 1 deletion graph-gateway/src/chains/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod ethereum;
pub mod test;

use crate::{block_constraints::*, metrics::*};
use actix_web::Result;
use indexer_selection::UnresolvedBlock;
use prelude::{epoch_cache::EpochCache, tokio::time::interval, *};
use std::collections::{BTreeSet, HashMap};
Expand Down
84 changes: 45 additions & 39 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use crate::{
MISCATEGORIZED_ATTESTABLE_ERROR_MESSAGE_FRAGMENTS, UNATTESTABLE_ERROR_MESSAGE_FRAGMENTS,
},
};
use actix_http::{
header::{AUTHORIZATION, ORIGIN},
StatusCode,
use axum::{
body::Bytes,
extract::{Path, State},
http::{header, HeaderMap, Response, StatusCode},
Json,
};
use actix_web::{http::header, web, HttpRequest, HttpResponse, HttpResponseBuilder};
use futures::future::join_all;
use indexer_selection::{
actor::Update, BlockRequirements, Context as AgoraContext,
Expand All @@ -30,13 +31,11 @@ use prelude::{
anyhow::{anyhow, bail, Context as _},
buffer_queue::QueueWriter,
double_buffer::DoubleBufferReader,
graphql::{
graphql_parser::query::{OperationDefinition, SelectionSet},
http::Response,
},
graphql::graphql_parser::query::{OperationDefinition, SelectionSet},
url::Url,
DeploymentId, Eventual, *,
};
use prost::bytes::Buf;
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{
Expand Down Expand Up @@ -108,18 +107,20 @@ pub struct QueryBody {
}

pub async fn handle_query(
request: HttpRequest,
payload: web::Json<QueryBody>,
ctx: web::Data<Context>,
) -> HttpResponse {
State(ctx): State<Context>,
Path(params): Path<BTreeMap<String, String>>,
headers: HeaderMap,
payload: Bytes,
) -> Response<String> {
let start_time_ms = unix_timestamp();
let headers = request.headers();
let ray_id = headers.get("cf-ray").and_then(|value| value.to_str().ok());
let query_id = ray_id.map(ToString::to_string).unwrap_or_else(query_id);

let auth = match (
request.match_info().get("api_key"),
headers.get(AUTHORIZATION).and_then(|h| h.to_str().ok()),
params.get("api_key"),
headers
.get(header::AUTHORIZATION)
.and_then(|h| h.to_str().ok()),
) {
(Some(param), _) => param,
(None, Some(header)) => header.trim_start_matches("Bearer").trim(),
Expand All @@ -131,12 +132,8 @@ pub async fn handle_query(
.parse_token(auth)
.context("Invalid API key");

let subgraph_resolution_result = resolve_subgraph_deployment(
&ctx.subgraph_deployments,
&ctx.subgraph_info,
request.match_info(),
)
.await;
let subgraph_resolution_result =
resolve_subgraph_deployment(&ctx.subgraph_deployments, &ctx.subgraph_info, &params).await;
let deployment = subgraph_resolution_result
.as_ref()
.map(|subgraph_info| subgraph_info.deployment.to_string())
Expand All @@ -150,14 +147,14 @@ pub async fn handle_query(
);

let domain = headers
.get(ORIGIN)
.get(header::ORIGIN)
.and_then(|v| v.to_str().ok())
.and_then(|v| Some(v.parse::<Url>().ok()?.host_str()?.to_string()))
.unwrap_or("".to_string());

let result = match (auth, subgraph_resolution_result) {
(Ok(auth), Ok(subgraph_info)) => {
handle_client_query_inner(&ctx, subgraph_info, payload.0, auth, domain)
handle_client_query_inner(&ctx, subgraph_info, payload, auth, domain)
.instrument(span.clone())
.await
}
Expand All @@ -181,26 +178,31 @@ pub async fn handle_query(
);
});

let mut response_builder = HttpResponseBuilder::new(StatusCode::OK);
response_builder.insert_header(header::ContentType::json());
let response = Response::builder().header(header::CONTENT_TYPE, "application/json");
match result {
Ok(ResponsePayload { body, attestation }) => {
let attestation = attestation
.as_ref()
.and_then(|attestation| serde_json::to_string(attestation).ok())
.unwrap_or_default();
response_builder
.insert_header(("Graph-Attestation", attestation))
.body(body.as_ref())
response
.header("Graph-Attestation", attestation)
.body(body.to_string())
.unwrap()
}
Err(err) => {
let (_, Json(body)) = graphql_error_response(err.to_string());
response
.body(serde_json::to_string(&body).unwrap())
.unwrap()
}
Err(err) => graphql_error_response(err.to_string()),
}
}

async fn resolve_subgraph_deployment(
deployments: &SubgraphDeployments,
subgraph_info: &SubgraphInfoMap,
params: &actix_web::dev::Path<actix_web::dev::Url>,
params: &BTreeMap<String, String>,
) -> Result<Ptr<SubgraphInfo>, Error> {
let deployment = if let Some(id) = params.get("subgraph_id") {
let subgraph = id
Expand Down Expand Up @@ -228,7 +230,7 @@ async fn resolve_subgraph_deployment(
async fn handle_client_query_inner(
ctx: &Context,
subgraph_info: Ptr<SubgraphInfo>,
payload: QueryBody,
payload: Bytes,
auth: AuthToken,
domain: String,
) -> Result<ResponsePayload, Error> {
Expand All @@ -252,6 +254,9 @@ async fn handle_client_query_inner(
),
};

let payload: QueryBody =
serde_json::from_reader(payload.reader()).map_err(|err| Error::InvalidQuery(err.into()))?;

ctx.auth_handler
.check_token(&auth, &subgraph_info, &domain)
.await
Expand Down Expand Up @@ -613,13 +618,14 @@ async fn handle_indexer_query_inner(
tracing::warn!(indexer_response_status = %response.status);
}

let indexer_errors = serde_json::from_str::<Response<Box<RawValue>>>(&response.payload.body)
.map_err(|_| IndexerError::UnexpectedPayload)?
.errors
.unwrap_or_default()
.into_iter()
.map(|err| err.message)
.collect::<Vec<String>>();
let indexer_errors =
serde_json::from_str::<graphql::http::Response<Box<RawValue>>>(&response.payload.body)
.map_err(|_| IndexerError::UnexpectedPayload)?
.errors
.unwrap_or_default()
.into_iter()
.map(|err| err.message)
.collect::<Vec<String>>();

tracing::info!(
target: reports::INDEXER_QUERY_TARGET,
Expand Down Expand Up @@ -732,6 +738,6 @@ fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result<usize> {
OperationDefinition::Mutation(_) => bail!("Mutations not yet supported"),
OperationDefinition::Subscription(_) => bail!("Subscriptions not yet supported"),
})
.collect::<anyhow::Result<Vec<&SelectionSet<&str>>>>()?;
.collect::<anyhow::Result<Vec<&SelectionSet<String>>>>()?;
Ok(selection_sets.into_iter().map(|set| set.items.len()).sum())
}
12 changes: 4 additions & 8 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub struct Config {
/// IPFS endpoint with access to the subgraph files
#[serde_as(as = "DisplayFromStr")]
pub ipfs: Url,
/// IP rate limit per window
pub ip_rate_limit: u16,
/// Duration of IP rate limiting window in seconds
pub ip_rate_limit_window_secs: u8,
/// See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
#[serde(default)]
pub kafka: KafkaConfig,
Expand All @@ -44,14 +48,6 @@ pub struct Config {
pub port_metrics: u16,
pub query_budget_discount: f64,
pub query_budget_scale: f64,
/// Duration of API rate limiting window in seconds
pub rate_limit_api_window_secs: u8,
/// API rate limit per window
pub rate_limit_api_limit: u16,
/// Duration of IP rate limiting window in seconds
pub rate_limit_ip_window_secs: u8,
/// IP rate limit per window
pub rate_limit_ip_limit: u16,
pub restricted_deployments: Option<PathBuf>,
/// Mnemonic for voucher signing
#[serde_as(as = "DisplayFromStr")]
Expand Down
Loading