Skip to content

Commit

Permalink
Refactor of GraphQL service and FuelService to use ServiceRunner (
Browse files Browse the repository at this point in the history
#875)

The final change of #860 epic.
Ref #809

Reworked `RunnableService` to be `RunnableService` and `RunnableTask`.
`RunnableService::initialize` replaced with the
`RunnableService::into_task` method. `into_task` returns a runnable task
that implements the `RunnableTask` trait. `into_task` may return another
type after initialization.

Updated all services to implement new traits. Implemented GraphQL
service via `ServiceRunner`. Extracted the graph QL logic into a
separate module(preparation to move this service into its own crate).

Re-used`ServiceRunner` for `FuelService`. Replaced `Modules` with
`SharedState` and `SubServices`.

Added a new `Starting` state of the service lifecycle. Added functions
to allow to await `Started` or `Stop` state.
  • Loading branch information
xgreenx authored Jan 5, 2023
1 parent 81d9ef8 commit a551442
Show file tree
Hide file tree
Showing 37 changed files with 1,092 additions and 797 deletions.
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use fuel_core::{
config::default_consensus_dev_key,
Config,
DbType,
ServiceTrait,
VMConfig,
},
txpool::Config as TxPoolConfig,
Expand Down Expand Up @@ -208,7 +209,7 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
// initialize the server
let server = FuelService::new_node(config).await?;
// pause the main task while service is running
server.run().await;
server.await_stop().await?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ fuel-core-types = { path = "../types", version = "0.15.1", features = [
] }
futures = "0.3"
hex = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14" }
itertools = "0.10"
num_cpus = "1.13"
primitive-types = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ use tracing::{
/// the transactions contained in the block and persist changes to the underlying database as needed.
/// In production mode, block fields like transaction commitments are set based on the executed txs.
/// In validation mode, the processed block commitments are compared with the proposed block.
#[derive(Clone, Debug)]
pub struct Executor {
pub database: Database,
pub config: Config,
Expand Down
21 changes: 21 additions & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use fuel_core_types::{
blockchain::primitives::SecretKeyWrapper,
fuel_tx::ConsensusParameters,
secrecy::Secret,
};
use std::net::SocketAddr;

pub mod service;

#[derive(Clone, Debug)]
pub struct Config {
pub addr: SocketAddr,
pub utxo_validation: bool,
pub manual_blocks_enabled: bool,
pub vm_backtrace: bool,
pub min_gas_price: u64,
pub max_tx: usize,
pub max_depth: usize,
pub transaction_parameters: ConsensusParameters,
pub consensus_key: Option<Secret<SecretKeyWrapper>>,
}
223 changes: 223 additions & 0 deletions crates/fuel-core/src/graphql_api/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use crate::{
graphql_api::Config,
schema::{
build_schema,
dap,
CoreSchema,
},
service::metrics::metrics,
};
use async_graphql::{
extensions::Tracing,
http::{
playground_source,
GraphQLPlaygroundConfig,
},
Request,
Response,
};
use axum::{
extract::{
DefaultBodyLimit,
Extension,
},
http::{
header::{
ACCESS_CONTROL_ALLOW_HEADERS,
ACCESS_CONTROL_ALLOW_METHODS,
ACCESS_CONTROL_ALLOW_ORIGIN,
},
HeaderValue,
},
response::{
sse::Event,
Html,
IntoResponse,
Sse,
},
routing::{
get,
post,
},
Json,
Router,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
};
use futures::Stream;
use serde_json::json;
use std::{
future::Future,
net::{
SocketAddr,
TcpListener,
},
pin::Pin,
sync::Arc,
};
use tokio_stream::StreamExt;
use tower_http::{
set_header::SetResponseHeaderLayer,
trace::TraceLayer,
};

pub type Service = fuel_core_services::ServiceRunner<NotInitializedTask>;

// TODO: When the port of DB will exist we need to replace it with `Box<dyn DatabasePort>
pub type Database = crate::database::Database;
// TODO: When the port for `Executor` will exist we need to replace it with `Box<dyn ExecutorPort>
pub type Executor = crate::service::adapters::ExecutorAdapter;
// TODO: When the port of BlockProducer will exist we need to replace it with
// `Box<dyn BlockProducerPort>
pub type BlockProducer = Arc<fuel_core_producer::Producer<crate::database::Database>>;
// TODO: When the port of TxPool will exist we need to replace it with
// `Box<dyn TxPoolPort>. In the future GraphQL should not be aware of `TxPool`. It should
// use only `Database` to receive all information about
pub type TxPool = crate::service::sub_services::TxPoolService;

#[derive(Clone)]
pub struct SharedState {
pub bound_address: SocketAddr,
}

pub struct NotInitializedTask {
router: Router,
listener: TcpListener,
bound_address: SocketAddr,
}

pub struct Task {
// Ugly workaround because of https://github.com/hyperium/hyper/issues/2582
server: Pin<Box<dyn Future<Output = hyper::Result<()>> + Send + 'static>>,
}

#[async_trait::async_trait]
impl RunnableService for NotInitializedTask {
const NAME: &'static str = "GraphQL";

type SharedData = SharedState;
type Task = Task;

fn shared_data(&self) -> Self::SharedData {
SharedState {
bound_address: self.bound_address,
}
}

async fn into_task(self, state: &StateWatcher) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let server = axum::Server::from_tcp(self.listener)
.unwrap()
.serve(self.router.into_make_service())
.with_graceful_shutdown(async move {
loop {
state.changed().await.expect("The service is destroyed");

if !state.borrow().started() {
return
}
}
});

Ok(Task {
server: Box::pin(server),
})
}
}

#[async_trait::async_trait]
impl RunnableTask for Task {
async fn run(&mut self) -> anyhow::Result<bool> {
self.server.as_mut().await?;
// The `axum::Server` has its internal loop. If `await` is finished, we get an internal
// error or stop signal.
Ok(false /* should_continue */)
}
}

pub fn new_service(
config: Config,
database: Database,
producer: BlockProducer,
txpool: TxPool,
executor: Executor,
) -> anyhow::Result<Service> {
let network_addr = config.addr;
let params = config.transaction_parameters;
let schema = build_schema()
.data(config)
.data(database)
.data(producer)
.data(txpool)
.data(executor);
let schema = dap::init(schema, params).extension(Tracing).finish();

let router = Router::new()
.route("/playground", get(graphql_playground))
.route("/graphql", post(graphql_handler).options(ok))
.route(
"/graphql-sub",
post(graphql_subscription_handler).options(ok),
)
.route("/metrics", get(metrics))
.route("/health", get(health))
.layer(Extension(schema))
.layer(TraceLayer::new_for_http())
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_ORIGIN,
HeaderValue::from_static("*"),
))
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_METHODS,
HeaderValue::from_static("*"),
))
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_HEADERS,
HeaderValue::from_static("*"),
))
.layer(DefaultBodyLimit::disable());

