Feat/graceful shutdown #117

Merged · 4 commits · Nov 22, 2024

1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -57,6 +57,7 @@ deadpool = { version = "0.12", features = ["rt_tokio_1"] }
deadpool-lapin = "0.12"
teloxide = "0.12"
serenity = { version = "0.12", features = ["client", "framework"] }
once_cell = "1.19.0"

# build
jemallocator = { version = "0.5.0", optional = true }
67 changes: 35 additions & 32 deletions core/src/api/graphql.rs
@@ -141,7 +141,8 @@ pub async fn start_graphql_server(
settings.disable_advanced_filters,
);

setup_ctrlc_handler(Arc::new(Mutex::new(None::<Child>)));
    // Not needed now that the main shutdown path handles this; kept around in case
// setup_ctrlc_handler(Arc::new(Mutex::new(None::<Child>)));

// Wait for the initial server startup
let pid = rx.await.map_err(|e| {
@@ -273,23 +274,6 @@ async fn start_server(
.map_err(|e| e.to_string())
}

fn setup_ctrlc_handler(child_arc: Arc<Mutex<Option<Child>>>) {
ctrlc::set_handler(move || {
MANUAL_STOP.store(true, Ordering::SeqCst);
if let Ok(mut guard) = child_arc.lock() {
if let Some(child) = guard.as_mut() {
if let Err(e) = kill_process_tree(child.id()) {
error!("Failed to kill child process: {}", e);
} else {
info!("GraphQL server process killed");
}
}
}
std::process::exit(0);
})
.expect("Error setting Ctrl-C handler");
}

async fn perform_health_check(
graphql_endpoint: &str,
graphql_playground: &str,
@@ -336,17 +320,36 @@ async fn perform_health_check(
Ok(())
}

fn kill_process_tree(pid: u32) -> Result<(), String> {
if cfg!(target_os = "windows") {
Command::new("taskkill")
.args(["/PID", &pid.to_string(), "/T", "/F"])
.output()
.map_err(|e| e.to_string())?;
} else {
Command::new("pkill")
.args(["-TERM", "-P", &pid.to_string()])
.output()
.map_err(|e| e.to_string())?;
}
Ok(())
}
// Not needed now that the main shutdown path handles this; kept around in case
// fn setup_ctrlc_handler(child_arc: Arc<Mutex<Option<Child>>>) {
// ctrlc::set_handler(move || {
// MANUAL_STOP.store(true, Ordering::SeqCst);
// if let Ok(mut guard) = child_arc.lock() {
// if let Some(child) = guard.as_mut() {
// if let Err(e) = kill_process_tree(child.id()) {
// error!("Failed to kill child process: {}", e);
// } else {
// info!("GraphQL server process killed");
// }
// }
// }
// std::process::exit(0);
// })
// .expect("Error setting Ctrl-C handler");
// }

// Not needed now that the main shutdown path handles this; kept around in case
// fn kill_process_tree(pid: u32) -> Result<(), String> {
// if cfg!(target_os = "windows") {
// Command::new("taskkill")
// .args(["/PID", &pid.to_string(), "/T", "/F"])
// .output()
// .map_err(|e| e.to_string())?;
// } else {
// Command::new("pkill")
// .args(["-TERM", "-P", &pid.to_string()])
// .output()
// .map_err(|e| e.to_string())?;
// }
// Ok(())
// }
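
The per-process Ctrl-C handler above is retired in favour of a central shutdown path. That path is not part of this diff; the following is only a sketch of how it could be wired, assuming `initiate_shutdown()` (exported from `core/src/lib.rs` below) takes no arguments and that tokio's `signal` feature is enabled.

```rust
// Hypothetical wiring only; the real main shutdown path lives outside this diff.
use crate::initiate_shutdown;

async fn wait_for_ctrl_c_then_shutdown() {
    // tokio::signal::ctrl_c resolves once, on the first Ctrl-C / SIGINT
    if tokio::signal::ctrl_c().await.is_ok() {
        // flip the shared running flag; loops that poll is_running() can then exit cleanly
        initiate_shutdown();
    }
}
```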
20 changes: 13 additions & 7 deletions core/src/event/callback_registry.rs
@@ -6,14 +6,14 @@ use ethers::{
types::{Bytes, Log, H256, U256, U64},
};
use futures::future::BoxFuture;
use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tracing::{debug, error};
use tracing::{debug, error, info};

use crate::{
event::contract_setup::{ContractInformation, NetworkContract},
indexer::start::ProcessedNetworkContract,
is_running,
provider::WrappedLog,
};

@@ -144,6 +144,11 @@ impl EventCallbackRegistry {
debug!("{} - Pushed {} events", data.len(), event_information.info_log_name());

loop {
if !is_running() {
info!("Detected shutdown, stopping event trigger");
break;
}

match (event_information.callback)(data.clone()).await {
Ok(_) => {
debug!(
@@ -153,18 +158,19 @@
break;
}
Err(e) => {
if !is_running() {
info!("Detected shutdown, stopping event trigger");
break;
}
attempts += 1;
error!(
"{} Event processing failed - id: {} - topic_id: {}. Retrying... (attempt {}). Error: {}",
event_information.info_log_name(), id, event_information.topic_id, attempts, e
);

sleep(delay).await;
delay = (delay * 2).min(Duration::from_secs(15)); // Max delay of 15 seconds
delay = (delay * 2).min(Duration::from_secs(15));

// add some jitter to the delay to avoid thundering herd problem
let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..1000));
sleep(delay + jitter).await;
sleep(delay).await;
}
}
}
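The retry loop above doubles the delay between attempts and caps it at 15 seconds. A small standalone illustration of that progression, assuming an initial delay of one second (the starting value is set outside the lines shown here):

```rust
use std::time::Duration;

fn main() {
    // assumed starting delay; the real initial value is not shown in this diff
    let mut delay = Duration::from_secs(1);
    for attempt in 1..=6 {
        println!("attempt {attempt}: waiting {delay:?}");
        // same doubling-with-cap rule as the retry loop above
        delay = (delay * 2).min(Duration::from_secs(15));
    }
    // prints 1s, 2s, 4s, 8s, 15s, 15s: the cap keeps the backoff bounded
}
```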
8 changes: 7 additions & 1 deletion core/src/indexer/last_synced.rs
@@ -215,7 +215,11 @@ async fn update_last_synced_block_number_for_file(
Ok(())
}

pub fn update_progress_and_last_synced(config: Arc<EventProcessingConfig>, to_block: U64) {
pub fn update_progress_and_last_synced_task(
config: Arc<EventProcessingConfig>,
to_block: U64,
on_complete: impl FnOnce() + Send + 'static,
) {
tokio::spawn(async move {
let update_last_synced_block_result = config
.progress
@@ -281,5 +285,7 @@ pub fn update_progress_and_last_synced(config: Arc<EventProcessingConfig>, to_bl
);
}
}

on_complete();
});
}
2 changes: 2 additions & 0 deletions core/src/indexer/mod.rs
@@ -13,6 +13,8 @@ mod last_synced;
pub mod no_code;
mod reorg;
pub mod start;
pub mod task_tracker;

pub use dependency::{ContractEventDependencies, EventDependencies, EventsDependencyTree};

use crate::manifest::contract::Contract;
28 changes: 21 additions & 7 deletions core/src/indexer/process.rs
@@ -21,10 +21,12 @@ use crate::{
indexer::{
dependency::{ContractEventsDependenciesConfig, EventDependencies},
fetch_logs::{fetch_logs_stream, FetchLogsResult},
last_synced::update_progress_and_last_synced,
last_synced::update_progress_and_last_synced_task,
log_helpers::is_relevant_block,
progress::IndexingEventProgressStatus,
task_tracker::{indexing_event_processed, indexing_event_processing},
},
is_running,
};

#[derive(thiserror::Error, Debug)]
@@ -501,6 +503,16 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
}
}

async fn trigger_event(
config: Arc<EventProcessingConfig>,
fn_data: Vec<EventResult>,
to_block: U64,
) {
indexing_event_processing();
config.trigger_event(fn_data).await;
update_progress_and_last_synced_task(config, to_block, indexing_event_processed);
}

async fn handle_logs_result(
config: Arc<EventProcessingConfig>,
result: Result<FetchLogsResult, Box<dyn std::error::Error + Send>>,
@@ -522,17 +534,19 @@
})
.collect::<Vec<_>>();

// if shutting down, wait here and do not process any more events
while !is_running() {
tokio::time::sleep(Duration::from_millis(1000)).await;
}

if !fn_data.is_empty() {
return if config.index_event_in_order {
config.trigger_event(fn_data).await;
update_progress_and_last_synced(config, result.to_block);
Ok(tokio::spawn(async {})) // Return a completed task
trigger_event(config, fn_data, result.to_block).await;
Ok(tokio::spawn(async {}))
} else {
let task = tokio::spawn(async move {
config.trigger_event(fn_data).await;
update_progress_and_last_synced(config, result.to_block);
trigger_event(config, fn_data, result.to_block).await;
});

Ok(task)
}
}
17 changes: 17 additions & 0 deletions core/src/indexer/task_tracker.rs
@@ -0,0 +1,17 @@
use std::sync::atomic::{AtomicUsize, Ordering};

use once_cell::sync::Lazy;

static INDEXING_TASKS: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));

pub fn indexing_event_processing() {
INDEXING_TASKS.fetch_add(1, Ordering::SeqCst);
}

pub fn indexing_event_processed() {
INDEXING_TASKS.fetch_sub(1, Ordering::SeqCst);
}

pub fn active_indexing_count() -> usize {
INDEXING_TASKS.load(Ordering::SeqCst)
}
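
With these counters, a shutdown sequence can wait for in-flight event processing to drain before exiting. The drain loop itself is not in this file; a sketch of what it could look like, with the poll interval and timeout as assumed, illustrative values:

```rust
use std::time::{Duration, Instant};

use crate::indexer::task_tracker::active_indexing_count;

// Hypothetical drain helper; the real shutdown sequence is outside this diff.
async fn wait_for_active_indexing_to_finish(timeout: Duration) {
    let started = Instant::now();
    while active_indexing_count() > 0 {
        if started.elapsed() > timeout {
            // give up after the timeout rather than hanging shutdown forever
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```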
4 changes: 4 additions & 0 deletions core/src/lib.rs
@@ -3,6 +3,9 @@ pub mod generator;
pub mod indexer;
pub mod manifest;

mod system_state;
pub use system_state::{initiate_shutdown, is_running};

mod database;
pub use database::postgres::{
client::{PostgresClient, ToSql},
@@ -33,6 +36,7 @@ pub mod provider;
mod start;
mod streams;
mod types;

// export 3rd party dependencies
pub use async_trait::async_trait;
pub use colored::Colorize as RindexerColorize;
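The `system_state` module itself is not included in this diff, so the following is only a guess at its shape: a minimal sketch assuming a single process-wide atomic flag behind `initiate_shutdown` and `is_running`.

```rust
// Speculative sketch of core/src/system_state.rs; the real module is not in this diff.
use std::sync::atomic::{AtomicBool, Ordering};

static RUNNING: AtomicBool = AtomicBool::new(true);

pub fn initiate_shutdown() {
    // once flipped, long-running loops that poll is_running() wind down
    RUNNING.store(false, Ordering::SeqCst);
}

pub fn is_running() -> bool {
    RUNNING.load(Ordering::SeqCst)
}
```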
98 changes: 84 additions & 14 deletions core/src/logger.rs
@@ -1,15 +1,75 @@
use tracing::{debug, level_filters::LevelFilter};
use std::{
io::Write,
sync::atomic::{AtomicBool, Ordering},
};

use once_cell::sync::Lazy;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
fmt::format::{Format, Writer},
fmt::{
format::{Format, Writer},
MakeWriter,
},
EnvFilter,
};

static SHUTDOWN_IN_PROGRESS: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));

struct ShutdownAwareWriter {
buffer: std::io::BufWriter<std::io::Stdout>,
}

impl ShutdownAwareWriter {
fn new() -> Self {
Self { buffer: std::io::BufWriter::new(std::io::stdout()) }
}
}

impl Write for ShutdownAwareWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
// During shutdown, write directly to stdout
let stdout = std::io::stdout();
let mut handle = stdout.lock();
handle.write(buf)
} else {
self.buffer.write(buf)
}
}

fn flush(&mut self) -> std::io::Result<()> {
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
let stdout = std::io::stdout();
let mut handle = stdout.lock();
handle.flush()
} else {
self.buffer.flush()
}
}
}

struct ShutdownAwareWriterMaker;

impl<'a> MakeWriter<'a> for ShutdownAwareWriterMaker {
type Writer = ShutdownAwareWriter;

fn make_writer(&'a self) -> Self::Writer {
ShutdownAwareWriter::new()
}
}

struct CustomTimer;

impl tracing_subscriber::fmt::time::FormatTime for CustomTimer {
fn format_time(&self, writer: &mut Writer<'_>) -> std::fmt::Result {
let now = chrono::Local::now();
write!(writer, "{} - {}", now.format("%d %B"), now.format("%H:%M:%S%.6f"))
// Use a simpler time format during shutdown
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
let now = chrono::Local::now();
write!(writer, "{}", now.format("%H:%M:%S"))
} else {
let now = chrono::Local::now();
write!(writer, "{} - {}", now.format("%d %B"), now.format("%H:%M:%S%.6f"))
}
}
}

@@ -18,22 +78,32 @@ pub fn setup_logger(log_level: LevelFilter) {

let format = Format::default().with_timer(CustomTimer).with_level(true).with_target(false);

let subscriber =
tracing_subscriber::fmt().with_env_filter(filter).event_format(format).finish();
let subscriber = tracing_subscriber::fmt()
.with_writer(ShutdownAwareWriterMaker)
.with_env_filter(filter)
.event_format(format)
.finish();

if tracing::subscriber::set_global_default(subscriber).is_err() {
debug!("Logger has already been set up, continuing...");
// Use println! here since logging might not be set up yet
println!("Logger has already been set up, continuing...");
}
}

pub fn setup_info_logger() {
setup_logger(LevelFilter::INFO);
}

// pub fn set_no_op_logger() -> DefaultGuard {
// let no_op_subscriber = FmtSubscriber::builder().with_writer(|| NullWriter).finish();
//
// let no_op_dispatch = Dispatch::new(no_op_subscriber);
//
// tracing::dispatcher::set_default(&no_op_dispatch)
// }
// Call this when starting shutdown
pub fn mark_shutdown_started() {
SHUTDOWN_IN_PROGRESS.store(true, Ordering::Relaxed);
}

// Optional guard for temporary logger suppression
pub struct LoggerGuard;

impl Drop for LoggerGuard {
fn drop(&mut self) {
SHUTDOWN_IN_PROGRESS.store(false, Ordering::Relaxed);
}
}
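
A short usage sketch for the two pieces above: `mark_shutdown_started` switches logging to unbuffered stdout with the shorter timestamp, and dropping a `LoggerGuard` clears the flag again. The call site is assumed; this diff does not show where they are invoked.

```rust
// Hypothetical call site, not taken from this PR.
fn run_shutdown_sequence() {
    mark_shutdown_started();
    let _guard = LoggerGuard; // clears SHUTDOWN_IN_PROGRESS when dropped
    // ... perform shutdown work while logs go straight to stdout ...
}
```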