Skip to content

Commit

Permalink
make into handle
Browse files Browse the repository at this point in the history
  • Loading branch information
freesig committed Nov 9, 2022
1 parent 2facbc6 commit 9832e8e
Showing 1 changed file with 61 additions and 4 deletions.
65 changes: 61 additions & 4 deletions fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ pub mod metrics;
pub mod modules;

pub struct FuelService {
handle: JoinHandle<()>,
/// Shutdown the fuel service.
shutdown: oneshot::Sender<()>,
/// The address bound by the system for serving the API
pub bound_address: SocketAddr,
}

struct FuelServiceInner {
tasks: Vec<JoinHandle<Result<(), AnyError>>>,
/// handler for all modules.
modules: Modules,
Expand All @@ -47,7 +55,33 @@ impl FuelService {
_ => Database::in_memory(),
};
// initialize service
Self::init_service(database, config).await
Ok(Self::spawn_service(
Self::init_service(database, config).await?,
))
}

fn spawn_service(service: FuelServiceInner) -> Self {
let bound_address = service.bound_address.clone();
let (shutdown, stop_rx) = oneshot::channel();
let handle = tokio::spawn(async move {
let run_fut = service.run();
let shutdown_fut = stop_rx.then(|stop| async move {
if stop.is_error() {
// If the handle is dropped we don't want
// this to ever shutdown the service.
futures::future::pending().await;
}
// Only a successful recv results in a shutdown.
});
tokio::pin!(run_fut);
tokio::pin!(shutdown_fut);
futures::future::select(shutdown_fut, run_fut).await;
});
Self {
handle,
shutdown,
bound_address,
}
}

// TODO: Rework our configs system to avoid nesting of the same configs.
Expand All @@ -71,11 +105,16 @@ impl FuelService {
database: Database,
config: Config,
) -> Result<Self, AnyError> {
Self::init_service(database, config).await
Ok(Self::spawn_service(
Self::init_service(database, config).await?,
))
}

/// Private inner method for initializing the fuel service
async fn init_service(database: Database, config: Config) -> Result<Self, AnyError> {
async fn init_service(
database: Database,
config: Config,
) -> Result<FuelServiceInner, AnyError> {
// initialize state
Self::initialize_state(&config, &database)?;

Expand All @@ -91,14 +130,32 @@ impl FuelService {
// Socket is ignored for now, but as more services are added
// it may be helpful to have a way to list all services and their ports

Ok(FuelService {
Ok(FuelServiceInner {
tasks,
bound_address,
modules,
stop_graphql_api: stop_tx,
})
}

/// Awaits for the completion of any server background tasks
pub async fn run(self) {
if let Err(e) = self.handle.await {
if err.is_panic() {
// Resume the panic on the main task
panic::resume_unwind(err.into_panic());
}
}
}

/// Shutdown background tasks
pub async fn stop(self) {
let _ = self.shutdown.send(());
self.run();
}
}

impl FuelServiceInner {
/// Awaits for the completion of any server background tasks
pub async fn run(self) {
let Self {
Expand Down

0 comments on commit 9832e8e

Please sign in to comment.