let listener = TcpListener::bind(network_addr)?;
let bound_address = listener.local_addr()?;

tracing::info!("Binding GraphQL provider to {}", bound_address);

Ok(Service::new(NotInitializedTask {
router,
listener,
bound_address,
}))
}

async fn graphql_playground() -> impl IntoResponse {
Html(playground_source(GraphQLPlaygroundConfig::new("/graphql")))
}

async fn health() -> Json<serde_json::Value> {
Json(json!({ "up": true }))
}

async fn graphql_handler(
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Json<Response> {
schema.execute(req.0).await.into()
}

async fn graphql_subscription_handler(
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Sse<impl Stream<Item = anyhow::Result<Event, serde_json::Error>>> {
let stream = schema
.execute_stream(req.0)
.map(|r| Ok(Event::default().json_data(r).unwrap()));
Sse::new(stream)
.keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text"))
}

async fn ok() -> anyhow::Result<(), ()> {
Ok(())
}
7 changes: 7 additions & 0 deletions crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ pub mod resource_query;
pub mod schema;
pub mod service;
pub mod state;

// In the future this module will be a separate crate for `fuel-core-graphql-api`.
mod graphql_api;

pub mod fuel_core_graphql_api {
pub use crate::graphql_api::*;
}
12 changes: 5 additions & 7 deletions crates/fuel-core/src/schema/balance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::{
database::{
resource::{
AssetQuery,
AssetSpendTarget,
AssetsQuery,
},
Database,
database::resource::{
AssetQuery,
AssetSpendTarget,
AssetsQuery,
},
fuel_core_graphql_api::service::Database,
schema::scalars::{
Address,
AssetId,
Expand Down
Loading

0 comments on commit a551442

Please sign in to comment.