Skip to content

Commit

Permalink
Don't capture signals within lib (#991)
Browse files Browse the repository at this point in the history
Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
  • Loading branch information
freesig and Voxelot authored Feb 7, 2023
1 parent 6257fc2 commit e55a260
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 47 deletions.
38 changes: 37 additions & 1 deletion bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,14 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
// initialize the server
let server = FuelService::new_node(config).await?;
// pause the main task while service is running
server.await_stop().await?;
tokio::select! {
result = server.await_stop() => {
result?;
}
_ = shutdown_signal() => {}
}

server.stop_and_await().await?;

Ok(())
}
Expand All @@ -281,3 +288,32 @@ fn load_consensus_key(
Ok(None)
}
}

async fn shutdown_signal() -> anyhow::Result<()> {
#[cfg(unix)]
{
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;

let mut sigint =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
loop {
tokio::select! {
_ = sigterm.recv() => {
tracing::info!("sigterm received");
break;
}
_ = sigint.recv() => {
tracing::log::info!("sigint received");
break;
}
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::log::info!("CTRL+C received");
}
Ok(())
}
35 changes: 1 addition & 34 deletions crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use fuel_core_services::{
State,
StateWatcher,
};
use std::{
net::SocketAddr,
panic,
};
use std::net::SocketAddr;
use tracing::log::warn;

pub use config::{
Expand Down Expand Up @@ -221,7 +218,6 @@ impl RunnableTask for Task {
for service in &self.services {
stop_signals.push(service.await_stop())
}
stop_signals.push(Box::pin(shutdown_signal()));
stop_signals.push(Box::pin(watcher.while_started()));

let (result, _, _) = futures::future::select_all(stop_signals).await;
Expand All @@ -247,35 +243,6 @@ impl RunnableTask for Task {
}
}

async fn shutdown_signal() -> anyhow::Result<State> {
#[cfg(unix)]
{
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;

let mut sigint =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
loop {
tokio::select! {
_ = sigterm.recv() => {
tracing::info!("sigterm received");
break;
}
_ = sigint.recv() => {
tracing::log::info!("sigint received");
break;
}
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::log::info!("CTRL+C received");
}
Ok(State::Stopped)
}

#[cfg(test)]
mod tests {
use crate::service::{
Expand Down
22 changes: 12 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,17 @@ pub fn init_sub_services(
let poa_adapter = PoAAdapter::new(poa.shared.clone());

#[cfg(feature = "p2p")]
let sync = {
fuel_core_sync::service::new_service(
last_height,
p2p_adapter,
importer_adapter.clone(),
verifier,
config.sync,
)?
};
let sync = (!production_enabled)
.then(|| {
fuel_core_sync::service::new_service(
last_height,
p2p_adapter,
importer_adapter.clone(),
verifier,
config.sync,
)
})
.transpose()?;

// TODO: Figure out on how to move it into `fuel-core-graphql-api`.
let schema = dap::init(
Expand Down Expand Up @@ -194,7 +196,7 @@ pub fn init_sub_services(
#[cfg(feature = "p2p")]
{
services.push(Box::new(network));
if !production_enabled {
if let Some(sync) = sync {
services.push(Box::new(sync));
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ where
}
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(service = S::NAME))]
/// Initialize the background loop as a spawned task.
fn initialize_loop<S>(service: S) -> Shared<watch::Sender<State>>
where
Expand All @@ -230,7 +230,9 @@ where
// Spawned as a task to check if the service is already running and to capture any panics.
tokio::task::spawn(
async move {
tracing::debug!("running");
let join_handler = run(service, stop_sender.clone());
tracing::debug!("awaiting run");
let result = join_handler.await;

let stopped_state = if let Err(e) = result {
Expand All @@ -239,11 +241,15 @@ where
State::Stopped
};

tracing::debug!("shutting down {:?}", stopped_state);

let _ = stop_sender.send_if_modified(|state| {
if !state.stopped() {
*state = stopped_state;
tracing::debug!("Wasn't stopped, so sent stop.");
true
} else {
tracing::debug!("Was already stopped.");
false
}
});
Expand Down Expand Up @@ -294,8 +300,10 @@ where
match task.run(&mut state).await {
Ok(should_continue) => {
if !should_continue {
tracing::debug!("stopping");
return
}
tracing::debug!("run loop");
}
Err(e) => {
let e: &dyn std::error::Error = &*e;
Expand Down
2 changes: 2 additions & 0 deletions crates/services/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ impl StateWatcher {
}

impl StateWatcher {
#[tracing::instrument(level = "debug", skip(self), err, ret)]
/// Infinity loop while the state is `State::Started`. Returns the next received state.
pub async fn while_started(&mut self) -> anyhow::Result<State> {
loop {
let state = self.borrow().clone();
if !state.started() {
return Ok(state)
}
tracing::debug!("Service is started, waiting for the next state...");

self.changed().await?;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn can_submit_genesis_message() {
assert!(
matches!(status, TransactionStatus::Success { .. }),
"expected success, received {status:?}",
)
);
}

#[tokio::test]
Expand Down

0 comments on commit e55a260

Please sign in to comment.