Skip to content

Commit

Permalink
grpc support + add docs (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
thaodt committed Nov 6, 2023
1 parent 2754490 commit bbe1c52
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 3 deletions.
304 changes: 302 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ log = "0.4.20"
env_logger = "0.10.0"
stripmargin = "0.1.1"
ring = "0.17.5"
tonic = { version = "0.10.2", features = ["tls"] }
tonic-reflection = "0.10.2"
hyper-tls = "0.5.0"
tokio-stream = "0.1.14"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
3 changes: 3 additions & 0 deletions src/blueprint/blueprint.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/*!
* define the structure of a blueprint, which could include schema definitions.
*/
use std::collections::{BTreeSet, HashMap};

use async_graphql::dynamic::{Schema, SchemaBuilder};
Expand Down
12 changes: 11 additions & 1 deletion src/blueprint/from_config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*!
* Parse configurations into an internal representation.
* transform the application's configuration into a blueprint for the GraphQL schema.
*/
#![allow(clippy::too_many_arguments)]

use std::collections::{BTreeMap, BTreeSet, HashMap};
Expand Down Expand Up @@ -56,6 +60,8 @@ pub fn config_blueprint<'a>() -> TryFold<'a, Config, Blueprint, String> {
.update(super::compress::compress)
}

/// parses and validates the upstream configuration,
/// such as the base URL for HTTP connections.
fn to_upstream<'a>() -> TryFold<'a, Config, Upstream, String> {
TryFoldConfig::<Upstream>::new(|config, up| {
let upstream = up.merge_right(config.upstream.clone());
Expand All @@ -82,6 +88,8 @@ pub fn apply_batching(mut blueprint: Blueprint) -> Blueprint {
blueprint
}

/// converts GraphQL directives from the configuration
/// into a valid Directive structure that can be used by the application.
fn to_directive(const_directive: ConstDirective) -> Valid<Directive, String> {
const_directive
.arguments
Expand All @@ -99,6 +107,8 @@ fn to_directive(const_directive: ConstDirective) -> Valid<Directive, String> {
.into()
}

/// transforms the GraphQL schema configuration into a SchemaDefinition,
/// validating the presence of the query root and handling server directives.
fn to_schema<'a>() -> TryFoldConfig<'a, SchemaDefinition> {
TryFoldConfig::new(|config, _| {
validate_query(config)
Expand All @@ -111,7 +121,7 @@ fn to_schema<'a>() -> TryFoldConfig<'a, SchemaDefinition> {
.map(|(query_type_name, directive)| SchemaDefinition {
query: query_type_name.to_owned(),
mutation: config.graphql.schema.mutation.clone(),
directives: vec![directive],
directives: vec![directive], //TODO: should check support grpc directive!
})
})
}
Expand Down
5 changes: 5 additions & 0 deletions src/blueprint/into_schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*!
*
* This module is responsible for converting configuration into a GraphQL schema.
*
*/
use std::borrow::Cow;
use std::sync::Arc;

Expand Down
3 changes: 3 additions & 0 deletions src/blueprint/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/*!
* relate to server configuration or server-specific aspects of the blueprint.
*/
use std::collections::BTreeMap;
use std::net::{AddrParseError, IpAddr};

Expand Down
1 change: 1 addition & 0 deletions src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod reflection_client;
90 changes: 90 additions & 0 deletions src/grpc/reflection_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use hyper::http::uri::InvalidUri;
use hyper_tls::HttpsConnector;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
use tokio_stream::StreamExt;
use tonic_reflection::pb::{
server_reflection_client::ServerReflectionClient, server_reflection_request, server_reflection_response,
ListServiceResponse, ServerReflectionRequest,
};

/// Fetch service name and method name from a given grpc server dynamically
pub async fn fetch_service_schema(
address: &str,
service_name: &str,
/*tls_config: Option<(String, Identity)>*/
) -> Result<Vec<String>, tonic::Status> {
let https = HttpsConnector::new();
let channel = match Channel::builder(
address
.parse()
.map_err(|e: InvalidUri| tonic::Status::internal(e.to_string()))?,

Check warning on line 20 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L11-L20

Added lines #L11 - L20 were not covered by tests
)
.connect_with_connector(https)
.await

Check warning on line 23 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L22-L23

Added lines #L22 - L23 were not covered by tests
{
Ok(it) => it,
Err(err) => return Err(tonic::Status::internal(format!("Transport error: {}", err))),

Check warning on line 26 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L25-L26

Added lines #L25 - L26 were not covered by tests
};
// let channel = setup_channel(address, tls_config);

let mut client = ServerReflectionClient::new(channel);

let request = ServerReflectionRequest {
host: "".into(),
message_request: Some(server_reflection_request::MessageRequest::ListServices("".into())),
..Default::default()
};

Check warning on line 36 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L30-L36

Added lines #L30 - L36 were not covered by tests

let response = client
.server_reflection_info(tonic::Request::new(tokio_stream::once(request)))
.await?;

Check warning on line 40 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L38-L40

Added lines #L38 - L40 were not covered by tests

let mut response_stream = response.into_inner();

let mut services = Vec::new();

Check warning on line 44 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L42-L44

Added lines #L42 - L44 were not covered by tests

while let Some(result) = response_stream.next().await {
match result {
Ok(resp) => {
if let Some(msg_resp) = resp.message_response {
match msg_resp {

Check warning on line 50 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L46-L50

Added lines #L46 - L50 were not covered by tests
server_reflection_response::MessageResponse::ListServicesResponse(ListServiceResponse {
service: inner_services,

Check warning on line 52 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L52

Added line #L52 was not covered by tests
}) => {
for service in inner_services {
services.push(service.name);
}

Check warning on line 56 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L54-L56

Added lines #L54 - L56 were not covered by tests
}
_ => {}

Check warning on line 58 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L58

Added line #L58 was not covered by tests
}
}

Check warning on line 60 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L60

Added line #L60 was not covered by tests
}
Err(e) => {
eprintln!("Error fetching service schema: {}", e);
}

Check warning on line 64 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L62-L64

Added lines #L62 - L64 were not covered by tests
}
}

// Filter the services based on the provided service_name
services = services.into_iter().filter(|name| name == service_name).collect();

Ok(services)
}

Check warning on line 72 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L69-L72

Added lines #L69 - L72 were not covered by tests

//TODO: move this to utils, may change function signature! - util fns
pub async fn setup_channel(address: &str, tls_config: Option<(String, Identity)>) -> Result<Channel, tonic::Status> {
let mut endpoint = Endpoint::from_shared(address.to_string()).map_err(|e| tonic::Status::internal(e.to_string()))?;

Check warning on line 76 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L75-L76

Added lines #L75 - L76 were not covered by tests

if let Some((cert, identity)) = tls_config {
let cert = Certificate::from_pem(cert);
endpoint = endpoint.tls_config(ClientTlsConfig::new().identity(identity).ca_certificate(cert))
.map_err(|e| tonic::Status::internal(e.to_string()))?;
}

Check warning on line 82 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L78-L82

Added lines #L78 - L82 were not covered by tests

let channel = endpoint
.connect()
.await
.map_err(|err| tonic::Status::internal(format!("Transport error: {}", err)))?;

Check warning on line 87 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L84-L87

Added lines #L84 - L87 were not covered by tests

Ok(channel)
}

Check warning on line 90 in src/grpc/reflection_client.rs

View check run for this annotation

Codecov / codecov/patch

src/grpc/reflection_client.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod document;
pub mod endpoint;
pub mod has_headers;
pub mod http;
pub mod grpc;

#[cfg(feature = "unsafe-js")]
pub mod javascript;
pub mod json;
Expand Down

0 comments on commit bbe1c52

Please sign in to comment.